diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index 83103c921..7af635e58 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -332,21 +332,29 @@ upsert 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为
      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
      <td>它指示在数据库中进行更改的时间。如果记录是从表的快照而不是改变流中读取的,该值将始终为0。</td>
    </tr>
+    <tr>
+      <td>row_kind</td>
+      <td>STRING NOT NULL</td>
+      <td>当前记录对应的 changelog 类型。注意:当 Source 算子选择为每条记录输出 row_kind 字段后,下游 SQL 算子在处理消息撤回时会因为这个字段不同而比对失败,建议只在简单的同步作业中引用该元数据列。
+          '+I' 表示 INSERT 数据,'-D' 表示 DELETE 数据,'-U' 表示 UPDATE_BEFORE 数据,'+U' 表示 UPDATE_AFTER 数据。</td>
+    </tr>
  </tbody>
</table>
扩展的 CREATE TABLE 示例演示了用于公开这些元数据字段的语法:
```sql
CREATE TABLE products (
-   db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
-   operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
-   _id STRING, // 必须声明
-   name STRING,
-   weight DECIMAL(10,3),
-   tags ARRAY<STRING>, -- array
-   price ROW<amount DECIMAL, currency STRING>, -- 嵌入式文档
-   suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
+    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation STRING METADATA FROM 'row_kind' VIRTUAL,
+    _id STRING, // 必须声明
+    name STRING,
+    weight DECIMAL(10,3),
+    tags ARRAY<STRING>, -- array
+    price ROW<amount DECIMAL, currency STRING>, -- 嵌入式文档
+    suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
index 9ffbf184d..03aba394a 100644
--- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -357,21 +357,29 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
      <td>It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
    </tr>
+    <tr>
+      <td>row_kind</td>
+      <td>STRING NOT NULL</td>
+      <td>It indicates the row kind of the changelog. Note: if the source operator chooses to output the 'row_kind' column for each record, downstream SQL operators may fail to match retraction messages because of this extra column, so it is recommended to use this metadata column only in simple synchronization jobs.
+          '+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
+    </tr>
  </tbody>
</table>
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
```sql
CREATE TABLE products (
-   db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
-   operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
-   _id STRING, // must be declared
-   name STRING,
-   weight DECIMAL(10,3),
-   tags ARRAY<STRING>, -- array
-   price ROW<amount DECIMAL, currency STRING>, -- embedded document
-   suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
+    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation STRING METADATA FROM 'row_kind' VIRTUAL,
+    _id STRING, // must be declared
+    name STRING,
+    weight DECIMAL(10,3),
+    tags ARRAY<STRING>, -- array
+    price ROW<amount DECIMAL, currency STRING>, -- embedded document
+    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
index c2baf021c..581873d20 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
@@ -19,7 +19,9 @@ package org.apache.flink.cdc.connectors.mongodb.table;
import org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
@@ -81,6 +83,28 @@ public enum MongoDBReadableMetadata {
return TimestampData.fromEpochMillis(
(Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
+ }),
+
+ /**
+ * It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
+ * message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.
+ */
+ ROW_KIND(
+ "row_kind",
+ DataTypes.STRING().notNull(),
+ new RowDataMetadataConverter() {
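+ // The row kind is only known on the materialized RowData, not on the Debezium
+ // SourceRecord, hence the RowDataMetadataConverter variant is used here.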
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(RowData rowData) {
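+ // RowKind.shortString() yields the compact form: '+I', '-U', '+U' or '-D'.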
+ return StringData.fromString(rowData.getRowKind().shortString());
+ }
+
+ @Override
+ public Object read(SourceRecord record) {
+ throw new UnsupportedOperationException(
+ "Please call read(RowData rowData) method instead.");
+ }
});
private final String key;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
index eebaacafc..4e4bc073f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
@@ -55,6 +55,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -590,6 +591,148 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
return records;
}
+ @Test
+ public void testMetadataColumns() throws Exception {
+ testMongoDBParallelSourceWithMetadataColumns(
+ DEFAULT_PARALLELISM, new String[] {"customers"}, true);
+ }
+
+ private void testMongoDBParallelSourceWithMetadataColumns(
+ int parallelism, String[] captureCustomerCollections, boolean skipSnapshotBackfill)
+ throws Exception {
+ String customerDatabase =
+ "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
+
+ // A - enable system-level fulldoc pre & post image feature
+ mongoContainer.executeCommand(
+ "use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
+
+ // B - enable collection-level fulldoc pre & post image for change capture collection
+ for (String collectionName : captureCustomerCollections) {
+ mongoContainer.executeCommandInDatabase(
+ String.format(
+ "db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
+ collectionName, collectionName),
+ customerDatabase);
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(200L);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+
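+ // The metadata columns below are resolved by name: each column maps onto the
+ // metadata key of the same name ('database_name', 'collection_name', 'row_kind'),
+ // so no explicit FROM clause is needed.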
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE customers ("
+ + " _id STRING NOT NULL,"
+ + " cid BIGINT NOT NULL,"
+ + " name STRING,"
+ + " address STRING,"
+ + " phone_number STRING,"
+ + " database_name STRING METADATA VIRTUAL,"
+ + " collection_name STRING METADATA VIRTUAL,"
+ + " row_kind STRING METADATA VIRTUAL,"
+ + " primary key (_id) not enforced"
+ + ") WITH ("
+ + " 'connector' = 'mongodb-cdc',"
+ + " 'scan.incremental.snapshot.enabled' = '%s',"
+ + " 'hosts' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database' = '%s',"
+ + " 'collection' = '%s',"
+ + " 'heartbeat.interval.ms' = '500',"
+ + " 'scan.full-changelog' = 'true',"
+ + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ + ")",
+ parallelismSnapshot ? "true" : "false",
+ mongoContainer.getHostAndPort(),
+ FLINK_USER,
+ FLINK_USER_PASSWORD,
+ customerDatabase,
+ getCollectionNameRegex(customerDatabase, captureCustomerCollections),
+ skipSnapshotBackfill);
+
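+ // Seed the capture collections from the bundled 'customer' script before reading.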
+ mongoContainer.executeCommandFileInDatabase("customer", customerDatabase);
+
+ // first step: check the snapshot data
+ List<String> snapshotForSingleTable =
+ Stream.of(
+ "+I[%s, %s, +I, 101, user_1, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 103, user_3, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 109, user_4, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 110, user_5, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 111, user_6, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 118, user_7, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 121, user_8, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 123, user_9, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1009, user_10, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1010, user_11, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1011, user_12, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1012, user_13, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1013, user_14, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1014, user_15, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1015, user_16, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1016, user_17, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1017, user_18, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1018, user_19, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 1019, user_20, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 2000, user_21, Shanghai, 123567891234]")
+ .map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
+ .collect(Collectors.toList());
+
+ tEnv.executeSql(sourceDDL);
+ TableResult tableResult =
+ tEnv.executeSql(
+ "select database_name, collection_name, row_kind, "
+ + "cid, name, address, phone_number from customers");
+ CloseableIterator<Row> iterator = tableResult.collect();
+ JobID jobId = tableResult.getJobClient().get().getJobID();
+ List<String> expectedSnapshotData = new ArrayList<>();
+ for (int i = 0; i < captureCustomerCollections.length; i++) {
+ expectedSnapshotData.addAll(snapshotForSingleTable);
+ }
+
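+ // During the snapshot phase every record is emitted as an insert, so the
+ // row_kind metadata column is '+I' for all snapshot rows.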
+ assertEqualsInAnyOrder(
+ expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+
+ // second step: check the change stream data
+ for (String collectionName : captureCustomerCollections) {
+ makeFirstPartChangeStreamEvents(
+ mongodbClient.getDatabase(customerDatabase), collectionName);
+ }
+ for (String collectionName : captureCustomerCollections) {
+ makeSecondPartChangeStreamEvents(
+ mongodbClient.getDatabase(customerDatabase), collectionName);
+ }
+
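+ // The 'row_kind' metadata column (third in the projection) mirrors the RowKind
+ // of each record, so an update surfaces as a matching -U/+U pair.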
+ List<String> changeEventsForSingleTable =
+ Stream.of(
+ "-U[%s, %s, -U, 101, user_1, Shanghai, 123567891234]",
+ "+U[%s, %s, +U, 101, user_1, Hangzhou, 123567891234]",
+ "-D[%s, %s, -D, 102, user_2, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
+ "-U[%s, %s, -U, 103, user_3, Shanghai, 123567891234]",
+ "+U[%s, %s, +U, 103, user_3, Hangzhou, 123567891234]",
+ "-U[%s, %s, -U, 1010, user_11, Shanghai, 123567891234]",
+ "+U[%s, %s, +U, 1010, user_11, Hangzhou, 123567891234]",
+ "+I[%s, %s, +I, 2001, user_22, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 2002, user_23, Shanghai, 123567891234]",
+ "+I[%s, %s, +I, 2003, user_24, Shanghai, 123567891234]")
+ .map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
+ .collect(Collectors.toList());
+ List<String> expectedChangeStreamData = new ArrayList<>();
+ for (int i = 0; i < captureCustomerCollections.length; i++) {
+ expectedChangeStreamData.addAll(changeEventsForSingleTable);
+ }
+ List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
+ assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
+ tableResult.getJobClient().get().cancel().get();
+ }
+
private void testMongoDBParallelSource(
MongoDBTestUtils.FailoverType failoverType,
MongoDBTestUtils.FailoverPhase failoverPhase,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index b8b024238..921da10f3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -89,7 +89,8 @@ public class MongoDBTableFactoryTest {
Column.physical("eee", DataTypes.TIMESTAMP(3)),
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
Column.metadata(
- "_database_name", DataTypes.STRING(), "database_name", true)),
+ "_database_name", DataTypes.STRING(), "database_name", true),
+ Column.metadata("_row_kind", DataTypes.STRING(), "row_kind", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));
@@ -220,7 +221,7 @@ public class MongoDBTableFactoryTest {
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource;
mongoDBSource.applyReadableMetadata(
- Arrays.asList("op_ts", "database_name"),
+ Arrays.asList("op_ts", "database_name", "row_kind"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = mongoDBSource.copy();
@@ -252,7 +253,7 @@ public class MongoDBTableFactoryTest {
SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
- expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
+ expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "row_kind");
assertEquals(expectedSource, actualSource);