diff --git a/docs/content/connectors/mongodb-cdc.md b/docs/content/connectors/mongodb-cdc.md
index 607e23134..5678a9097 100644
--- a/docs/content/connectors/mongodb-cdc.md
+++ b/docs/content/connectors/mongodb-cdc.md
@@ -250,6 +250,41 @@ Connector Options
 Note: `heartbeat.interval.ms` is highly recommended to set a proper value larger than 0 **if the collection changes slowly**. The heartbeat event can pushing the `resumeToken` forward to avoid `resumeToken` being expired when we recover the Flink job from a checkpoint or savepoint.
 
+Available Metadata
+----------------
+
+The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
+
+<div class="highlight">
+<table class="colwidths-auto docutils">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Key</th>
+      <th class="text-left" style="width: 30%">DataType</th>
+      <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>database_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the database that contains the row.</td>
+    </tr>
+    <tr>
+      <td>collection_name</td>
+      <td>STRING NOT NULL</td>
+      <td>Name of the collection that contains the row.</td>
+    </tr>
+    <tr>
+      <td>op_ts</td>
+      <td>TIMESTAMP_LTZ(3) NOT NULL</td>
+      <td>It indicates the time that the change was made in the database. <br>
+          If the record is read from the snapshot of the collection instead of the change stream, the value is always 0.</td>
+    </tr>
+  </tbody>
+</table>
+</div>
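+
+The extended `CREATE TABLE` example below is a sketch of how these metadata columns can be
+declared next to the physical columns (the column names, hosts, credentials, database and
+collection used here are placeholders; the `METADATA ... VIRTUAL` syntax and metadata keys follow
+the table above and the connector's IT case):
+
+```sql
+CREATE TABLE products (
+    _id STRING,
+    db_name STRING METADATA FROM 'database_name' VIRTUAL,
+    coll_name STRING METADATA FROM 'collection_name' VIRTUAL,
+    op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    name STRING,
+    weight DECIMAL(10,3),
+    PRIMARY KEY(_id) NOT ENFORCED
+) WITH (
+    'connector' = 'mongodb-cdc',
+    'hosts' = 'localhost:27017',
+    'username' = 'flinkuser',
+    'password' = 'flinkpw',
+    'database' = 'inventory',
+    'collection' = 'products'
+);
+```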
+ + Features -------- diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java index a055a5f84..147f691f7 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java @@ -46,8 +46,6 @@ public class MongoDBConnectorSourceTask extends SourceTask { private static final String TRUE = "true"; - private static final String CLUSTER_TIME_FIELD = "clusterTime"; - private final MongoSourceTask target; private final Field isCopyingField; @@ -159,9 +157,11 @@ public class MongoDBConnectorSourceTask extends SourceTask { private void markRecordTimestamp(SourceRecord record) { final Struct value = (Struct) record.value(); final Struct source = new Struct(value.schema().field(Envelope.FieldName.SOURCE).schema()); - long timestamp = System.currentTimeMillis(); - if (value.schema().field(CLUSTER_TIME_FIELD) != null) { - String clusterTime = value.getString(CLUSTER_TIME_FIELD); + // It indicates the time that the change was made in the database. If the record is read + // from snapshot of the table instead of the change stream, the value is always 0. + long timestamp = 0L; + if (value.schema().field(MongoDBEnvelope.CLUSTER_TIME_FIELD) != null) { + String clusterTime = value.getString(MongoDBEnvelope.CLUSTER_TIME_FIELD); if (clusterTime != null) { timestamp = new JsonReader(clusterTime).readTimestamp().getTime() * 1000L; } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java new file mode 100644 index 000000000..0e9aa99e0 --- /dev/null +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mongodb.internal; + +/** + * An immutable descriptor for the structure of {@link + * com.mongodb.client.model.changestream.ChangeStreamDocument} envelopes. 
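+ *
+ * <p>The constants below name the change stream event fields that the connector reads: the
+ * cluster time, the full document, the document key, the operation type, and the namespace
+ * ("ns") together with its database ("db") and collection ("coll") sub-fields.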
+ */ +public class MongoDBEnvelope { + + public static final String CLUSTER_TIME_FIELD = "clusterTime"; + + public static final String FULL_DOCUMENT_FIELD = "fullDocument"; + + public static final String DOCUMENT_KEY_FIELD = "documentKey"; + + public static final String OPERATION_TYPE_FIELD = "operationType"; + + public static final String NAMESPACE_FIELD = "ns"; + + public static final String NAMESPACE_DATABASE_FIELD = "db"; + + public static final String NAMESPACE_COLLECTION_FIELD = "coll"; +} diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java index d129ad565..12bb298ff 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java @@ -35,7 +35,10 @@ import org.apache.flink.util.Collector; import com.mongodb.client.model.changestream.OperationType; import com.mongodb.internal.HexUtils; +import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.table.AppendMetadataCollector; +import com.ververica.cdc.debezium.table.MetadataConverter; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -76,12 +79,6 @@ public class MongoDBConnectorDeserializationSchema private static final long serialVersionUID = 1750787080613035184L; - private static final String FULL_DOCUMENT_FIELD = "fullDocument"; - - private static final String DOCUMENT_KEY_FIELD = "documentKey"; - - private static final String OPERATION_TYPE_FIELD = "operationType"; - /** TypeInformation of the produced {@link RowData}. */ private final TypeInformation resultTypeInfo; @@ -90,14 +87,27 @@ public class MongoDBConnectorDeserializationSchema /** * Runtime converter that converts {@link - * com.mongodb.client.model.changestream.ChangeStreamDocument}s into objects of Flink SQL - * internal data structures. + * com.mongodb.client.model.changestream.ChangeStreamDocument}s into {@link RowData} consisted + * of physical column values. + */ + protected final DeserializationRuntimeConverter physicalConverter; + + /** Whether the deserializer needs to handle metadata columns. */ + protected final boolean hasMetadata; + + /** + * A wrapped output collector which is used to append metadata columns after physical columns. 
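+     * For each record, {@code emit} points this collector at the current {@link SourceRecord}
+     * and the downstream output before collecting the physical row.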
*/ - private final DeserializationRuntimeConverter runtimeConverter; + private final AppendMetadataCollector appendMetadataCollector; public MongoDBConnectorDeserializationSchema( - RowType rowType, TypeInformation resultTypeInfo, ZoneId localTimeZone) { - this.runtimeConverter = createConverter(rowType); + RowType physicalDataType, + MetadataConverter[] metadataConverters, + TypeInformation resultTypeInfo, + ZoneId localTimeZone) { + this.hasMetadata = checkNotNull(metadataConverters).length > 0; + this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters); + this.physicalConverter = createConverter(physicalDataType); this.resultTypeInfo = resultTypeInfo; this.localTimeZone = localTimeZone; } @@ -109,19 +119,22 @@ public class MongoDBConnectorDeserializationSchema OperationType op = operationTypeFor(record); BsonDocument documentKey = - checkNotNull(extractBsonDocument(value, valueSchema, DOCUMENT_KEY_FIELD)); - BsonDocument fullDocument = extractBsonDocument(value, valueSchema, FULL_DOCUMENT_FIELD); + checkNotNull( + extractBsonDocument( + value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD)); + BsonDocument fullDocument = + extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD); switch (op) { case INSERT: GenericRowData insert = extractRowData(fullDocument); insert.setRowKind(RowKind.INSERT); - out.collect(insert); + emit(record, insert, out); break; case DELETE: GenericRowData delete = extractRowData(documentKey); delete.setRowKind(RowKind.DELETE); - out.collect(delete); + emit(record, delete, out); break; case UPDATE: // It’s null if another operation deletes the document @@ -131,12 +144,12 @@ public class MongoDBConnectorDeserializationSchema } GenericRowData updateAfter = extractRowData(fullDocument); updateAfter.setRowKind(RowKind.UPDATE_AFTER); - out.collect(updateAfter); + emit(record, updateAfter, out); break; case REPLACE: GenericRowData replaceAfter = extractRowData(fullDocument); replaceAfter.setRowKind(RowKind.UPDATE_AFTER); - out.collect(replaceAfter); + emit(record, replaceAfter, out); break; case INVALIDATE: case DROP: @@ -150,7 +163,7 @@ public class MongoDBConnectorDeserializationSchema private GenericRowData extractRowData(BsonDocument document) throws Exception { checkNotNull(document); - return (GenericRowData) runtimeConverter.convert(document); + return (GenericRowData) physicalConverter.convert(document); } private BsonDocument extractBsonDocument(Struct value, Schema valueSchema, String fieldName) { @@ -170,7 +183,18 @@ public class MongoDBConnectorDeserializationSchema private OperationType operationTypeFor(SourceRecord record) { Struct value = (Struct) record.value(); - return OperationType.fromString(value.getString(OPERATION_TYPE_FIELD)); + return OperationType.fromString(value.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD)); + } + + private void emit(SourceRecord inRecord, RowData physicalRow, Collector collector) { + if (!hasMetadata) { + collector.collect(physicalRow); + return; + } + + appendMetadataCollector.inputRecord = inRecord; + appendMetadataCollector.outputCollector = collector; + appendMetadataCollector.collect(physicalRow); } // ------------------------------------------------------------------------------------- diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java new file mode 100644 index 000000000..a173b600c 
--- /dev/null +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mongodb.table; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; + +import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope; +import com.ververica.cdc.debezium.table.MetadataConverter; +import io.debezium.connector.AbstractSourceInfo; +import io.debezium.data.Envelope; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +/** Defines the supported metadata columns for {@link MongoDBTableSource}. */ +public enum MongoDBReadableMetadata { + + /** Name of the collection that contain the row. */ + COLLECTION( + "collection_name", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + Struct to = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD); + return StringData.fromString( + to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD)); + } + }), + + /** Name of the database that contain the row. */ + DATABASE( + "database_name", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + Struct to = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD); + return StringData.fromString( + to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD)); + } + }), + + /** + * It indicates the time that the change was made in the database. If the record is read from + * snapshot of the table instead of the change stream, the value is always 0. 
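+     * The value is derived from the {@code clusterTime} of the change stream event; see
+     * {@code MongoDBConnectorSourceTask#markRecordTimestamp}.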
+ */ + OP_TS( + "op_ts", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + return TimestampData.fromEpochMillis( + (Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY)); + } + }); + + private final String key; + + private final DataType dataType; + + private final MetadataConverter converter; + + MongoDBReadableMetadata(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + + public String getKey() { + return key; + } + + public DataType getDataType() { + return dataType; + } + + public MetadataConverter getConverter() { + return converter; + } +} diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java index 76c6deba0..930b9f99d 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java @@ -24,19 +24,27 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; import com.ververica.cdc.connectors.mongodb.MongoDBSource; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.table.MetadataConverter; import javax.annotation.Nullable; import java.time.ZoneId; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -44,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * A {@link DynamicTableSource} that describes how to create a MongoDB change stream events source * from a logical description. */ -public class MongoDBTableSource implements ScanTableSource { +public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetadata { private final TableSchema physicalSchema; private final String hosts; @@ -64,6 +72,16 @@ public class MongoDBTableSource implements ScanTableSource { private final Integer heartbeatIntervalMillis; private final ZoneId localTimeZone; + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + /** Data type that describes the final output of the source. */ + protected DataType producedDataType; + + /** Metadata that is appended at the end of a physical source row. 
*/ + protected List metadataKeys; + public MongoDBTableSource( TableSchema physicalSchema, String hosts, @@ -99,6 +117,8 @@ public class MongoDBTableSource implements ScanTableSource { this.pollAwaitTimeMillis = pollAwaitTimeMillis; this.heartbeatIntervalMillis = heartbeatIntervalMillis; this.localTimeZone = localTimeZone; + this.producedDataType = physicalSchema.toPhysicalRowDataType(); + this.metadataKeys = Collections.emptyList(); } @Override @@ -112,12 +132,15 @@ public class MongoDBTableSource implements ScanTableSource { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { - RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); + RowType physicalDataType = + (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); + MetadataConverter[] metadataConverters = getMetadataConverters(); TypeInformation typeInfo = scanContext.createTypeInformation(physicalSchema.toRowDataType()); DebeziumDeserializationSchema deserializer = - new MongoDBConnectorDeserializationSchema(rowType, typeInfo, localTimeZone); + new MongoDBConnectorDeserializationSchema( + physicalDataType, metadataConverters, typeInfo, localTimeZone); MongoDBSource.Builder builder = MongoDBSource.builder() @@ -144,26 +167,61 @@ public class MongoDBTableSource implements ScanTableSource { return SourceFunctionProvider.of(sourceFunction, false); } + protected MetadataConverter[] getMetadataConverters() { + if (metadataKeys.isEmpty()) { + return new MetadataConverter[0]; + } + + return metadataKeys.stream() + .map( + key -> + Stream.of(MongoDBReadableMetadata.values()) + .filter(m -> m.getKey().equals(key)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .map(MongoDBReadableMetadata::getConverter) + .toArray(MetadataConverter[]::new); + } + + @Override + public Map listReadableMetadata() { + return Stream.of(MongoDBReadableMetadata.values()) + .collect( + Collectors.toMap( + MongoDBReadableMetadata::getKey, + MongoDBReadableMetadata::getDataType)); + } + + @Override + public void applyReadableMetadata(List metadataKeys, DataType producedDataType) { + this.metadataKeys = metadataKeys; + this.producedDataType = producedDataType; + } + @Override public DynamicTableSource copy() { - return new MongoDBTableSource( - physicalSchema, - hosts, - username, - password, - database, - collection, - connectionOptions, - errorsTolerance, - errorsLogEnable, - copyExisting, - copyExistingPipeline, - copyExistingMaxThreads, - copyExistingQueueSize, - pollMaxBatchSize, - pollAwaitTimeMillis, - heartbeatIntervalMillis, - localTimeZone); + MongoDBTableSource source = + new MongoDBTableSource( + physicalSchema, + hosts, + username, + password, + database, + collection, + connectionOptions, + errorsTolerance, + errorsLogEnable, + copyExisting, + copyExistingPipeline, + copyExistingMaxThreads, + copyExistingQueueSize, + pollMaxBatchSize, + pollAwaitTimeMillis, + heartbeatIntervalMillis, + localTimeZone); + source.metadataKeys = metadataKeys; + source.producedDataType = producedDataType; + return source; } @Override @@ -191,7 +249,9 @@ public class MongoDBTableSource implements ScanTableSource { && Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize) && Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis) && Objects.equals(heartbeatIntervalMillis, that.heartbeatIntervalMillis) - && Objects.equals(localTimeZone, that.localTimeZone); + && Objects.equals(localTimeZone, that.localTimeZone) + && Objects.equals(producedDataType, that.producedDataType) + && 
Objects.equals(metadataKeys, that.metadataKeys); } @Override @@ -213,7 +273,9 @@ public class MongoDBTableSource implements ScanTableSource { pollMaxBatchSize, pollAwaitTimeMillis, heartbeatIntervalMillis, - localTimeZone); + localTimeZone, + producedDataType, + metadataKeys); } @Override diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java index a4b86199b..36f202e36 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java @@ -39,8 +39,11 @@ import org.junit.Test; import java.time.ZoneId; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; @@ -352,6 +355,119 @@ public class MongoDBConnectorITCase extends MongoDBTestBase { result.getJobClient().get().cancel().get(); } + @Test + public void testMetadataColumns() throws Exception { + String database = executeCommandFileInSeparateDatabase("inventory"); + + String sourceDDL = + String.format( + "CREATE TABLE mongodb_source (" + + " _id STRING NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " db_name STRING METADATA FROM 'database_name' VIRTUAL," + + " collection_name STRING METADATA VIRTUAL," + + " PRIMARY KEY (_id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'mongodb-cdc'," + + " 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000'," + + " 'hosts' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database' = '%s'," + + " 'collection' = '%s'" + + ")", + MONGODB_CONTAINER.getHostAndPort(), + FLINK_USER, + FLINK_USER_PASSWORD, + database, + "products"); + + String sinkDDL = + "CREATE TABLE meta_sink (" + + " _id STRING NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " database_name STRING," + + " collection_name STRING," + + " PRIMARY KEY (_id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = tEnv.executeSql("INSERT INTO meta_sink SELECT * FROM mongodb_source"); + + // wait for snapshot finished and begin binlog + waitForSinkSize("meta_sink", 9); + + MongoCollection products = getMongoDatabase(database).getCollection("products"); + + products.updateOne( + Filters.eq("_id", new ObjectId("100000000000000000000106")), + Updates.set("description", "18oz carpenter hammer")); + + products.updateOne( + Filters.eq("_id", new ObjectId("100000000000000000000107")), + Updates.set("weight", 5.1)); + + products.insertOne( + productDocOf( + "100000000000000000000110", + "jacket", + "water resistent white wind breaker", + 0.2)); + + products.insertOne( + productDocOf("100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18)); + + products.updateOne( + Filters.eq("_id", new ObjectId("100000000000000000000110")), + Updates.combine( + Updates.set("description", "new water resistent white wind breaker"), + Updates.set("weight", 0.5))); + + products.updateOne( + 
Filters.eq("_id", new ObjectId("100000000000000000000111")), + Updates.set("weight", 5.17)); + + products.deleteOne(Filters.eq("_id", new ObjectId("100000000000000000000111"))); + + waitForSinkSize("meta_sink", 16); + + List expected = + Stream.of( + "+I(100000000000000000000101,scooter,Small 2-wheel scooter,3.140,%s,products)", + "+I(100000000000000000000102,car battery,12V car battery,8.100,%s,products)", + "+I(100000000000000000000103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800,%s,products)", + "+I(100000000000000000000104,hammer,12oz carpenter''s hammer,0.750,%s,products)", + "+I(100000000000000000000105,hammer,12oz carpenter''s hammer,0.875,%s,products)", + "+I(100000000000000000000106,hammer,12oz carpenter''s hammer,1.000,%s,products)", + "+I(100000000000000000000107,rocks,box of assorted rocks,5.300,%s,products)", + "+I(100000000000000000000108,jacket,water resistent black wind breaker,0.100,%s,products)", + "+I(100000000000000000000109,spare tire,24 inch spare tire,22.200,%s,products)", + "+I(100000000000000000000110,jacket,water resistent white wind breaker,0.200,%s,products)", + "+I(100000000000000000000111,scooter,Big 2-wheel scooter,5.180,%s,products)", + "+U(100000000000000000000106,hammer,18oz carpenter hammer,1.000,%s,products)", + "+U(100000000000000000000107,rocks,box of assorted rocks,5.100,%s,products)", + "+U(100000000000000000000110,jacket,new water resistent white wind breaker,0.500,%s,products)", + "+U(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)", + "-D(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)") + .map(s -> String.format(s, database)) + .sorted() + .collect(Collectors.toList()); + + List actual = TestValuesTableFactory.getRawResults("meta_sink"); + Collections.sort(actual); + assertEquals(expected, actual); + result.getJobClient().get().cancel().get(); + } + private static void waitForSnapshotStarted(String sinkName) throws InterruptedException { while (sinkSize(sinkName) == 0) { Thread.sleep(100); diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java index 33b75828f..2365cd2e7 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java @@ -36,6 +36,7 @@ import org.junit.Test; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -57,9 +58,23 @@ public class MongoDBTableFactoryTest { Column.physical("ccc", DataTypes.DOUBLE()), Column.physical("ddd", DataTypes.DECIMAL(31, 18)), Column.physical("eee", DataTypes.TIMESTAMP(3))), - new ArrayList<>(), + Collections.emptyList(), UniqueConstraint.primaryKey("pk", Arrays.asList("_id"))); + private static final ResolvedSchema SCHEMA_WITH_METADATA = + new ResolvedSchema( + Arrays.asList( + Column.physical("_id", DataTypes.STRING().notNull()), + Column.physical("bbb", DataTypes.STRING().notNull()), + Column.physical("ccc", DataTypes.DOUBLE()), + Column.physical("ddd", DataTypes.DECIMAL(31, 18)), + Column.physical("eee", DataTypes.TIMESTAMP(3)), + Column.metadata("time", DataTypes.TIMESTAMP(3), "op_ts", true), + Column.metadata( + "_database_name", 
DataTypes.STRING(), "database_name", true)), + Collections.emptyList(), + UniqueConstraint.primaryKey("pk", Collections.singletonList("_id"))); + private static final String MY_HOSTS = "localhost:27017,localhost:27018"; private static final String USER = "flinkuser"; private static final String PASSWORD = "flinkpw"; @@ -75,7 +90,7 @@ public class MongoDBTableFactoryTest { Map properties = getAllOptions(); // validation for source - DynamicTableSource actualSource = createTableSource(properties); + DynamicTableSource actualSource = createTableSource(SCHEMA, properties); MongoDBTableSource expectedSource = new MongoDBTableSource( TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)), @@ -111,7 +126,7 @@ public class MongoDBTableFactoryTest { options.put("poll.max.batch.size", "102"); options.put("poll.await.time.ms", "103"); options.put("heartbeat.interval.ms", "104"); - DynamicTableSource actualSource = createTableSource(options); + DynamicTableSource actualSource = createTableSource(SCHEMA, options); MongoDBTableSource expectedSource = new MongoDBTableSource( @@ -135,6 +150,44 @@ public class MongoDBTableFactoryTest { assertEquals(expectedSource, actualSource); } + @Test + public void testMetadataColumns() { + Map properties = getAllOptions(); + + // validation for source + DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties); + MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource; + mongoDBSource.applyReadableMetadata( + Arrays.asList("op_ts", "database_name"), + SCHEMA_WITH_METADATA.toSourceRowDataType()); + actualSource = mongoDBSource.copy(); + + MongoDBTableSource expectedSource = + new MongoDBTableSource( + TableSchemaUtils.getPhysicalSchema(fromResolvedSchema(SCHEMA)), + MY_HOSTS, + USER, + PASSWORD, + MY_DATABASE, + MY_TABLE, + null, + ERROR_TOLERANCE, + ERROR_LOGS_ENABLE, + COPY_EXISTING, + null, + null, + null, + POLL_MAX_BATCH_SIZE_DEFAULT, + POLL_AWAIT_TIME_MILLIS_DEFAULT, + null, + LOCAL_TIME_ZONE); + + expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); + expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); + + assertEquals(expectedSource, actualSource); + } + @Test public void testValidation() { // validate unsupported option @@ -142,7 +195,7 @@ public class MongoDBTableFactoryTest { Map properties = getAllOptions(); properties.put("unknown", "abc"); - createTableSource(properties); + createTableSource(SCHEMA, properties); fail("exception expected"); } catch (Throwable t) { assertTrue( @@ -162,17 +215,18 @@ public class MongoDBTableFactoryTest { return options; } - private static DynamicTableSource createTableSource(Map options) { + private static DynamicTableSource createTableSource( + ResolvedSchema schema, Map options) { return FactoryUtil.createTableSource( null, ObjectIdentifier.of("default", "default", "t1"), new ResolvedCatalogTable( CatalogTable.of( - fromResolvedSchema(SCHEMA).toSchema(), + fromResolvedSchema(schema).toSchema(), "mock source", new ArrayList<>(), options), - SCHEMA), + schema), new Configuration(), MongoDBTableFactoryTest.class.getClassLoader(), false);