diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java index 48b0f24f7..3d6c5e327 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java @@ -22,6 +22,7 @@ import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.kafka.connect.source.MongoSourceConfig; import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance; import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat; +import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceConnector; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.DebeziumSourceFunction; @@ -35,6 +36,10 @@ import java.util.Properties; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.STARTUP_MODE_INITIAL_SNAPSHOTTING_MAX_THREADS_CONFIG; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.STARTUP_MODE_INITIAL_SNAPSHOTTING_PIPELINE_CONFIG; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.STARTUP_MODE_INITIAL_SNAPSHOTTING_QUEUE_SIZE_CONFIG; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.STARTUP_MODE_TIMESTAMP_START_AT_OPERATION_TIME_CONFIG; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME; import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME; @@ -55,6 +60,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; public class MongoDBSource { public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue(); + public static final String FULL_DOCUMENT_REQUIRED = FullDocument.REQUIRED.getValue(); public static final String OUTPUT_FORMAT_SCHEMA = OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT); @@ -76,10 +82,12 @@ public class MongoDBSource { private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue(); private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue(); private Boolean updateLookup = true; + private Boolean fullDocumentBeforeChange = false; private Boolean copyExisting = true; - private Integer copyExistingMaxThreads; - private Integer copyExistingQueueSize; - private String copyExistingPipeline; + private StartupOptions startupOptions; + private Integer initialSnapshottingMaxThreads; + private Integer initialSnapshottingQueueSize; + private String initialSnapshottingPipeline; private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue(); private DebeziumDeserializationSchema deserializer; @@ -190,27 +198,68 @@ public class MongoDBSource { return this; } + /** + * change.stream.full.document.before.change + * + *
<p>
Configures the document pre-image your change stream returns on update operations. The + * pre-image is not available for source records published while copying existing data, and + * the pre-image configuration has no effect on copying. + */ + public Builder scanFullChangelog(boolean fullDocumentBeforeChange) { + this.fullDocumentBeforeChange = fullDocumentBeforeChange; + return this; + } + /** * copy.existing * *
<p>
Copy existing data from source collections and convert them to Change Stream events on * their respective topics. Any changes to the data that occur during the copy process are * applied once the copy is completed. + * + * @deprecated please use startupOptions instead. */ + @Deprecated public Builder copyExisting(boolean copyExisting) { this.copyExisting = copyExisting; return this; } + /** + * scan.startup.mode + * + *
<p>Optional startup mode for the MongoDB CDC consumer; valid enumerations are initial, + * latest-offset, and timestamp. Default: initial + */ + public Builder startupOptions(StartupOptions startupOptions) { + this.startupOptions = startupOptions; + return this; + } + /** * copy.existing.max.threads * *
<p>The number of threads to use when performing the data copy. Default: the number of * processors. + * + * @deprecated use initialSnapshottingMaxThreads instead. */ + @Deprecated public Builder copyExistingMaxThreads(int copyExistingMaxThreads) { checkArgument(copyExistingMaxThreads > 0); - this.copyExistingMaxThreads = copyExistingMaxThreads; + this.initialSnapshottingMaxThreads = copyExistingMaxThreads; + return this; + } + + /** + * initial.snapshotting.max.threads + * + *
<p>The number of threads to use when performing the data copy. Default: the number of + * processors. + */ + public Builder initialSnapshottingMaxThreads(int initialSnapshottingMaxThreads) { + checkArgument(initialSnapshottingMaxThreads > 0); + this.initialSnapshottingMaxThreads = initialSnapshottingMaxThreads; return this; } @@ -218,10 +267,24 @@ public class MongoDBSource { * copy.existing.queue.size * *
<p>
The max size of the queue to use when copying data. Default: 10240 + * + * @deprecated use initialSnapshottingQueueSize instead. */ + @Deprecated public Builder copyExistingQueueSize(int copyExistingQueueSize) { checkArgument(copyExistingQueueSize > 0); - this.copyExistingQueueSize = copyExistingQueueSize; + this.initialSnapshottingQueueSize = copyExistingQueueSize; + return this; + } + + /** + * initial.snapshotting.queue.size + * + *
<p>
The max size of the queue to use when copying data. Default: 10240 + */ + public Builder initialSnapshottingQueueSize(int initialSnapshottingQueueSize) { + checkArgument(initialSnapshottingQueueSize > 0); + this.initialSnapshottingQueueSize = initialSnapshottingQueueSize; return this; } @@ -231,9 +294,24 @@ public class MongoDBSource { *
<p>An array of JSON objects describing the pipeline operations to run when copying * existing data. This can improve the use of indexes by the copying manager and make * copying more efficient. + * + * @deprecated use initialSnapshottingPipeline instead. */ + @Deprecated public Builder copyExistingPipeline(String copyExistingPipeline) { - this.copyExistingPipeline = copyExistingPipeline; + this.initialSnapshottingPipeline = copyExistingPipeline; + return this; + } + + /** + * initial.snapshotting.pipeline, e.g. [ { "$match": { "closed": "false" } } ] + * + *
<p>
An array of JSON objects describing the pipeline operations to run when copying + * existing data. This can improve the use of indexes by the copying manager and make + * copying more efficient. + */ + public Builder initialSnapshottingPipeline(String initialSnapshottingPipeline) { + this.initialSnapshottingPipeline = initialSnapshottingPipeline; return this; } @@ -283,7 +361,12 @@ public class MongoDBSource { props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList)); } - if (updateLookup) { + if (fullDocumentBeforeChange) { + props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_REQUIRED); + props.setProperty( + MongoSourceConfig.FULL_DOCUMENT_BEFORE_CHANGE_CONFIG, + FULL_DOCUMENT_REQUIRED); + } else if (updateLookup) { props.setProperty( MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP); } @@ -315,26 +398,49 @@ public class MongoDBSource { String.valueOf(pollMaxBatchSize)); } - if (copyExisting != null) { + if (startupOptions != null) { + switch (startupOptions.startupMode) { + case INITIAL: + props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "copy_existing"); + break; + case LATEST_OFFSET: + props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "latest"); + break; + case TIMESTAMP: + props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "timestamp"); + + // mongodb-kafka requires an integer number of seconds since the Epoch + props.setProperty( + STARTUP_MODE_TIMESTAMP_START_AT_OPERATION_TIME_CONFIG, + String.valueOf(startupOptions.startupTimestampMillis / 1000)); + break; + } + } else if (copyExisting != null) { props.setProperty( - MongoSourceConfig.COPY_EXISTING_CONFIG, String.valueOf(copyExisting)); + MongoSourceConfig.STARTUP_MODE_CONFIG, + copyExisting ? "copy_existing" : "latest"); + } else { + // explicitly fallback to initial mode + // since mongodb-kafka's default option is latest + props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "copy_existing"); } - if (copyExistingMaxThreads != null) { + if (initialSnapshottingMaxThreads != null) { props.setProperty( - MongoSourceConfig.COPY_EXISTING_MAX_THREADS_CONFIG, - String.valueOf(copyExistingMaxThreads)); + STARTUP_MODE_INITIAL_SNAPSHOTTING_MAX_THREADS_CONFIG, + String.valueOf(initialSnapshottingMaxThreads)); } - if (copyExistingQueueSize != null) { + if (initialSnapshottingQueueSize != null) { props.setProperty( - MongoSourceConfig.COPY_EXISTING_QUEUE_SIZE_CONFIG, - String.valueOf(copyExistingQueueSize)); + STARTUP_MODE_INITIAL_SNAPSHOTTING_QUEUE_SIZE_CONFIG, + String.valueOf(initialSnapshottingQueueSize)); } - if (copyExistingPipeline != null) { + if (initialSnapshottingPipeline != null) { props.setProperty( - MongoSourceConfig.COPY_EXISTING_PIPELINE_CONFIG, copyExistingPipeline); + STARTUP_MODE_INITIAL_SNAPSHOTTING_PIPELINE_CONFIG, + initialSnapshottingPipeline); } if (heartbeatIntervalMillis != null) { diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java index 2928ce74c..2357f2423 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java @@ -40,7 +40,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; 
-import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -71,9 +70,19 @@ public class MongoDBConnectorSourceTask extends SourceTask { public static final String COLLECTION_INCLUDE_LIST = "collection.include.list"; + public static final String STARTUP_MODE_INITIAL_SNAPSHOTTING_MAX_THREADS_CONFIG = + "startup.mode.copy.existing.max.threads"; + public static final String STARTUP_MODE_INITIAL_SNAPSHOTTING_QUEUE_SIZE_CONFIG = + "startup.mode.copy.existing.queue.size"; + public static final String STARTUP_MODE_INITIAL_SNAPSHOTTING_PIPELINE_CONFIG = + "startup.mode.copy.existing.pipeline"; + + public static final String STARTUP_MODE_TIMESTAMP_START_AT_OPERATION_TIME_CONFIG = + "startup.mode.timestamp.start.at.operation.time"; + private final MongoSourceTask target; - private final Field isCopyingField; + private final Field startedTaskField; private SourceRecord currentLastSnapshotRecord; @@ -81,7 +90,8 @@ public class MongoDBConnectorSourceTask extends SourceTask { public MongoDBConnectorSourceTask() throws NoSuchFieldException { this.target = new MongoSourceTask(); - this.isCopyingField = MongoSourceTask.class.getDeclaredField("isCopying"); + this.startedTaskField = MongoSourceTask.class.getDeclaredField("startedTask"); + startedTaskField.setAccessible(true); } @Override @@ -236,10 +246,12 @@ public class MongoDBConnectorSourceTask extends SourceTask { } private boolean isCopying() { - isCopyingField.setAccessible(true); try { - return ((AtomicBoolean) isCopyingField.get(target)).get(); - } catch (IllegalAccessException e) { + Object startedTask = startedTaskField.get(target); + Field isCopyingField = startedTask.getClass().getDeclaredField("isCopying"); + isCopyingField.setAccessible(true); + return (boolean) isCopyingField.get(startedTask); + } catch (IllegalAccessException | NoSuchFieldException e) { throw new IllegalStateException("Cannot access isCopying field of SourceTask", e); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java index 920b5e269..edcdd2f28 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java @@ -61,6 +61,8 @@ public class MongoDBEnvelope { public static final String FULL_DOCUMENT_FIELD = "fullDocument"; + public static final String FULL_DOCUMENT_BEFORE_CHANGE_FIELD = "fullDocumentBeforeChange"; + public static final String DOCUMENT_KEY_FIELD = "documentKey"; public static final String OPERATION_TYPE_FIELD = "operationType"; @@ -94,6 +96,7 @@ public class MongoDBEnvelope { + " { \"name\": \"_id\", \"type\": \"string\" }," + " { \"name\": \"operationType\", \"type\": [\"string\", \"null\"] }," + " { \"name\": \"fullDocument\", \"type\": [\"string\", \"null\"] }," + + " { \"name\": \"fullDocumentBeforeChange\", \"type\": [\"string\", \"null\"] }," + " { \"name\": \"source\"," + " \"type\": [{\"name\": \"source\", \"type\": \"record\", \"fields\": [" + " {\"name\": \"ts_ms\", \"type\": \"long\"}," diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java index 
a91662caf..e1d2f6ba8 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java @@ -195,6 +195,17 @@ public class MongoDBSourceBuilder { return this; } + /** + * scan.full-changelog + * + *
<p>
Whether to generate full mode row data by looking up full document pre- and post-image + * collections. Requires MongoDB >= 6.0. + */ + public MongoDBSourceBuilder scanFullChangelog(boolean enableFullDocPrePostImage) { + this.configFactory.scanFullChangelog(enableFullDocPrePostImage); + return this; + } + /** * The deserializer used to convert from consumed {@link * org.apache.kafka.connect.source.SourceRecord}. diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java index ec2351be8..11066ff27 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java @@ -49,6 +49,7 @@ public class MongoDBSourceConfig implements SourceConfig { private final int splitMetaGroupSize; private final int splitSizeMB; private final boolean closeIdleReaders; + private final boolean enableFullDocPrePostImage; MongoDBSourceConfig( String scheme, @@ -66,7 +67,8 @@ public class MongoDBSourceConfig implements SourceConfig { int heartbeatIntervalMillis, int splitMetaGroupSize, int splitSizeMB, - boolean closeIdleReaders) { + boolean closeIdleReaders, + boolean enableFullDocPrePostImage) { this.scheme = checkNotNull(scheme); this.hosts = checkNotNull(hosts); this.username = username; @@ -84,6 +86,7 @@ public class MongoDBSourceConfig implements SourceConfig { this.splitMetaGroupSize = splitMetaGroupSize; this.splitSizeMB = splitSizeMB; this.closeIdleReaders = closeIdleReaders; + this.enableFullDocPrePostImage = enableFullDocPrePostImage; } public String getScheme() { @@ -163,6 +166,10 @@ public class MongoDBSourceConfig implements SourceConfig { return closeIdleReaders; } + public boolean isFullDocPrePostImageEnabled() { + return enableFullDocPrePostImage; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java index 366b51613..4d870bfb0 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java @@ -59,6 +59,7 @@ public class MongoDBSourceConfigFactory implements Factory private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue(); private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue(); private boolean closeIdleReaders = false; + private boolean enableFullDocPrePostImage = false; /** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */ public MongoDBSourceConfigFactory scheme(String scheme) { @@ -219,6 +220,17 @@ public class MongoDBSourceConfigFactory implements Factory return this; } + /** + * scan.full-changelog + * + *
<p>
Whether to generate full mode row data by looking up full document pre- and post-image + * collections. Requires MongoDB >= 6.0. + */ + public MongoDBSourceConfigFactory scanFullChangelog(boolean enableFullDocPrePostImage) { + this.enableFullDocPrePostImage = enableFullDocPrePostImage; + return this; + } + /** Creates a new {@link MongoDBSourceConfig} for the given subtask {@code subtaskId}. */ @Override public MongoDBSourceConfig create(int subtaskId) { @@ -239,6 +251,7 @@ public class MongoDBSourceConfigFactory implements Factory heartbeatIntervalMillis, splitMetaGroupSize, splitSizeMB, - closeIdleReaders); + closeIdleReaders, + enableFullDocPrePostImage); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java index 8a8d35a51..53bafbf26 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java @@ -78,8 +78,8 @@ public class MongoDBSourceOptions { "The ampersand-separated MongoDB connection options. " + "eg. replicaSet=test&connectTimeoutMS=300000"); - public static final ConfigOption COPY_EXISTING_QUEUE_SIZE = - ConfigOptions.key("copy.existing.queue.size") + public static final ConfigOption INITIAL_SNAPSHOTTING_QUEUE_SIZE = + ConfigOptions.key("initial.snapshotting.queue.size") .intType() .defaultValue(10240) .withDescription( @@ -133,4 +133,12 @@ public class MongoDBSourceOptions { .defaultValue(64) .withDescription( "The chunk size mb of incremental snapshot. Defaults to 64mb."); + + @Experimental + public static final ConfigOption FULL_DOCUMENT_PRE_POST_IMAGE = + ConfigOptions.key("scan.full-changelog") + .booleanType() + .defaultValue(false) + .withDescription( + "Scan full mode changelog. Only available when MongoDB >= 6.0. 
Defaults to false."); } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java index fdf6d139e..d4db25194 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java @@ -334,7 +334,6 @@ public class MongoDBStreamFetchTask implements FetchTask { Instant clusterInstant = Instant.ofEpochSecond(clusterTime.getTime()); source.put(TIMESTAMP_KEY_FIELD, new BsonInt64(clusterInstant.toEpochMilli())); changeStreamDocument.put(SOURCE_FIELD, source); - return changeStreamDocument; } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java index 737c31546..cbe191f4a 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java @@ -23,6 +23,7 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; +import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig; import com.ververica.cdc.connectors.mongodb.source.connection.MongoClientPool; import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor; @@ -131,14 +132,16 @@ public class MongoUtils { descriptor.getDatabaseRegex(), descriptor.getNamespaceRegex(), sourceConfig.getBatchSize(), - sourceConfig.isUpdateLookup()); + sourceConfig.isUpdateLookup(), + sourceConfig.isFullDocPrePostImageEnabled()); } public static ChangeStreamIterable getChangeStreamIterable( MongoClient mongoClient, ChangeStreamDescriptor descriptor, int batchSize, - boolean updateLookup) { + boolean updateLookup, + boolean fullDocPrePostImage) { return getChangeStreamIterable( mongoClient, descriptor.getDatabase(), @@ -146,7 +149,8 @@ public class MongoUtils { descriptor.getDatabaseRegex(), descriptor.getNamespaceRegex(), batchSize, - updateLookup); + updateLookup, + fullDocPrePostImage); } public static ChangeStreamIterable getChangeStreamIterable( @@ -156,7 +160,8 @@ public class MongoUtils { @Nullable Pattern databaseRegex, @Nullable Pattern namespaceRegex, int batchSize, - boolean updateLookup) { + boolean updateLookup, + boolean fullDocPrePostImage) { ChangeStreamIterable changeStream; if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) { MongoCollection coll = @@ -216,9 +221,14 @@ public class MongoUtils { changeStream.batchSize(batchSize); } - if (updateLookup) { + if (fullDocPrePostImage) { + // require both pre-image and post-image + changeStream.fullDocument(FullDocument.REQUIRED); + changeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); + } else if (updateLookup) { changeStream.fullDocument(FullDocument.UPDATE_LOOKUP); } + return changeStream; } @@ -226,7 +236,7 @@ public class MongoUtils { public static 
BsonDocument getLatestResumeToken( MongoClient mongoClient, ChangeStreamDescriptor descriptor) { ChangeStreamIterable changeStreamIterable = - getChangeStreamIterable(mongoClient, descriptor, 1, false); + getChangeStreamIterable(mongoClient, descriptor, 1, false, false); // Nullable when no change record or postResumeToken (new in MongoDB 4.0.7). try (MongoChangeStreamCursor> changeStreamCursor = @@ -339,6 +349,14 @@ public class MongoUtils { .withDocumentClass(documentClass); } + public static String getMongoVersion(MongoDBSourceConfig sourceConfig) { + MongoClient client = MongoClientPool.getInstance().getOrCreateMongoClient(sourceConfig); + return client.getDatabase("config") + .runCommand(new BsonDocument("buildinfo", new BsonString(""))) + .get("version") + .toString(); + } + public static MongoClient clientFor(MongoDBSourceConfig sourceConfig) { return MongoClientPool.getInstance().getOrCreateMongoClient(sourceConfig); } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java index 9d392fb5e..b75afb992 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorDeserializationSchema.java @@ -1,11 +1,11 @@ /* - * Copyright 2008-present MongoDB, Inc. + * Copyright 2023 Ververica Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -166,12 +166,12 @@ public class MongoDBConnectorDeserializationSchema } } - private GenericRowData extractRowData(BsonDocument document) throws Exception { + protected GenericRowData extractRowData(BsonDocument document) throws Exception { checkNotNull(document); return (GenericRowData) physicalConverter.convert(document); } - private BsonDocument extractBsonDocument(Struct value, Schema valueSchema, String fieldName) { + protected BsonDocument extractBsonDocument(Struct value, Schema valueSchema, String fieldName) { if (valueSchema.field(fieldName) != null) { String docString = value.getString(fieldName); if (docString != null) { @@ -186,12 +186,12 @@ public class MongoDBConnectorDeserializationSchema return resultTypeInfo; } - private OperationType operationTypeFor(SourceRecord record) { + protected OperationType operationTypeFor(SourceRecord record) { Struct value = (Struct) record.value(); return OperationType.fromString(value.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD)); } - private void emit(SourceRecord inRecord, RowData physicalRow, Collector collector) { + protected void emit(SourceRecord inRecord, RowData physicalRow, Collector collector) { if (!hasMetadata) { collector.collect(physicalRow); return; diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorFullChangelogDeserializationSchema.java 
b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorFullChangelogDeserializationSchema.java new file mode 100644 index 000000000..eeb4b5c60 --- /dev/null +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBConnectorFullChangelogDeserializationSchema.java @@ -0,0 +1,129 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mongodb.table; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; + +import com.mongodb.client.model.changestream.OperationType; +import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope; +import com.ververica.cdc.debezium.table.MetadataConverter; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.bson.BsonDocument; + +import java.time.ZoneId; + +/** + * Deserialization schema from Mongodb ChangeStreamDocument to Flink Table/SQL internal data that + * produces full changelog mode structure {@link RowData}. 
+ */ +public class MongoDBConnectorFullChangelogDeserializationSchema + extends MongoDBConnectorDeserializationSchema { + + private static final long serialVersionUID = 1750787080613035184L; + + public MongoDBConnectorFullChangelogDeserializationSchema( + RowType physicalDataType, + MetadataConverter[] metadataConverters, + TypeInformation resultTypeInfo, + ZoneId localTimeZone) { + super(physicalDataType, metadataConverters, resultTypeInfo, localTimeZone); + } + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + Struct value = (Struct) record.value(); + Schema valueSchema = record.valueSchema(); + + OperationType op = operationTypeFor(record); + + BsonDocument documentKey = + extractBsonDocument(value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD); + BsonDocument fullDocument = + extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD); + + BsonDocument fullDocumentBeforeChange = + extractBsonDocument( + value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_BEFORE_CHANGE_FIELD); + + switch (op) { + case INSERT: + GenericRowData insert = extractRowData(fullDocument); + insert.setRowKind(RowKind.INSERT); + emit(record, insert, out); + break; + case DELETE: + // there might be FullDocBeforeChange field + // if fullDocumentPrePostImage feature is on + // convert it to Delete row data with full document + if (fullDocumentBeforeChange != null) { + GenericRowData updateBefore = extractRowData(fullDocumentBeforeChange); + updateBefore.setRowKind(RowKind.DELETE); + emit(record, updateBefore, out); + } else { + GenericRowData delete = extractRowData(documentKey); + delete.setRowKind(RowKind.DELETE); + emit(record, delete, out); + } + break; + case UPDATE: + // It’s null if another operation deletes the document + // before the lookup operation happens. Ignored it. 
+ if (fullDocument == null) { + break; + } + // there might be FullDocBeforeChange field + // if fullDocumentPrePostImage feature is on + // convert it to UB row data + if (fullDocumentBeforeChange != null) { + GenericRowData updateBefore = extractRowData(fullDocumentBeforeChange); + updateBefore.setRowKind(RowKind.UPDATE_BEFORE); + emit(record, updateBefore, out); + } + GenericRowData updateAfter = extractRowData(fullDocument); + updateAfter.setRowKind(RowKind.UPDATE_AFTER); + emit(record, updateAfter, out); + break; + case REPLACE: + // there might be FullDocBeforeChange field + // if fullDocumentPrePostImage feature is on + // convert it to UB row data + if (fullDocumentBeforeChange != null) { + GenericRowData updateBefore = extractRowData(fullDocumentBeforeChange); + updateBefore.setRowKind(RowKind.UPDATE_BEFORE); + emit(record, updateBefore, out); + } + GenericRowData replaceAfter = extractRowData(fullDocument); + replaceAfter.setRowKind(RowKind.UPDATE_AFTER); + emit(record, replaceAfter, out); + break; + case INVALIDATE: + case DROP: + case DROP_DATABASE: + case RENAME: + case OTHER: + default: + break; + } + } +} diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java index 3ff691532..6fe8acf11 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java @@ -17,7 +17,6 @@ package com.ververica.cdc.connectors.mongodb.table; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -28,7 +27,6 @@ import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; @@ -72,7 +70,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad private final String database; private final String collection; private final StartupOptions startupOptions; - private final Integer copyExistingQueueSize; + private final Integer initialSnapshottingQueueSize; private final Integer batchSize; private final Integer pollMaxBatchSize; private final Integer pollAwaitTimeMillis; @@ -82,6 +80,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad private final Integer splitMetaGroupSize; private final Integer splitSizeMB; private final boolean closeIdlerReaders; + private final boolean enableFullDocPrePostImage; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -103,7 +102,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad @Nullable String collection, @Nullable String connectionOptions, StartupOptions startupOptions, - @Nullable Integer copyExistingQueueSize, + @Nullable Integer initialSnapshottingQueueSize, @Nullable Integer batchSize, @Nullable 
Integer pollMaxBatchSize, @Nullable Integer pollAwaitTimeMillis, @@ -112,7 +111,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad boolean enableParallelRead, @Nullable Integer splitMetaGroupSize, @Nullable Integer splitSizeMB, - boolean closeIdlerReaders) { + boolean closeIdlerReaders, + boolean enableFullDocPrePostImage) { this.physicalSchema = physicalSchema; this.scheme = checkNotNull(scheme); this.hosts = checkNotNull(hosts); @@ -122,7 +122,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad this.collection = collection; this.connectionOptions = connectionOptions; this.startupOptions = checkNotNull(startupOptions); - this.copyExistingQueueSize = copyExistingQueueSize; + this.initialSnapshottingQueueSize = initialSnapshottingQueueSize; this.batchSize = batchSize; this.pollMaxBatchSize = pollMaxBatchSize; this.pollAwaitTimeMillis = pollAwaitTimeMillis; @@ -134,15 +134,18 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad this.splitMetaGroupSize = splitMetaGroupSize; this.splitSizeMB = splitSizeMB; this.closeIdlerReaders = closeIdlerReaders; + this.enableFullDocPrePostImage = enableFullDocPrePostImage; } @Override public ChangelogMode getChangelogMode() { - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.UPDATE_AFTER) - .addContainedKind(RowKind.DELETE) - .build(); + if (this.enableFullDocPrePostImage) { + // generate full-mode changelog with FullDocPrePostImage + return ChangelogMode.all(); + } else { + // upsert changelog only + return ChangelogMode.upsert(); + } } @Override @@ -153,8 +156,11 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad TypeInformation typeInfo = scanContext.createTypeInformation(producedDataType); DebeziumDeserializationSchema deserializer = - new MongoDBConnectorDeserializationSchema( - physicalDataType, metadataConverters, typeInfo, localTimeZone); + enableFullDocPrePostImage + ? new MongoDBConnectorFullChangelogDeserializationSchema( + physicalDataType, metadataConverters, typeInfo, localTimeZone) + : new MongoDBConnectorDeserializationSchema( + physicalDataType, metadataConverters, typeInfo, localTimeZone); String databaseList = null; String collectionList = null; @@ -183,6 +189,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad .scheme(scheme) .hosts(hosts) .closeIdleReaders(closeIdlerReaders) + .scanFullChangelog(enableFullDocPrePostImage) .startupOptions(startupOptions) .deserializer(deserializer); @@ -198,34 +205,23 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad .ifPresent(builder::heartbeatIntervalMillis); Optional.ofNullable(splitMetaGroupSize).ifPresent(builder::splitMetaGroupSize); Optional.ofNullable(splitSizeMB).ifPresent(builder::splitSizeMB); - return SourceProvider.of(builder.build()); } else { com.ververica.cdc.connectors.mongodb.MongoDBSource.Builder builder = com.ververica.cdc.connectors.mongodb.MongoDBSource.builder() .scheme(scheme) .hosts(hosts) + .scanFullChangelog(enableFullDocPrePostImage) + .startupOptions(startupOptions) .deserializer(deserializer); - switch (startupOptions.startupMode) { - case INITIAL: - builder.copyExisting(true); - break; - case LATEST_OFFSET: - builder.copyExisting(false); - break; - default: - throw new ValidationException( - startupOptions.startupMode - + " is not supported by legacy source. 
To use this feature, 'scan.incremental.snapshot.enabled' needs to be set to true."); - } - Optional.ofNullable(databaseList).ifPresent(builder::databaseList); Optional.ofNullable(collectionList).ifPresent(builder::collectionList); Optional.ofNullable(username).ifPresent(builder::username); Optional.ofNullable(password).ifPresent(builder::password); Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions); - Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize); + Optional.ofNullable(initialSnapshottingQueueSize) + .ifPresent(builder::initialSnapshottingQueueSize); Optional.ofNullable(batchSize).ifPresent(builder::batchSize); Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize); Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis); @@ -280,7 +276,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad collection, connectionOptions, startupOptions, - copyExistingQueueSize, + initialSnapshottingQueueSize, batchSize, pollMaxBatchSize, pollAwaitTimeMillis, @@ -289,7 +285,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad enableParallelRead, splitMetaGroupSize, splitSizeMB, - closeIdlerReaders); + closeIdlerReaders, + enableFullDocPrePostImage); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -313,7 +310,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad && Objects.equals(collection, that.collection) && Objects.equals(connectionOptions, that.connectionOptions) && Objects.equals(startupOptions, that.startupOptions) - && Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize) + && Objects.equals(initialSnapshottingQueueSize, that.initialSnapshottingQueueSize) && Objects.equals(batchSize, that.batchSize) && Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize) && Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis) @@ -339,7 +336,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad collection, connectionOptions, startupOptions, - copyExistingQueueSize, + initialSnapshottingQueueSize, batchSize, pollMaxBatchSize, pollAwaitTimeMillis, diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java index 912869939..4c637bb57 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java @@ -41,10 +41,11 @@ import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_START import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COLLECTION; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.CONNECTION_OPTIONS; -import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING_QUEUE_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.DATABASE; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.FULL_DOCUMENT_PRE_POST_IMAGE; import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HOSTS; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.INITIAL_SNAPSHOTTING_QUEUE_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.PASSWORD; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE; @@ -88,7 +89,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { Integer heartbeatIntervalMillis = config.get(HEARTBEAT_INTERVAL_MILLIS); StartupOptions startupOptions = getStartupOptions(config); - Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null); + Integer initialSnapshottingQueueSize = + config.getOptional(INITIAL_SNAPSHOTTING_QUEUE_SIZE).orElse(null); String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE); ZoneId localTimeZone = @@ -102,6 +104,9 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB); int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE); + boolean enableFullDocumentPrePostImage = + config.getOptional(FULL_DOCUMENT_PRE_POST_IMAGE).orElse(false); + ResolvedSchema physicalSchema = getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present"); @@ -119,7 +124,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { collection, connectionOptions, startupOptions, - copyExistingQueueSize, + initialSnapshottingQueueSize, batchSize, pollMaxBatchSize, pollAwaitTimeMillis, @@ -128,7 +133,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { enableParallelRead, splitMetaGroupSize, splitSizeMB, - enableCloseIdleReaders); + enableCloseIdleReaders, + enableFullDocumentPrePostImage); } private void checkPrimaryKey(UniqueConstraint pk, String message) { @@ -191,7 +197,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { options.add(COLLECTION); options.add(SCAN_STARTUP_MODE); options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); - options.add(COPY_EXISTING_QUEUE_SIZE); + options.add(INITIAL_SNAPSHOTTING_QUEUE_SIZE); options.add(BATCH_SIZE); options.add(POLL_MAX_BATCH_SIZE); options.add(POLL_AWAIT_TIME_MILLIS); @@ -200,6 +206,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory { options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB); options.add(CHUNK_META_GROUP_SIZE); options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + options.add(FULL_DOCUMENT_PRE_POST_IMAGE); return options; } } diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/utils/MongoDBContainer.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/utils/MongoDBContainer.java index 14e7055b6..d5343311b 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/utils/MongoDBContainer.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/utils/MongoDBContainer.java @@ -20,164 +20,124 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import 
org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.containers.wait.strategy.WaitStrategy; -import org.testcontainers.images.builder.ImageFromDockerfile; import java.io.IOException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Random; +import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.junit.Assert.assertNotNull; -/** Mongodb test container. */ -public class MongoDBContainer extends GenericContainer { +/** Container for testing MongoDB >= 5.0.3. */ +public class MongoDBContainer extends org.testcontainers.containers.MongoDBContainer { private static final Logger LOG = LoggerFactory.getLogger(MongoDBContainer.class); - private static final String DOCKER_IMAGE_NAME = "mongo:5.0.2"; + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$"); public static final int MONGODB_PORT = 27017; - public static final String MONGO_SUPER_USER = "superuser"; - - public static final String MONGO_SUPER_PASSWORD = "superpw"; - public static final String FLINK_USER = "flinkuser"; public static final String FLINK_USER_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;"; - private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$"); + public MongoDBContainer(String imageName) { + super(imageName); + } + + @Override + protected void containerIsStarted(InspectContainerResponse containerInfo, boolean reused) { + super.containerIsStarted(containerInfo, reused); - private final ShardingClusterRole clusterRole; + final String setupFilePath = "docker/mongodb/setup.js"; + final URL setupFile = MongoDBContainer.class.getClassLoader().getResource(setupFilePath); - public MongoDBContainer(Network network) { - this(network, ShardingClusterRole.NONE); + assertNotNull("Cannot locate " + setupFilePath, setupFile); + try { + String createUserCommand = + Files.readAllLines(Paths.get(setupFile.toURI())).stream() + .filter(x -> StringUtils.isNotBlank(x) && !x.trim().startsWith("//")) + .map( + x -> { + final Matcher m = COMMENT_PATTERN.matcher(x); + return m.matches() ? 
m.group(1) : x; + }) + .collect(Collectors.joining(" ")); + ExecResult execResult = + execInContainer( + "mongosh", + "--eval", + "use admin", + "--eval", + createUserCommand, + "--eval", + "console.log('Flink test user created.\\n');"); + LOG.info(execResult.getStdout()); + if (execResult.getExitCode() != 0) { + throw new IllegalStateException( + "Execute mongo command failed " + execResult.getStderr()); + } + this.waitingFor(Wait.forLogMessage("Flink test user created.\\s", 1)); + } catch (Exception e) { + throw new RuntimeException(e); + } } - public MongoDBContainer(Network network, ShardingClusterRole clusterRole) { - super( - new ImageFromDockerfile() - .withFileFromClasspath("random.key", "docker/mongodb/random.key") - .withFileFromClasspath("setup.js", "docker/mongodb/setup.js") - .withDockerfileFromBuilder( - builder -> - builder.from(DOCKER_IMAGE_NAME) - .copy( - "setup.js", - "/docker-entrypoint-initdb.d/setup.js") - .copy("random.key", "/data/keyfile/random.key") - .run("chown mongodb /data/keyfile/random.key") - .run("chmod 400 /data/keyfile/random.key") - .env("MONGO_INITDB_ROOT_USERNAME", MONGO_SUPER_USER) - .env( - "MONGO_INITDB_ROOT_PASSWORD", - MONGO_SUPER_PASSWORD) - .env("MONGO_INITDB_DATABASE", "admin") - .build())); - this.clusterRole = clusterRole; + @Override + public MongoDBContainer withSharding() { + return (MongoDBContainer) super.withSharding(); + } - withNetwork(network); - withNetworkAliases(clusterRole.hostname); - withExposedPorts(MONGODB_PORT); - withCommand(ShardingClusterRole.startupCommand(clusterRole)); - waitingFor(clusterRole.waitStrategy); + @Override + public MongoDBContainer withLogConsumer(Consumer consumer) { + return (MongoDBContainer) super.withLogConsumer(consumer); } - public String getConnectionString(String username, String password) { - return String.format( - "mongodb://%s:%s@%s:%d", - username, password, getContainerIpAddress(), getMappedPort(MONGODB_PORT)); + @Override + public MongoDBContainer withNetwork(Network network) { + return (MongoDBContainer) super.withNetwork(network); } - public String getHostAndPort() { - return String.format("%s:%s", getContainerIpAddress(), getMappedPort(MONGODB_PORT)); + @Override + public MongoDBContainer withNetworkAliases(String... 
aliases) { + return (MongoDBContainer) super.withNetworkAliases(aliases); } public void executeCommand(String command) { try { LOG.info("Executing mongo command: {}", command); - ExecResult execResult = - execInContainer( - "mongo", - "-u", - MONGO_SUPER_USER, - "-p", - MONGO_SUPER_PASSWORD, - "--eval", - command); + ExecResult execResult = execInContainer("mongosh", "--eval", command); LOG.info(execResult.getStdout()); if (execResult.getExitCode() != 0) { throw new IllegalStateException( - "Execute mongo command failed " + execResult.getStdout()); + "Execute mongo command failed " + execResult.getStderr()); } } catch (InterruptedException | IOException e) { throw new IllegalStateException("Execute mongo command failed", e); } } - @Override - protected void containerIsStarted(InspectContainerResponse containerInfo) { - LOG.info("Preparing a MongoDB Container with sharding cluster role {}...", clusterRole); - if (clusterRole != ShardingClusterRole.ROUTER) { - initReplicaSet(); - } else { - initShard(); + public String executeCommandInDatabase(String command, String databaseName) { + try { + executeCommand(String.format("db = db.getSiblingDB('%s');\n", databaseName) + command); + return databaseName; + } catch (Exception e) { + throw new RuntimeException(e); } } - protected void initReplicaSet() { - LOG.info("Initializing a single node replica set..."); - executeCommand( - String.format( - "rs.initiate({ _id : '%s', configsvr: %s, members: [{ _id: 0, host: '%s:%d'}]})", - clusterRole.replicaSetName, - clusterRole == ShardingClusterRole.CONFIG, - clusterRole.hostname, - MONGODB_PORT)); - - LOG.info("Waiting for single node replica set initialized..."); - executeCommand( - String.format( - "var attempt = 0; " - + "while" - + "(%s) " - + "{ " - + "if (attempt > %d) {quit(1);} " - + "print('%s ' + attempt); sleep(100); attempt++; " - + " }", - "db.runCommand( { isMaster: 1 } ).ismaster==false", - 60, - "An attempt to await for a single node replica set initialization:")); - } - - protected void initShard() { - LOG.info("Initializing a sharded cluster..."); - // decrease chunk size from default 64mb to 1mb to make splitter test easier. - executeCommand( - "db.getSiblingDB('config').settings.updateOne(\n" - + " { _id: \"chunksize\" },\n" - + " { $set: { _id: \"chunksize\", value: 1 } },\n" - + " { upsert: true }\n" - + ");"); - executeCommand( - String.format( - "sh.addShard('%s/%s:%d')", - ShardingClusterRole.SHARD.replicaSetName, - ShardingClusterRole.SHARD.hostname, - MONGODB_PORT)); - } - - /** Executes a mongo command file. */ - public String executeCommandFile(String fileNameIgnoreSuffix) { - return executeCommandFileInDatabase(fileNameIgnoreSuffix, fileNameIgnoreSuffix); + /** Executes a mongo command in separate database. */ + public String executeCommandInSeparateDatabase(String command, String baseName) { + return executeCommandInDatabase( + command, baseName + "_" + Integer.toUnsignedString(new Random().nextInt(), 36)); } /** Executes a mongo command file in separate database. */ @@ -215,52 +175,12 @@ public class MongoDBContainer extends GenericContainer { } } - /** A MongoDB sharded cluster roles. */ - public enum ShardingClusterRole { - // Config servers store metadata and configuration settings for the cluster. - CONFIG("config0", "rs0-config", Wait.forLogMessage(".*[Ww]aiting for connections.*", 2)), - - // Each shard contains a subset of the sharded data. Each shard can be deployed as a replica - // set. 
- SHARD("shard0", "rs0-shard", Wait.forLogMessage(".*[Ww]aiting for connections.*", 2)), - - // The mongos acts as a query router, providing an interface between client applications and - // the sharded cluster. - ROUTER("router0", null, Wait.forLogMessage(".*[Ww]aiting for connections.*", 1)), - - // None sharded cluster. - NONE("mongo0", "rs0", Wait.forLogMessage(".*Replication has not yet been configured.*", 1)); - - private final String hostname; - private final String replicaSetName; - private final WaitStrategy waitStrategy; - - ShardingClusterRole(String hostname, String replicaSetName, WaitStrategy waitStrategy) { - this.hostname = hostname; - this.replicaSetName = replicaSetName; - this.waitStrategy = waitStrategy; - } + public String getConnectionString() { + return String.format( + "mongodb://%s:%d", getContainerIpAddress(), getMappedPort(MONGODB_PORT)); + } - public static String startupCommand(ShardingClusterRole clusterRole) { - switch (clusterRole) { - case CONFIG: - return String.format( - "mongod --configsvr --port %d --replSet %s --keyFile /data/keyfile/random.key", - MONGODB_PORT, clusterRole.replicaSetName); - case SHARD: - return String.format( - "mongod --shardsvr --port %d --replSet %s --keyFile /data/keyfile/random.key", - MONGODB_PORT, clusterRole.replicaSetName); - case ROUTER: - return String.format( - "mongos --configdb %s/%s:%d --bind_ip_all --keyFile /data/keyfile/random.key", - CONFIG.replicaSetName, CONFIG.hostname, MONGODB_PORT); - case NONE: - default: - return String.format( - "mongod --port %d --replSet %s --keyFile /data/keyfile/random.key", - MONGODB_PORT, NONE.replicaSetName); - } - } + public String getHostAndPort() { + return String.format("%s:%s", getContainerIpAddress(), getMappedPort(MONGODB_PORT)); } }