diff --git a/docs/content/connectors/mysql-cdc(ZH).md b/docs/content/connectors/mysql-cdc(ZH).md
index bd8e1036e..52685823b 100644
--- a/docs/content/connectors/mysql-cdc(ZH).md
+++ b/docs/content/connectors/mysql-cdc(ZH).md
@@ -369,6 +369,11 @@ Flink SQL> SELECT * FROM orders;
TIMESTAMP_LTZ(3) NOT NULL |
当前记录表在数据库中更新的时间。 如果从表的快照而不是 binlog 读取记录,该值将始终为0。 |
+
+ op |
+ STRING NOT NULL |
+ 当前记录对应的操作类型。 '+I' 表示 INSERT 数据,'-D' 表示 DELETE 数据,'-U' 表示 UPDATE_BEFORE 数据,'+U' 表示 UPDATE_AFTER 数据。 |
+
@@ -378,6 +383,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+ operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
@@ -402,6 +408,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+ operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
diff --git a/docs/content/connectors/mysql-cdc.md b/docs/content/connectors/mysql-cdc.md
index cd2bd371c..1f4be19d3 100644
--- a/docs/content/connectors/mysql-cdc.md
+++ b/docs/content/connectors/mysql-cdc.md
@@ -377,6 +377,11 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
TIMESTAMP_LTZ(3) NOT NULL |
It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0. |
+
+ op |
+ STRING NOT NULL |
+ It indicates the operation type of the row. '+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message. |
+
@@ -386,6 +391,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+ operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
@@ -410,6 +416,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+ operation STRING METADATA FROM 'op' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/AppendMetadataCollector.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/AppendMetadataCollector.java
index cb8c58f3a..e653c4195 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/AppendMetadataCollector.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/AppendMetadataCollector.java
@@ -44,7 +44,16 @@ public final class AppendMetadataCollector implements Collector, Serial
public void collect(RowData physicalRow) {
GenericRowData metaRow = new GenericRowData(metadataConverters.length);
for (int i = 0; i < metadataConverters.length; i++) {
- Object meta = metadataConverters[i].read(inputRecord);
+ MetadataConverter metadataConverter = metadataConverters[i];
+ Object meta;
+ if (metadataConverter instanceof MetadataWithRowDataConverter) {
+ meta =
+ ((MetadataWithRowDataConverter) metadataConverter)
+ .read(inputRecord, physicalRow);
+ } else {
+ meta = metadataConverter.read(inputRecord);
+ }
+
metaRow.setField(i, meta);
}
RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/MetadataWithRowDataConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/MetadataWithRowDataConverter.java
new file mode 100644
index 000000000..c1542193a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/MetadataWithRowDataConverter.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.debezium.table;
+
+import org.apache.flink.table.data.RowData;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * A converter converts {@link SourceRecord} metadata and {@link RowData} into Flink internal data
+ * structures.
+ */
+public interface MetadataWithRowDataConverter extends MetadataConverter {
+ Object read(SourceRecord record, RowData rowData);
+
+ default Object read(SourceRecord record) {
+ throw new UnsupportedOperationException(
+ "This method should never be called, please call the read(SourceRecord, RowData) method instead.");
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java
index fee3ef62f..e34980e7c 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java
@@ -17,11 +17,13 @@
package com.ververica.cdc.connectors.mysql.table;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.table.MetadataWithRowDataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
@@ -78,6 +80,22 @@ public enum MySqlReadableMetadata {
return TimestampData.fromEpochMillis(
(Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
+ }),
+
+ /**
+ * It indicates the operation type of the row. '+I' means INSERT message, '-D' means DELETE
+ * message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message
+ */
+ OP_TYPE(
+ "op",
+ DataTypes.STRING().notNull(),
+ new MetadataWithRowDataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(SourceRecord record, RowData rowData) {
+ return StringData.fromString(rowData.getRowKind().shortString());
+ }
});
private final String key;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
index 931d6734c..dbc871342 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java
@@ -933,6 +933,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"CREATE TABLE mysql_users ("
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " table_name STRING METADATA VIRTUAL,"
+ + " op STRING METADATA FROM 'op' VIRTUAL,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " address STRING,"
@@ -967,6 +968,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"CREATE TABLE sink ("
+ " database_name STRING,"
+ " table_name STRING,"
+ + " op STRING,"
+ " `id` DECIMAL(20, 0) NOT NULL,"
+ " name STRING,"
+ " address STRING,"
@@ -1004,15 +1006,15 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
List expected =
Stream.of(
- "+I[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
- "+I[%s, user_table_1_2, 121, user_121, Shanghai, 123567891234, null, null]",
- "+I[%s, user_table_1_2, 200, user_200, Wuhan, 123567891234, null, null]",
- "+I[%s, user_table_1_1, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
- "+U[%s, user_table_1_1, 300, user_300, Beijing, 123567891234, user_300@foo.com, null]",
- "+U[%s, user_table_1_2, 121, user_121, Shanghai, 88888888, null, null]",
- "-D[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
- "-U[%s, user_table_1_1, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
- "-U[%s, user_table_1_2, 121, user_121, Shanghai, 123567891234, null, null]")
+ "+I[%s, user_table_1_1, +I, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
+ "+I[%s, user_table_1_2, +I, 121, user_121, Shanghai, 123567891234, null, null]",
+ "+I[%s, user_table_1_2, +I, 200, user_200, Wuhan, 123567891234, null, null]",
+ "+I[%s, user_table_1_1, +I, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
+ "+U[%s, user_table_1_1, +U, 300, user_300, Beijing, 123567891234, user_300@foo.com, null]",
+ "+U[%s, user_table_1_2, +U, 121, user_121, Shanghai, 88888888, null, null]",
+ "-D[%s, user_table_1_1, -D, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
+ "-U[%s, user_table_1_1, -U, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
+ "-U[%s, user_table_1_2, -U, 121, user_121, Shanghai, 123567891234, null, null]")
.map(s -> String.format(s, userDatabase1.getDatabaseName()))
.sorted()
.collect(Collectors.toList());