From c52ccfa5c66961a2bfd4758a8afd23db11f6bd53 Mon Sep 17 00:00:00 2001 From: Kunni Date: Thu, 9 Nov 2023 16:05:09 +0800 Subject: [PATCH] [3.0][cdc-common] Add SchemaChangeEvent implementations (#2664) --- .../cdc/common/event/AddColumnEvent.java | 163 ++++++++++++++++++ .../common/event/AlterColumnTypeEvent.java | 82 +++++++++ .../cdc/common/event/CreateTableEvent.java | 78 +++++++++ .../cdc/common/event/DropColumnEvent.java | 84 +++++++++ .../cdc/common/event/RenameColumnEvent.java | 75 ++++++++ .../cdc/common/event/SchemaChangeEvent.java | 4 +- .../ververica/cdc/common/event/TableId.java | 3 +- 7 files changed, 487 insertions(+), 2 deletions(-) create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AddColumnEvent.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AlterColumnTypeEvent.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/event/CreateTableEvent.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DropColumnEvent.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/event/RenameColumnEvent.java diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AddColumnEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AddColumnEvent.java new file mode 100644 index 000000000..9799c955d --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AddColumnEvent.java @@ -0,0 +1,163 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.schema.Column; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.List; +import java.util.Objects; + +/** + * A {@link SchemaChangeEvent} that represents an {@code ADD COLUMN} DDL, which may contain the + * lenient column type changes. + */ +@PublicEvolving +public final class AddColumnEvent implements SchemaChangeEvent { + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + + private final List addedColumns; + + public AddColumnEvent(TableId tableId, List addedColumns) { + this.tableId = tableId; + this.addedColumns = addedColumns; + } + + /** Returns the added columns. */ + public List getAddedColumns() { + return addedColumns; + } + + /** relative Position of column. */ + public enum ColumnPosition implements Serializable { + BEFORE, + AFTER, + FIRST, + LAST + } + + /** represents result of an ADD COLUMN DDL that may change column sequence. */ + public static class ColumnWithPosition implements Serializable { + + /** The added column. */ + private final Column addColumn; + + /** The position of the added column. */ + private final ColumnPosition position; + + /** The added column lies in the position relative to this column. */ + private final @Nullable Column existingColumn; + + /** In the default scenario, we add fields at the end of the column. */ + public ColumnWithPosition(Column addColumn) { + this.addColumn = addColumn; + position = ColumnPosition.LAST; + existingColumn = null; + } + + public ColumnWithPosition( + Column addColumn, ColumnPosition position, Column existingColumn) { + this.addColumn = addColumn; + this.position = position; + this.existingColumn = existingColumn; + } + + public Column getAddColumn() { + return addColumn; + } + + public ColumnPosition getPosition() { + return position; + } + + public Column getExistingColumn() { + return existingColumn; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + ColumnWithPosition that = (ColumnWithPosition) o; + return Objects.equals(addColumn, that.addColumn) + && Objects.equals(position, that.position) + && Objects.equals(existingColumn, that.existingColumn); + } + + @Override + public int hashCode() { + return Objects.hash(addColumn, position, existingColumn); + } + + @Override + public String toString() { + return "ColumnWithPosition{" + + "column=" + + addColumn + + ", position=" + + position + + ", existingColumn=" + + existingColumn + + '}'; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + AddColumnEvent that = (AddColumnEvent) o; + return Objects.equals(tableId, that.tableId) + && Objects.equals(addedColumns, that.addedColumns); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, addedColumns); + } + + @Override + public String toString() { + return "AddColumnEvent{" + "tableId=" + tableId + ", addedColumns=" + addedColumns + '}'; + } + + @Override + public TableId tableId() { + return tableId; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AlterColumnTypeEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AlterColumnTypeEvent.java new file mode 100644 index 000000000..b7ec113d6 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AlterColumnTypeEvent.java @@ -0,0 +1,82 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import com.ververica.cdc.common.types.DataType; + +import java.util.Map; +import java.util.Objects; + +/** + * A {@link SchemaChangeEvent} that represents an {@code ALTER COLUMN} DDL, which may contain the + * lenient column type changes. + */ +public class AlterColumnTypeEvent implements SchemaChangeEvent { + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + + /** key => column name, value => column type after changing. */ + private final Map typeMapping; + + public AlterColumnTypeEvent(TableId tableId, Map typeMapping) { + this.tableId = tableId; + this.typeMapping = typeMapping; + } + + /** Returns the type mapping. */ + public Map getTypeMapping() { + return typeMapping; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + AlterColumnTypeEvent that = (AlterColumnTypeEvent) o; + return Objects.equals(tableId, that.tableId) + && Objects.equals(typeMapping, that.typeMapping); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, typeMapping); + } + + @Override + public String toString() { + return "AlterColumnTypeEvent{" + + "tableId=" + + tableId + + ", nameMapping=" + + typeMapping + + '}'; + } + + @Override + public TableId tableId() { + return tableId; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/CreateTableEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/CreateTableEvent.java new file mode 100644 index 000000000..1c3e1669b --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/CreateTableEvent.java @@ -0,0 +1,78 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.schema.Schema; +import com.ververica.cdc.common.source.DataSource; + +import java.util.Objects; + +/** + * A {@link SchemaChangeEvent} that represents an {@code CREATE TABLE} DDL. this will be sent by + * {@link DataSource} before all {@link DataChangeEvent} with the same tableId + */ +@PublicEvolving +public class CreateTableEvent implements SchemaChangeEvent { + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + + private final Schema schema; + + public CreateTableEvent(TableId tableId, Schema schema) { + this.tableId = tableId; + this.schema = schema; + } + + /** Returns the table schema. */ + public Schema getSchema() { + return schema; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + CreateTableEvent that = (CreateTableEvent) o; + return Objects.equals(tableId, that.tableId) && Objects.equals(schema, that.schema); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, schema); + } + + @Override + public String toString() { + return "CreateTableEvent{" + "tableId=" + tableId + ", schema=" + schema + '}'; + } + + @Override + public TableId tableId() { + return tableId; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DropColumnEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DropColumnEvent.java new file mode 100644 index 000000000..cf0da7664 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DropColumnEvent.java @@ -0,0 +1,84 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.schema.Column; + +import java.util.List; +import java.util.Objects; + +/** + * A {@link SchemaChangeEvent} that represents an {@code DROP COLUMN} DDL, which may contain the + * lenient column type changes. + */ +@PublicEvolving +public class DropColumnEvent implements SchemaChangeEvent { + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + + private final List droppedColumns; + + public DropColumnEvent(TableId tableId, List droppedColumns) { + this.tableId = tableId; + this.droppedColumns = droppedColumns; + } + + /** Returns the dropped columns. */ + public List getDroppedColumns() { + return droppedColumns; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + DropColumnEvent that = (DropColumnEvent) o; + return Objects.equals(tableId, that.tableId) + && Objects.equals(droppedColumns, that.droppedColumns); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, droppedColumns); + } + + @Override + public String toString() { + return "DropColumnEvent{" + + "tableId=" + + tableId + + ", droppedColumns=" + + droppedColumns + + '}'; + } + + @Override + public TableId tableId() { + return tableId; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/RenameColumnEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/RenameColumnEvent.java new file mode 100644 index 000000000..b3b1c3954 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/RenameColumnEvent.java @@ -0,0 +1,75 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import java.util.Map; +import java.util.Objects; + +/** + * A {@link SchemaChangeEvent} that represents an {@code RENAME COLUMN} DDL, which may contain the + * lenient column type changes. + */ +public class RenameColumnEvent implements SchemaChangeEvent { + + private static final long serialVersionUID = 1L; + + private final TableId tableId; + + /** key => column name before changing, value => column name after changing. */ + private final Map nameMapping; + + public RenameColumnEvent(TableId tableId, Map nameMapping) { + this.tableId = tableId; + this.nameMapping = nameMapping; + } + + /** Returns the name mapping. */ + public Map getNameMapping() { + return nameMapping; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + RenameColumnEvent that = (RenameColumnEvent) o; + return Objects.equals(tableId, that.tableId) + && Objects.equals(nameMapping, that.nameMapping); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, nameMapping); + } + + @Override + public String toString() { + return "RenameColumnEvent{" + "tableId=" + tableId + ", nameMapping=" + nameMapping + '}'; + } + + @Override + public TableId tableId() { + return tableId; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/SchemaChangeEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/SchemaChangeEvent.java index dfac6eb1d..675040379 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/SchemaChangeEvent.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/SchemaChangeEvent.java @@ -18,9 +18,11 @@ package com.ververica.cdc.common.event; import org.apache.flink.annotation.PublicEvolving; +import java.io.Serializable; + /** * Class {@code SchemaChangeEvent} represents the changes in the table structure of the external * system, such as CREATE, DROP, RENAME and so on. */ @PublicEvolving -public interface SchemaChangeEvent extends ChangeEvent {} +public interface SchemaChangeEvent extends ChangeEvent, Serializable {} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java index 5b0b708c3..6554339f3 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; import javax.annotation.Nullable; +import java.io.Serializable; import java.util.Objects; /** @@ -43,7 +44,7 @@ import java.util.Objects; * */ @PublicEvolving -public class TableId { +public class TableId implements Serializable { @Nullable private final String namespace; @Nullable private final String schemaName;