diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/ChangeEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/ChangeEvent.java new file mode 100644 index 000000000..e42b276ac --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/ChangeEvent.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Class {@code ChangeEvent} represents the change events of external systems, including {@link + * DataChangeEvent} and {@link SchemaChangeEvent}. + */ +@PublicEvolving +public interface ChangeEvent extends Event { + + /** Describes the database table corresponding to the occurrence of a change event. */ + TableId tableId(); +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java new file mode 100644 index 000000000..2f47403a7 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +/** + * Class {@code DataChangeEvent} represents the data change events of external systems, such as + * INSERT, UPDATE, DELETE and so on. + */ +@PublicEvolving +public interface DataChangeEvent extends ChangeEvent { + + /** Describes the record of data before change. */ + DataRecord before(); + + /** Describes the record of data after change. */ + DataRecord after(); + + /** Describes the operation type of the change event. e.g. INSERT, UPDATE, REPLACE, DELETE. */ + OperationType op(); + + /** Optional, describes the metadata of the change event. e.g. MySQL binlog file name, pos. */ + Map meta(); +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataRecord.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataRecord.java new file mode 100644 index 000000000..12214f410 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataRecord.java @@ -0,0 +1,63 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +/** Class {@code DataRecord} describes the data of changed record in the external system. */ +@PublicEvolving +public interface DataRecord { + + /** Returns the number of fields in this row. */ + int getArity(); + + // ------------------------------------------------------------------------------------------ + // Read-only accessor methods + // ------------------------------------------------------------------------------------------ + + /** Returns true if the field is null at the given position. */ + boolean isNullAt(int pos); + + /** Returns the boolean value at the given position. */ + boolean getBoolean(int pos); + + /** Returns the byte value at the given position. */ + byte getByte(int pos); + + /** Returns the short value at the given position. */ + short getShort(int pos); + + /** Returns the integer value at the given position. */ + int getInt(int pos); + + /** Returns the long value at the given position. */ + long getLong(int pos); + + /** Returns the float value at the given position. */ + float getFloat(int pos); + + /** Returns the double value at the given position. */ + double getDouble(int pos); + + /** Returns the string value at the given position. */ + String getString(int pos); + + /** Returns the binary value at the given position. */ + byte[] getBinary(int pos); + + // TODO: add more methods for other types +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/Event.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/Event.java new file mode 100644 index 000000000..8eda1209b --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/Event.java @@ -0,0 +1,26 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Class {@code Event} is the super interface defines the events of external systems flowing into + * Flink CDC. + */ +@PublicEvolving +public interface Event {} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/OperationType.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/OperationType.java new file mode 100644 index 000000000..343eaed4f --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/OperationType.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Enum {@code OperationType} describes the type of operation that the data change event reports. + */ +@PublicEvolving +public enum OperationType { + INSERT, + UPDATE, + REPLACE, + DELETE +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/SchemaChangeEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/SchemaChangeEvent.java new file mode 100644 index 000000000..dfac6eb1d --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/SchemaChangeEvent.java @@ -0,0 +1,26 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Class {@code SchemaChangeEvent} represents the changes in the table structure of the external + * system, such as CREATE, DROP, RENAME and so on. + */ +@PublicEvolving +public interface SchemaChangeEvent extends ChangeEvent {} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java new file mode 100644 index 000000000..5b0b708c3 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/TableId.java @@ -0,0 +1,133 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.event; + +import org.apache.flink.annotation.PublicEvolving; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * The unique identifier used to represent the path of external table or external data collection, + * all external system data collection could map to a dedicated {@code TableId}. + * + * + * + *

Connectors need to establish the mapping between {@code TableId} and external data collection + * object path. For example, + * + *

+ */ +@PublicEvolving +public class TableId { + + @Nullable private final String namespace; + @Nullable private final String schemaName; + private final String tableName; + + private TableId(@Nullable String namespace, @Nullable String schemaName, String tableName) { + this.namespace = namespace; + this.schemaName = schemaName; + this.tableName = Objects.requireNonNull(tableName); + } + + /** The mapping relationship for external systems. e.g. Oracle (database, schema, table). */ + public static TableId tableId(String namespace, String schemaName, String tableName) { + return new TableId( + Objects.requireNonNull(namespace), Objects.requireNonNull(schemaName), tableName); + } + + /** The mapping relationship for external systems. e.g. MySQL (database, table). */ + public static TableId tableId(String schemaName, String tableName) { + return new TableId(null, Objects.requireNonNull(schemaName), tableName); + } + + /** The mapping relationship for external systems. e.g. Kafka (topic). */ + public static TableId tableId(String tableName) { + return new TableId(null, null, tableName); + } + + public static TableId parse(String tableId) { + String[] parts = Objects.requireNonNull(tableId).split("\\."); + if (parts.length == 3) { + return tableId(parts[0], parts[1], parts[2]); + } else if (parts.length == 2) { + return tableId(parts[0], parts[1]); + } else if (parts.length == 1) { + return tableId(parts[0]); + } + throw new IllegalArgumentException("Invalid tableId: " + tableId); + } + + public String identifier() { + if (namespace == null || namespace.isEmpty()) { + if (schemaName == null || schemaName.isEmpty()) { + return tableName; + } + return schemaName + "." + tableName; + } + return namespace + "." + schemaName + "." + tableName; + } + + @Nullable + public String getNamespace() { + return namespace; + } + + @Nullable + public String getSchemaName() { + return schemaName; + } + + public String getTableName() { + return tableName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TableId that = (TableId) o; + return Objects.equals(namespace, that.namespace) + && Objects.equals(schemaName, that.schemaName) + && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(namespace, schemaName, tableName); + } + + @Override + public String toString() { + return identifier(); + } +}