[FLINK-36683][cdc-connector][mongo] Support metadata 'row_kind' virtual column

pull/3705/head
Runkang He 3 months ago
parent 7f6d9115ae
commit f65ffdb403

@ -332,21 +332,29 @@ upsert 流需要一个唯一的键,所以我们必须声明 `_id` 作为
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>它指示在数据库中进行更改的时间。 <br>如果记录是从表的快照而不是更改流中读取的,该值将始终为 0。</td>
</tr>
<tr>
<td>row_kind</td>
<td>STRING NOT NULL</td>
<td>当前记录对应的 changelog 类型。注意:当 Source 算子选择为每条记录输出 row_kind 字段后,下游 SQL 算子在处理消息撤回时会因为这个字段不同而比对失败,
建议只在简单的同步作业中引用该元数据列。<br>'+I' 表示 INSERT 数据,'-D' 表示 DELETE 数据,'-U' 表示 UPDATE_BEFORE 数据,'+U' 表示 UPDATE_AFTER 数据。
</td>
</tr>
</tbody>
</table>
扩展的 CREATE TABLE 示例演示了用于公开这些元数据字段的语法:
```sql
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
_id STRING, -- 必须声明
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',

@ -357,21 +357,29 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from the snapshot of the table instead of the change stream, the value is always 0.</td>
</tr>
<tr>
<td>row_kind</td>
<td>STRING NOT NULL</td>
<td>It indicates the row kind of the changelog. Note: if the source operator chooses to output the 'row_kind' column for each record, downstream SQL operators may fail to match retraction messages because of this additional column. It is recommended to use this metadata column only in simple synchronization jobs.
<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
</tr>
</tbody>
</table>
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
```sql
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
_id STRING, -- must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',

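As an aside (not part of this change), here is a minimal sketch of the "simple synchronization job" that the row_kind note above recommends: the metadata column is selected like any other column and forwarded to the sink. The host, database, collection and sink definitions below are illustrative placeholders only.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Minimal pass-through sync job that forwards the row_kind metadata to a print sink.
public class RowKindSyncSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Shortened version of the extended CREATE TABLE example above;
        // connection properties are placeholders.
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + "  operation STRING METADATA FROM 'row_kind' VIRTUAL,"
                        + "  _id STRING,"
                        + "  name STRING,"
                        + "  PRIMARY KEY(_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mongodb-cdc',"
                        + "  'hosts' = 'localhost:27017',"
                        + "  'database' = 'inventory',"
                        + "  'collection' = 'products'"
                        + ")");

        // Print sink used only for illustration; a real job would write to storage.
        tEnv.executeSql(
                "CREATE TABLE sink ("
                        + "  operation STRING,"
                        + "  _id STRING,"
                        + "  name STRING"
                        + ") WITH ('connector' = 'print')");

        tEnv.executeSql("INSERT INTO sink SELECT operation, _id, name FROM products");
    }
}
```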
@ -19,7 +19,9 @@ package org.apache.flink.cdc.connectors.mongodb.table;
import org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
@ -81,6 +83,28 @@ public enum MongoDBReadableMetadata {
return TimestampData.fromEpochMillis(
(Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY));
}
}),
/**
* It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
* message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.
*/
ROW_KIND(
"row_kind",
DataTypes.STRING().notNull(),
new RowDataMetadataConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object read(RowData rowData) {
return StringData.fromString(rowData.getRowKind().shortString());
}
@Override
public Object read(SourceRecord record) {
throw new UnsupportedOperationException(
"Please call read(RowData rowData) method instead.");
}
});
private final String key;

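For reference, a minimal standalone sketch (not part of this commit) of how the values emitted by the new ROW_KIND converter line up with Flink's `RowKind` enum: `read(RowData)` simply returns `RowKind.shortString()`, so the metadata column carries the same '+I' / '-U' / '+U' / '-D' strings described in the documentation above.

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

// Standalone sketch: prints the short string each RowKind maps to, which is
// exactly what the ROW_KIND metadata converter reads off each record.
public class RowKindShortStringDemo {
    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            // Build a one-column row flagged with the given change kind.
            RowData row = GenericRowData.ofKind(kind, StringData.fromString("user_1"));
            // INSERT -> +I, UPDATE_BEFORE -> -U, UPDATE_AFTER -> +U, DELETE -> -D
            System.out.println(kind + " -> " + row.getRowKind().shortString());
        }
    }
}
```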
@ -55,6 +55,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@ -590,6 +591,148 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
return records;
}
@Test
public void testMetadataColumns() throws Exception {
testMongoDBParallelSourceWithMetadataColumns(
DEFAULT_PARALLELISM, new String[] {"customers"}, true);
}
private void testMongoDBParallelSourceWithMetadataColumns(
int parallelism, String[] captureCustomerCollections, boolean skipSnapshotBackfill)
throws Exception {
String customerDatabase =
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
// A - enable system-level fulldoc pre & post image feature
mongoContainer.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
// B - enable collection-level fulldoc pre & post image for change capture collection
for (String collectionName : captureCustomerCollections) {
mongoContainer.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
collectionName, collectionName),
customerDatabase);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
String.format(
"CREATE TABLE customers ("
+ " _id STRING NOT NULL,"
+ " cid BIGINT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " database_name STRING METADATA VIRTUAL,"
+ " collection_name STRING METADATA VIRTUAL,"
+ " row_kind STRING METADATA VIRTUAL,"
+ " primary key (_id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mongodb-cdc',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'hosts' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database' = '%s',"
+ " 'collection' = '%s',"
+ " 'heartbeat.interval.ms' = '500',"
+ " 'scan.full-changelog' = 'true',"
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+ ")",
parallelismSnapshot ? "true" : "false",
mongoContainer.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
getCollectionNameRegex(customerDatabase, captureCustomerCollections),
skipSnapshotBackfill);
mongoContainer.executeCommandFileInDatabase("customer", customerDatabase);
// first step: check the snapshot data
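// each expected row reads as: [database_name, collection_name, row_kind, cid, name, address, phone_number]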
List<String> snapshotForSingleTable =
Stream.of(
"+I[%s, %s, +I, 101, user_1, Shanghai, 123567891234]",
"+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
"+I[%s, %s, +I, 103, user_3, Shanghai, 123567891234]",
"+I[%s, %s, +I, 109, user_4, Shanghai, 123567891234]",
"+I[%s, %s, +I, 110, user_5, Shanghai, 123567891234]",
"+I[%s, %s, +I, 111, user_6, Shanghai, 123567891234]",
"+I[%s, %s, +I, 118, user_7, Shanghai, 123567891234]",
"+I[%s, %s, +I, 121, user_8, Shanghai, 123567891234]",
"+I[%s, %s, +I, 123, user_9, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1009, user_10, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1010, user_11, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1011, user_12, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1012, user_13, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1013, user_14, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1014, user_15, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1015, user_16, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1016, user_17, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1017, user_18, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1018, user_19, Shanghai, 123567891234]",
"+I[%s, %s, +I, 1019, user_20, Shanghai, 123567891234]",
"+I[%s, %s, +I, 2000, user_21, Shanghai, 123567891234]")
.map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
.collect(Collectors.toList());
tEnv.executeSql(sourceDDL);
TableResult tableResult =
tEnv.executeSql(
"select database_name, collection_name, row_kind, "
+ "cid, name, address, phone_number from customers");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = new ArrayList<>();
for (int i = 0; i < captureCustomerCollections.length; i++) {
expectedSnapshotData.addAll(snapshotForSingleTable);
}
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
// second step: check the change stream data
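// in full-changelog mode an update yields a -U/+U pair, and the row_kind metadata column mirrors each record's changelog flag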
for (String collectionName : captureCustomerCollections) {
makeFirstPartChangeStreamEvents(
mongodbClient.getDatabase(customerDatabase), collectionName);
}
for (String collectionName : captureCustomerCollections) {
makeSecondPartChangeStreamEvents(
mongodbClient.getDatabase(customerDatabase), collectionName);
}
List<String> changeEventsForSingleTable =
Stream.of(
"-U[%s, %s, -U, 101, user_1, Shanghai, 123567891234]",
"+U[%s, %s, +U, 101, user_1, Hangzhou, 123567891234]",
"-D[%s, %s, -D, 102, user_2, Shanghai, 123567891234]",
"+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
"-U[%s, %s, -U, 103, user_3, Shanghai, 123567891234]",
"+U[%s, %s, +U, 103, user_3, Hangzhou, 123567891234]",
"-U[%s, %s, -U, 1010, user_11, Shanghai, 123567891234]",
"+U[%s, %s, +U, 1010, user_11, Hangzhou, 123567891234]",
"+I[%s, %s, +I, 2001, user_22, Shanghai, 123567891234]",
"+I[%s, %s, +I, 2002, user_23, Shanghai, 123567891234]",
"+I[%s, %s, +I, 2003, user_24, Shanghai, 123567891234]")
.map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
.collect(Collectors.toList());
List<String> expectedChangeStreamData = new ArrayList<>();
for (int i = 0; i < captureCustomerCollections.length; i++) {
expectedChangeStreamData.addAll(changeEventsForSingleTable);
}
List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
tableResult.getJobClient().get().cancel().get();
}
private void testMongoDBParallelSource(
MongoDBTestUtils.FailoverType failoverType,
MongoDBTestUtils.FailoverPhase failoverPhase,

@ -89,7 +89,8 @@ public class MongoDBTableFactoryTest {
Column.physical("eee", DataTypes.TIMESTAMP(3)),
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
Column.metadata(
"_database_name", DataTypes.STRING(), "database_name", true)),
"_database_name", DataTypes.STRING(), "database_name", true),
Column.metadata("_row_kind", DataTypes.STRING(), "row_kind", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));
@ -220,7 +221,7 @@ public class MongoDBTableFactoryTest {
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource;
mongoDBSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name"),
Arrays.asList("op_ts", "database_name", "row_kind"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = mongoDBSource.copy();
@ -252,7 +253,7 @@ public class MongoDBTableFactoryTest {
SCAN_NEWLY_ADDED_TABLE_ENABLED_DEFAULT);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "row_kind");
assertEquals(expectedSource, actualSource);
