[mongodb] Supports `scanFullChangelog` feature & deprecates `copyExisting` option

pull/2294/head
yuxiqian 2 years ago committed by Leonard Xu
parent 7f6eaa4b2e
commit cfd72dfb90
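
For context, a minimal sketch of what this change means for users of the legacy MongoDBSource.Builder; host, database and collection values below are placeholders, StartupOptions is com.ververica.cdc.connectors.base.options.StartupOptions, and the old setters remain available but deprecated:

    // Before this commit: the initial snapshot was toggled with copyExisting(...)
    //     builder.copyExisting(true).copyExistingQueueSize(10240);
    // After this commit: use startupOptions(...) plus the initialSnapshotting* setters,
    // and optionally enable the new full-changelog mode (MongoDB >= 6.0 only).
    MongoDBSource.Builder<String> builder =
            com.ververica.cdc.connectors.mongodb.MongoDBSource.<String>builder()
                    .hosts("localhost:27017")                  // placeholder
                    .databaseList("mydb")                      // placeholder
                    .collectionList("mydb.orders")             // placeholder
                    .startupOptions(StartupOptions.initial())  // replaces copyExisting(true)
                    .initialSnapshottingQueueSize(10240)       // replaces copyExistingQueueSize(10240)
                    .scanFullChangelog(true);                  // new in this commit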

@ -22,6 +22,7 @@ import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceConnector;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
@ -35,6 +36,10 @@ import java.util.Properties;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.STARTUP_MODE_INITIAL_SNAPSHOTTING_MAX_THREADS_CONFIG;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.STARTUP_MODE_INITIAL_SNAPSHOTTING_PIPELINE_CONFIG;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.STARTUP_MODE_INITIAL_SNAPSHOTTING_QUEUE_SIZE_CONFIG;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.STARTUP_MODE_TIMESTAMP_START_AT_OPERATION_TIME_CONFIG;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
@ -55,6 +60,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
public class MongoDBSource {
public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue();
public static final String FULL_DOCUMENT_REQUIRED = FullDocument.REQUIRED.getValue();
public static final String OUTPUT_FORMAT_SCHEMA =
OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT);
@ -76,10 +82,12 @@ public class MongoDBSource {
private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue();
private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue();
private Boolean updateLookup = true;
private Boolean fullDocumentBeforeChange = false;
private Boolean copyExisting = true;
private Integer copyExistingMaxThreads;
private Integer copyExistingQueueSize;
private String copyExistingPipeline;
private StartupOptions startupOptions;
private Integer initialSnapshottingMaxThreads;
private Integer initialSnapshottingQueueSize;
private String initialSnapshottingPipeline;
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private DebeziumDeserializationSchema<T> deserializer;
@ -190,27 +198,68 @@ public class MongoDBSource {
return this;
}
/**
* change.stream.full.document.before.change
*
* <p>Configures the document pre-image your change stream returns on update operations. The
* pre-image is not available for source records published while copying existing data, and
* the pre-image configuration has no effect on copying.
*/
public Builder<T> scanFullChangelog(boolean fullDocumentBeforeChange) {
this.fullDocumentBeforeChange = fullDocumentBeforeChange;
return this;
}
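
On the MongoDB side, pre-images are only recorded for collections that have the feature switched on. A hedged sketch of that per-collection switch with the MongoDB Java driver (connection string, database and collection names are placeholders; requires MongoDB >= 6.0):

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import org.bson.Document;

    // Enable change stream pre- and post-images for one collection.
    try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
        client.getDatabase("mydb")
                .runCommand(
                        new Document("collMod", "orders")
                                .append("changeStreamPreAndPostImages",
                                        new Document("enabled", true)));
    }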
/**
* copy.existing
*
* <p>Copy existing data from source collections and convert them to Change Stream events on
* their respective topics. Any changes to the data that occur during the copy process are
* applied once the copy is completed.
*
* @deprecated please use <code>startupOptions</code> instead.
*/
@Deprecated
public Builder<T> copyExisting(boolean copyExisting) {
this.copyExisting = copyExisting;
return this;
}
/**
* scan.startup.mode
*
* <p>Optional startup mode for MongoDB CDC consumer, valid enumerations are initial,
* latest-offset, timestamp. Default: initial
*/
public Builder<T> startupOptions(StartupOptions startupOptions) {
this.startupOptions = startupOptions;
return this;
}
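
For illustration, assuming the usual factory methods on com.ververica.cdc.connectors.base.options.StartupOptions, the three supported modes would be set like this (the timestamp value is a placeholder); the mapping onto mongodb-kafka's startup.mode property appears further down in this diff:

    // initial: snapshot existing data first, then tail the change stream ("copy_existing")
    builder.startupOptions(StartupOptions.initial());

    // latest-offset: skip the snapshot and read only new changes ("latest")
    builder.startupOptions(StartupOptions.latest());

    // timestamp: start from a given epoch millisecond ("timestamp")
    builder.startupOptions(StartupOptions.timestamp(1_684_000_000_000L));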
/**
* copy.existing.max.threads
*
* <p>The number of threads to use when performing the data copy. Defaults to the number of
* processors.
*
* @deprecated use <code>initialSnapshottingMaxThreads</code> instead.
*/
@Deprecated
public Builder<T> copyExistingMaxThreads(int copyExistingMaxThreads) {
checkArgument(copyExistingMaxThreads > 0);
this.copyExistingMaxThreads = copyExistingMaxThreads;
this.initialSnapshottingMaxThreads = copyExistingMaxThreads;
return this;
}
/**
* initial.snapshotting.max.threads
*
* <p>The number of threads to use when performing the data copy. Defaults to the number of
* processors.
*/
public Builder<T> initialSnapshottingMaxThreads(int initialSnapshottingMaxThreads) {
checkArgument(initialSnapshottingMaxThreads > 0);
this.initialSnapshottingMaxThreads = initialSnapshottingMaxThreads;
return this;
}
@ -218,10 +267,24 @@ public class MongoDBSource {
* copy.existing.queue.size
*
* <p>The max size of the queue to use when copying data. Default: 10240
*
* @deprecated use <code>initialSnapshottingQueueSize</code> instead.
*/
@Deprecated
public Builder<T> copyExistingQueueSize(int copyExistingQueueSize) {
checkArgument(copyExistingQueueSize > 0);
this.copyExistingQueueSize = copyExistingQueueSize;
this.initialSnapshottingQueueSize = copyExistingQueueSize;
return this;
}
/**
* initial.snapshotting.queue.size
*
* <p>The max size of the queue to use when copying data. Default: 10240
*/
public Builder<T> initialSnapshottingQueueSize(int initialSnapshottingQueueSize) {
checkArgument(initialSnapshottingQueueSize > 0);
this.initialSnapshottingQueueSize = initialSnapshottingQueueSize;
return this;
}
@ -231,9 +294,24 @@ public class MongoDBSource {
* <p>An array of JSON objects describing the pipeline operations to run when copying
* existing data. This can improve the use of indexes by the copying manager and make
* copying more efficient.
*
* @deprecated use <code>initialSnapshottingPipeline</code> instead.
*/
@Deprecated
public Builder<T> copyExistingPipeline(String copyExistingPipeline) {
this.copyExistingPipeline = copyExistingPipeline;
this.initialSnapshottingPipeline = copyExistingPipeline;
return this;
}
/**
* initial.snapshotting.pipeline, e.g. [ { "$match": { "closed": "false" } } ]
*
* <p>An array of JSON objects describing the pipeline operations to run when copying
* existing data. This can improve the use of indexes by the copying manager and make
* copying more efficient.
*/
public Builder<T> initialSnapshottingPipeline(String initialSnapshottingPipeline) {
this.initialSnapshottingPipeline = initialSnapshottingPipeline;
return this;
}
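
For example, reusing the pipeline from the Javadoc above, a snapshot could be restricted to documents whose closed field is "false":

    builder.initialSnapshottingPipeline("[ { \"$match\": { \"closed\": \"false\" } } ]");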
@ -283,7 +361,12 @@ public class MongoDBSource {
props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList));
}
if (updateLookup) {
if (fullDocumentBeforeChange) {
props.setProperty(MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_REQUIRED);
props.setProperty(
MongoSourceConfig.FULL_DOCUMENT_BEFORE_CHANGE_CONFIG,
FULL_DOCUMENT_REQUIRED);
} else if (updateLookup) {
props.setProperty(
MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP);
}
@ -315,26 +398,49 @@ public class MongoDBSource {
String.valueOf(pollMaxBatchSize));
}
if (copyExisting != null) {
if (startupOptions != null) {
switch (startupOptions.startupMode) {
case INITIAL:
props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "copy_existing");
break;
case LATEST_OFFSET:
props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "latest");
break;
case TIMESTAMP:
props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "timestamp");
// mongodb-kafka requires an integer number of seconds since the Epoch
props.setProperty(
STARTUP_MODE_TIMESTAMP_START_AT_OPERATION_TIME_CONFIG,
String.valueOf(startupOptions.startupTimestampMillis / 1000));
break;
}
} else if (copyExisting != null) {
props.setProperty(
MongoSourceConfig.COPY_EXISTING_CONFIG, String.valueOf(copyExisting));
MongoSourceConfig.STARTUP_MODE_CONFIG,
copyExisting ? "copy_existing" : "latest");
} else {
// explicitly fall back to initial mode,
// since mongodb-kafka's default startup mode is latest
props.setProperty(MongoSourceConfig.STARTUP_MODE_CONFIG, "copy_existing");
}
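
A quick sanity check on the timestamp conversion above, with a hypothetical startup timestamp: mongodb-kafka's startup.mode.timestamp.start.at.operation.time expects whole seconds since the Epoch, so the millisecond value is truncated by integer division:

    long startupTimestampMillis = 1_684_000_000_123L;                            // hypothetical
    String startAtOperationTime = String.valueOf(startupTimestampMillis / 1000); // "1684000000"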
if (copyExistingMaxThreads != null) {
if (initialSnapshottingMaxThreads != null) {
props.setProperty(
MongoSourceConfig.COPY_EXISTING_MAX_THREADS_CONFIG,
String.valueOf(copyExistingMaxThreads));
STARTUP_MODE_INITIAL_SNAPSHOTTING_MAX_THREADS_CONFIG,
String.valueOf(initialSnapshottingMaxThreads));
}
if (copyExistingQueueSize != null) {
if (initialSnapshottingQueueSize != null) {
props.setProperty(
MongoSourceConfig.COPY_EXISTING_QUEUE_SIZE_CONFIG,
String.valueOf(copyExistingQueueSize));
STARTUP_MODE_INITIAL_SNAPSHOTTING_QUEUE_SIZE_CONFIG,
String.valueOf(initialSnapshottingQueueSize));
}
if (copyExistingPipeline != null) {
if (initialSnapshottingPipeline != null) {
props.setProperty(
MongoSourceConfig.COPY_EXISTING_PIPELINE_CONFIG, copyExistingPipeline);
STARTUP_MODE_INITIAL_SNAPSHOTTING_PIPELINE_CONFIG,
initialSnapshottingPipeline);
}
if (heartbeatIntervalMillis != null) {

@ -40,7 +40,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -71,9 +70,19 @@ public class MongoDBConnectorSourceTask extends SourceTask {
public static final String COLLECTION_INCLUDE_LIST = "collection.include.list";
public static final String STARTUP_MODE_INITIAL_SNAPSHOTTING_MAX_THREADS_CONFIG =
"startup.mode.copy.existing.max.threads";
public static final String STARTUP_MODE_INITIAL_SNAPSHOTTING_QUEUE_SIZE_CONFIG =
"startup.mode.copy.existing.queue.size";
public static final String STARTUP_MODE_INITIAL_SNAPSHOTTING_PIPELINE_CONFIG =
"startup.mode.copy.existing.pipeline";
public static final String STARTUP_MODE_TIMESTAMP_START_AT_OPERATION_TIME_CONFIG =
"startup.mode.timestamp.start.at.operation.time";
private final MongoSourceTask target;
private final Field isCopyingField;
private final Field startedTaskField;
private SourceRecord currentLastSnapshotRecord;
@ -81,7 +90,8 @@ public class MongoDBConnectorSourceTask extends SourceTask {
public MongoDBConnectorSourceTask() throws NoSuchFieldException {
this.target = new MongoSourceTask();
this.isCopyingField = MongoSourceTask.class.getDeclaredField("isCopying");
this.startedTaskField = MongoSourceTask.class.getDeclaredField("startedTask");
startedTaskField.setAccessible(true);
}
@Override
@ -236,10 +246,12 @@ public class MongoDBConnectorSourceTask extends SourceTask {
}
private boolean isCopying() {
isCopyingField.setAccessible(true);
try {
return ((AtomicBoolean) isCopyingField.get(target)).get();
} catch (IllegalAccessException e) {
Object startedTask = startedTaskField.get(target);
Field isCopyingField = startedTask.getClass().getDeclaredField("isCopying");
isCopyingField.setAccessible(true);
return (boolean) isCopyingField.get(startedTask);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new IllegalStateException("Cannot access isCopying field of SourceTask", e);
}
}

@ -61,6 +61,8 @@ public class MongoDBEnvelope {
public static final String FULL_DOCUMENT_FIELD = "fullDocument";
public static final String FULL_DOCUMENT_BEFORE_CHANGE_FIELD = "fullDocumentBeforeChange";
public static final String DOCUMENT_KEY_FIELD = "documentKey";
public static final String OPERATION_TYPE_FIELD = "operationType";
@ -94,6 +96,7 @@ public class MongoDBEnvelope {
+ " { \"name\": \"_id\", \"type\": \"string\" },"
+ " { \"name\": \"operationType\", \"type\": [\"string\", \"null\"] },"
+ " { \"name\": \"fullDocument\", \"type\": [\"string\", \"null\"] },"
+ " { \"name\": \"fullDocumentBeforeChange\", \"type\": [\"string\", \"null\"] },"
+ " { \"name\": \"source\","
+ " \"type\": [{\"name\": \"source\", \"type\": \"record\", \"fields\": ["
+ " {\"name\": \"ts_ms\", \"type\": \"long\"},"

@ -195,6 +195,17 @@ public class MongoDBSourceBuilder<T> {
return this;
}
/**
* scan.full-changelog
*
* <p>Whether to generate full changelog mode row data by looking up the full document pre- and
* post-images. Requires MongoDB >= 6.0.
*/
public MongoDBSourceBuilder<T> scanFullChangelog(boolean enableFullDocPrePostImage) {
this.configFactory.scanFullChangelog(enableFullDocPrePostImage);
return this;
}
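
A hedged sketch of the incremental (parallel) source configured with the new flag; connection details and the JSON deserializer are placeholders, and the other builder methods are the pre-existing ones:

    MongoDBSource<String> source =
            MongoDBSource.<String>builder()
                    .scheme("mongodb")                          // placeholder
                    .hosts("localhost:27017")                   // placeholder
                    .databaseList("mydb")
                    .collectionList("mydb.orders")
                    .scanFullChangelog(true)                    // added by this commit
                    .startupOptions(StartupOptions.initial())
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();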
/**
* The deserializer used to convert from consumed {@link
* org.apache.kafka.connect.source.SourceRecord}.

@ -49,6 +49,7 @@ public class MongoDBSourceConfig implements SourceConfig {
private final int splitMetaGroupSize;
private final int splitSizeMB;
private final boolean closeIdleReaders;
private final boolean enableFullDocPrePostImage;
MongoDBSourceConfig(
String scheme,
@ -66,7 +67,8 @@ public class MongoDBSourceConfig implements SourceConfig {
int heartbeatIntervalMillis,
int splitMetaGroupSize,
int splitSizeMB,
boolean closeIdleReaders) {
boolean closeIdleReaders,
boolean enableFullDocPrePostImage) {
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
this.username = username;
@ -84,6 +86,7 @@ public class MongoDBSourceConfig implements SourceConfig {
this.splitMetaGroupSize = splitMetaGroupSize;
this.splitSizeMB = splitSizeMB;
this.closeIdleReaders = closeIdleReaders;
this.enableFullDocPrePostImage = enableFullDocPrePostImage;
}
public String getScheme() {
@ -163,6 +166,10 @@ public class MongoDBSourceConfig implements SourceConfig {
return closeIdleReaders;
}
public boolean isFullDocPrePostImageEnabled() {
return enableFullDocPrePostImage;
}
@Override
public boolean equals(Object o) {
if (this == o) {

@ -59,6 +59,7 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue();
private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
private boolean closeIdleReaders = false;
private boolean enableFullDocPrePostImage = false;
/** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
public MongoDBSourceConfigFactory scheme(String scheme) {
@ -219,6 +220,17 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
return this;
}
/**
* scan.full-changelog
*
* <p>Whether to generate full changelog mode row data by looking up the full document pre- and
* post-images. Requires MongoDB >= 6.0.
*/
public MongoDBSourceConfigFactory scanFullChangelog(boolean enableFullDocPrePostImage) {
this.enableFullDocPrePostImage = enableFullDocPrePostImage;
return this;
}
/** Creates a new {@link MongoDBSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public MongoDBSourceConfig create(int subtaskId) {
@ -239,6 +251,7 @@ public class MongoDBSourceConfigFactory implements Factory<MongoDBSourceConfig>
heartbeatIntervalMillis,
splitMetaGroupSize,
splitSizeMB,
closeIdleReaders);
closeIdleReaders,
enableFullDocPrePostImage);
}
}

@ -78,8 +78,8 @@ public class MongoDBSourceOptions {
"The ampersand-separated MongoDB connection options. "
+ "eg. replicaSet=test&connectTimeoutMS=300000");
public static final ConfigOption<Integer> COPY_EXISTING_QUEUE_SIZE =
ConfigOptions.key("copy.existing.queue.size")
public static final ConfigOption<Integer> INITIAL_SNAPSHOTTING_QUEUE_SIZE =
ConfigOptions.key("initial.snapshotting.queue.size")
.intType()
.defaultValue(10240)
.withDescription(
@ -133,4 +133,12 @@ public class MongoDBSourceOptions {
.defaultValue(64)
.withDescription(
"The chunk size mb of incremental snapshot. Defaults to 64mb.");
@Experimental
public static final ConfigOption<Boolean> FULL_DOCUMENT_PRE_POST_IMAGE =
ConfigOptions.key("scan.full-changelog")
.booleanType()
.defaultValue(false)
.withDescription(
"Scan full mode changelog. Only available when MongoDB >= 6.0. Defaults to false.");
}
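
Hedged illustration of how the new option resolves on the Java side; a plain Flink Configuration stands in for the table's WITH options here:

    org.apache.flink.configuration.Configuration options =
            new org.apache.flink.configuration.Configuration();
    options.setString("scan.full-changelog", "true");   // same key as in the DDL WITH clause
    boolean enabled = options.get(MongoDBSourceOptions.FULL_DOCUMENT_PRE_POST_IMAGE); // true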

@ -334,7 +334,6 @@ public class MongoDBStreamFetchTask implements FetchTask<SourceSplitBase> {
Instant clusterInstant = Instant.ofEpochSecond(clusterTime.getTime());
source.put(TIMESTAMP_KEY_FIELD, new BsonInt64(clusterInstant.toEpochMilli()));
changeStreamDocument.put(SOURCE_FIELD, source);
return changeStreamDocument;
}

@ -23,6 +23,7 @@ import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import com.ververica.cdc.connectors.mongodb.source.connection.MongoClientPool;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor;
@ -131,14 +132,16 @@ public class MongoUtils {
descriptor.getDatabaseRegex(),
descriptor.getNamespaceRegex(),
sourceConfig.getBatchSize(),
sourceConfig.isUpdateLookup());
sourceConfig.isUpdateLookup(),
sourceConfig.isFullDocPrePostImageEnabled());
}
public static ChangeStreamIterable<Document> getChangeStreamIterable(
MongoClient mongoClient,
ChangeStreamDescriptor descriptor,
int batchSize,
boolean updateLookup) {
boolean updateLookup,
boolean fullDocPrePostImage) {
return getChangeStreamIterable(
mongoClient,
descriptor.getDatabase(),
@ -146,7 +149,8 @@ public class MongoUtils {
descriptor.getDatabaseRegex(),
descriptor.getNamespaceRegex(),
batchSize,
updateLookup);
updateLookup,
fullDocPrePostImage);
}
public static ChangeStreamIterable<Document> getChangeStreamIterable(
@ -156,7 +160,8 @@ public class MongoUtils {
@Nullable Pattern databaseRegex,
@Nullable Pattern namespaceRegex,
int batchSize,
boolean updateLookup) {
boolean updateLookup,
boolean fullDocPrePostImage) {
ChangeStreamIterable<Document> changeStream;
if (StringUtils.isNotEmpty(database) && StringUtils.isNotEmpty(collection)) {
MongoCollection<Document> coll =
@ -216,9 +221,14 @@ public class MongoUtils {
changeStream.batchSize(batchSize);
}
if (updateLookup) {
if (fullDocPrePostImage) {
// require both pre-image and post-image
changeStream.fullDocument(FullDocument.REQUIRED);
changeStream.fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
} else if (updateLookup) {
changeStream.fullDocument(FullDocument.UPDATE_LOOKUP);
}
return changeStream;
}
@ -226,7 +236,7 @@ public class MongoUtils {
public static BsonDocument getLatestResumeToken(
MongoClient mongoClient, ChangeStreamDescriptor descriptor) {
ChangeStreamIterable<Document> changeStreamIterable =
getChangeStreamIterable(mongoClient, descriptor, 1, false);
getChangeStreamIterable(mongoClient, descriptor, 1, false, false);
// Nullable when no change record or postResumeToken (new in MongoDB 4.0.7).
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> changeStreamCursor =
@ -339,6 +349,14 @@ public class MongoUtils {
.withDocumentClass(documentClass);
}
public static String getMongoVersion(MongoDBSourceConfig sourceConfig) {
MongoClient client = MongoClientPool.getInstance().getOrCreateMongoClient(sourceConfig);
return client.getDatabase("config")
.runCommand(new BsonDocument("buildinfo", new BsonString("")))
.get("version")
.toString();
}
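
A hypothetical guard built on the new helpers, showing one way a caller could validate the MongoDB >= 6.0 requirement before relying on pre- and post-images (this check itself is not part of the commit):

    String version = getMongoVersion(sourceConfig);                   // e.g. "6.0.6"
    int major = Integer.parseInt(version.split("\\.")[0]);
    if (sourceConfig.isFullDocPrePostImageEnabled() && major < 6) {
        throw new IllegalStateException(
                "scan.full-changelog requires MongoDB 6.0+, but the server reports " + version);
    }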
public static MongoClient clientFor(MongoDBSourceConfig sourceConfig) {
return MongoClientPool.getInstance().getOrCreateMongoClient(sourceConfig);
}

@ -1,11 +1,11 @@
/*
* Copyright 2008-present MongoDB, Inc.
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -166,12 +166,12 @@ public class MongoDBConnectorDeserializationSchema
}
}
private GenericRowData extractRowData(BsonDocument document) throws Exception {
protected GenericRowData extractRowData(BsonDocument document) throws Exception {
checkNotNull(document);
return (GenericRowData) physicalConverter.convert(document);
}
private BsonDocument extractBsonDocument(Struct value, Schema valueSchema, String fieldName) {
protected BsonDocument extractBsonDocument(Struct value, Schema valueSchema, String fieldName) {
if (valueSchema.field(fieldName) != null) {
String docString = value.getString(fieldName);
if (docString != null) {
@ -186,12 +186,12 @@ public class MongoDBConnectorDeserializationSchema
return resultTypeInfo;
}
private OperationType operationTypeFor(SourceRecord record) {
protected OperationType operationTypeFor(SourceRecord record) {
Struct value = (Struct) record.value();
return OperationType.fromString(value.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD));
}
private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
protected void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
if (!hasMetadata) {
collector.collect(physicalRow);
return;

@ -0,0 +1,129 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mongodb.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import com.mongodb.client.model.changestream.OperationType;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import com.ververica.cdc.debezium.table.MetadataConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import java.time.ZoneId;
/**
* Deserialization schema from MongoDB ChangeStreamDocument to Flink Table/SQL internal data
* structure {@link RowData} that produces rows in full changelog mode.
*/
public class MongoDBConnectorFullChangelogDeserializationSchema
extends MongoDBConnectorDeserializationSchema {
private static final long serialVersionUID = 1750787080613035184L;
public MongoDBConnectorFullChangelogDeserializationSchema(
RowType physicalDataType,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> resultTypeInfo,
ZoneId localTimeZone) {
super(physicalDataType, metadataConverters, resultTypeInfo, localTimeZone);
}
@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
OperationType op = operationTypeFor(record);
BsonDocument documentKey =
extractBsonDocument(value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD);
BsonDocument fullDocument =
extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);
BsonDocument fullDocumentBeforeChange =
extractBsonDocument(
value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_BEFORE_CHANGE_FIELD);
switch (op) {
case INSERT:
GenericRowData insert = extractRowData(fullDocument);
insert.setRowKind(RowKind.INSERT);
emit(record, insert, out);
break;
case DELETE:
// the fullDocumentBeforeChange field may be present when the
// fullDocumentPrePostImage feature is enabled; if so, emit the
// DELETE row with the full pre-image document
if (fullDocumentBeforeChange != null) {
GenericRowData updateBefore = extractRowData(fullDocumentBeforeChange);
updateBefore.setRowKind(RowKind.DELETE);
emit(record, updateBefore, out);
} else {
GenericRowData delete = extractRowData(documentKey);
delete.setRowKind(RowKind.DELETE);
emit(record, delete, out);
}
break;
case UPDATE:
// It's null if another operation deletes the document
// before the lookup operation happens. Ignore it.
if (fullDocument == null) {
break;
}
// the fullDocumentBeforeChange field may be present when the
// fullDocumentPrePostImage feature is enabled; if so, emit an
// UPDATE_BEFORE row first
if (fullDocumentBeforeChange != null) {
GenericRowData updateBefore = extractRowData(fullDocumentBeforeChange);
updateBefore.setRowKind(RowKind.UPDATE_BEFORE);
emit(record, updateBefore, out);
}
GenericRowData updateAfter = extractRowData(fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, updateAfter, out);
break;
case REPLACE:
// the fullDocumentBeforeChange field may be present when the
// fullDocumentPrePostImage feature is enabled; if so, emit an
// UPDATE_BEFORE row first
if (fullDocumentBeforeChange != null) {
GenericRowData updateBefore = extractRowData(fullDocumentBeforeChange);
updateBefore.setRowKind(RowKind.UPDATE_BEFORE);
emit(record, updateBefore, out);
}
GenericRowData replaceAfter = extractRowData(fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, replaceAfter, out);
break;
case INVALIDATE:
case DROP:
case DROP_DATABASE:
case RENAME:
case OTHER:
default:
break;
}
}
}

@ -17,7 +17,6 @@
package com.ververica.cdc.connectors.mongodb.table;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
@ -28,7 +27,6 @@ import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
@ -72,7 +70,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final String database;
private final String collection;
private final StartupOptions startupOptions;
private final Integer copyExistingQueueSize;
private final Integer initialSnapshottingQueueSize;
private final Integer batchSize;
private final Integer pollMaxBatchSize;
private final Integer pollAwaitTimeMillis;
@ -82,6 +80,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final Integer splitMetaGroupSize;
private final Integer splitSizeMB;
private final boolean closeIdlerReaders;
private final boolean enableFullDocPrePostImage;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@ -103,7 +102,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
@Nullable String collection,
@Nullable String connectionOptions,
StartupOptions startupOptions,
@Nullable Integer copyExistingQueueSize,
@Nullable Integer initialSnapshottingQueueSize,
@Nullable Integer batchSize,
@Nullable Integer pollMaxBatchSize,
@Nullable Integer pollAwaitTimeMillis,
@ -112,7 +111,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
boolean enableParallelRead,
@Nullable Integer splitMetaGroupSize,
@Nullable Integer splitSizeMB,
boolean closeIdlerReaders) {
boolean closeIdlerReaders,
boolean enableFullDocPrePostImage) {
this.physicalSchema = physicalSchema;
this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
@ -122,7 +122,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.collection = collection;
this.connectionOptions = connectionOptions;
this.startupOptions = checkNotNull(startupOptions);
this.copyExistingQueueSize = copyExistingQueueSize;
this.initialSnapshottingQueueSize = initialSnapshottingQueueSize;
this.batchSize = batchSize;
this.pollMaxBatchSize = pollMaxBatchSize;
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
@ -134,15 +134,18 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.splitMetaGroupSize = splitMetaGroupSize;
this.splitSizeMB = splitSizeMB;
this.closeIdlerReaders = closeIdlerReaders;
this.enableFullDocPrePostImage = enableFullDocPrePostImage;
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
if (this.enableFullDocPrePostImage) {
// generate full-mode changelog with FullDocPrePostImage
return ChangelogMode.all();
} else {
// upsert changelog only
return ChangelogMode.upsert();
}
}
@Override
@ -153,8 +156,11 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
TypeInformation<RowData> typeInfo = scanContext.createTypeInformation(producedDataType);
DebeziumDeserializationSchema<RowData> deserializer =
new MongoDBConnectorDeserializationSchema(
physicalDataType, metadataConverters, typeInfo, localTimeZone);
enableFullDocPrePostImage
? new MongoDBConnectorFullChangelogDeserializationSchema(
physicalDataType, metadataConverters, typeInfo, localTimeZone)
: new MongoDBConnectorDeserializationSchema(
physicalDataType, metadataConverters, typeInfo, localTimeZone);
String databaseList = null;
String collectionList = null;
@ -183,6 +189,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
.scheme(scheme)
.hosts(hosts)
.closeIdleReaders(closeIdlerReaders)
.scanFullChangelog(enableFullDocPrePostImage)
.startupOptions(startupOptions)
.deserializer(deserializer);
@ -198,34 +205,23 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
.ifPresent(builder::heartbeatIntervalMillis);
Optional.ofNullable(splitMetaGroupSize).ifPresent(builder::splitMetaGroupSize);
Optional.ofNullable(splitSizeMB).ifPresent(builder::splitSizeMB);
return SourceProvider.of(builder.build());
} else {
com.ververica.cdc.connectors.mongodb.MongoDBSource.Builder<RowData> builder =
com.ververica.cdc.connectors.mongodb.MongoDBSource.<RowData>builder()
.scheme(scheme)
.hosts(hosts)
.scanFullChangelog(enableFullDocPrePostImage)
.startupOptions(startupOptions)
.deserializer(deserializer);
switch (startupOptions.startupMode) {
case INITIAL:
builder.copyExisting(true);
break;
case LATEST_OFFSET:
builder.copyExisting(false);
break;
default:
throw new ValidationException(
startupOptions.startupMode
+ " is not supported by legacy source. To use this feature, 'scan.incremental.snapshot.enabled' needs to be set to true.");
}
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
Optional.ofNullable(username).ifPresent(builder::username);
Optional.ofNullable(password).ifPresent(builder::password);
Optional.ofNullable(connectionOptions).ifPresent(builder::connectionOptions);
Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize);
Optional.ofNullable(initialSnapshottingQueueSize)
.ifPresent(builder::initialSnapshottingQueueSize);
Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
@ -280,7 +276,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
collection,
connectionOptions,
startupOptions,
copyExistingQueueSize,
initialSnapshottingQueueSize,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
@ -289,7 +285,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
enableParallelRead,
splitMetaGroupSize,
splitSizeMB,
closeIdlerReaders);
closeIdlerReaders,
enableFullDocPrePostImage);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@ -313,7 +310,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
&& Objects.equals(collection, that.collection)
&& Objects.equals(connectionOptions, that.connectionOptions)
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize)
&& Objects.equals(initialSnapshottingQueueSize, that.initialSnapshottingQueueSize)
&& Objects.equals(batchSize, that.batchSize)
&& Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
&& Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
@ -339,7 +336,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
collection,
connectionOptions,
startupOptions,
copyExistingQueueSize,
initialSnapshottingQueueSize,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,

@ -41,10 +41,11 @@ import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_START
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COLLECTION;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.CONNECTION_OPTIONS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING_QUEUE_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.DATABASE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.FULL_DOCUMENT_PRE_POST_IMAGE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HOSTS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.INITIAL_SNAPSHOTTING_QUEUE_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.PASSWORD;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
@ -88,7 +89,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
Integer heartbeatIntervalMillis = config.get(HEARTBEAT_INTERVAL_MILLIS);
StartupOptions startupOptions = getStartupOptions(config);
Integer copyExistingQueueSize = config.getOptional(COPY_EXISTING_QUEUE_SIZE).orElse(null);
Integer initialSnapshottingQueueSize =
config.getOptional(INITIAL_SNAPSHOTTING_QUEUE_SIZE).orElse(null);
String zoneId = context.getConfiguration().get(TableConfigOptions.LOCAL_TIME_ZONE);
ZoneId localTimeZone =
@ -102,6 +104,9 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE);
boolean enableFullDocumentPrePostImage =
config.getOptional(FULL_DOCUMENT_PRE_POST_IMAGE).orElse(false);
ResolvedSchema physicalSchema =
getPhysicalSchema(context.getCatalogTable().getResolvedSchema());
checkArgument(physicalSchema.getPrimaryKey().isPresent(), "Primary key must be present");
@ -119,7 +124,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
collection,
connectionOptions,
startupOptions,
copyExistingQueueSize,
initialSnapshottingQueueSize,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
@ -128,7 +133,8 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
enableParallelRead,
splitMetaGroupSize,
splitSizeMB,
enableCloseIdleReaders);
enableCloseIdleReaders,
enableFullDocumentPrePostImage);
}
private void checkPrimaryKey(UniqueConstraint pk, String message) {
@ -191,7 +197,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(COLLECTION);
options.add(SCAN_STARTUP_MODE);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(COPY_EXISTING_QUEUE_SIZE);
options.add(INITIAL_SNAPSHOTTING_QUEUE_SIZE);
options.add(BATCH_SIZE);
options.add(POLL_MAX_BATCH_SIZE);
options.add(POLL_AWAIT_TIME_MILLIS);
@ -200,6 +206,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB);
options.add(CHUNK_META_GROUP_SIZE);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(FULL_DOCUMENT_PRE_POST_IMAGE);
return options;
}
}

@ -20,164 +20,124 @@ import com.github.dockerjava.api.command.InspectContainerResponse;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.ImageFromDockerfile;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Random;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
/** Mongodb test container. */
public class MongoDBContainer extends GenericContainer<MongoDBContainer> {
/** Container for testing MongoDB >= 5.0.3. */
public class MongoDBContainer extends org.testcontainers.containers.MongoDBContainer {
private static final Logger LOG = LoggerFactory.getLogger(MongoDBContainer.class);
private static final String DOCKER_IMAGE_NAME = "mongo:5.0.2";
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$");
public static final int MONGODB_PORT = 27017;
public static final String MONGO_SUPER_USER = "superuser";
public static final String MONGO_SUPER_PASSWORD = "superpw";
public static final String FLINK_USER = "flinkuser";
public static final String FLINK_USER_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$");
public MongoDBContainer(String imageName) {
super(imageName);
}
@Override
protected void containerIsStarted(InspectContainerResponse containerInfo, boolean reused) {
super.containerIsStarted(containerInfo, reused);
private final ShardingClusterRole clusterRole;
final String setupFilePath = "docker/mongodb/setup.js";
final URL setupFile = MongoDBContainer.class.getClassLoader().getResource(setupFilePath);
public MongoDBContainer(Network network) {
this(network, ShardingClusterRole.NONE);
assertNotNull("Cannot locate " + setupFilePath, setupFile);
try {
String createUserCommand =
Files.readAllLines(Paths.get(setupFile.toURI())).stream()
.filter(x -> StringUtils.isNotBlank(x) && !x.trim().startsWith("//"))
.map(
x -> {
final Matcher m = COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining(" "));
ExecResult execResult =
execInContainer(
"mongosh",
"--eval",
"use admin",
"--eval",
createUserCommand,
"--eval",
"console.log('Flink test user created.\\n');");
LOG.info(execResult.getStdout());
if (execResult.getExitCode() != 0) {
throw new IllegalStateException(
"Execute mongo command failed " + execResult.getStderr());
}
this.waitingFor(Wait.forLogMessage("Flink test user created.\\s", 1));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public MongoDBContainer(Network network, ShardingClusterRole clusterRole) {
super(
new ImageFromDockerfile()
.withFileFromClasspath("random.key", "docker/mongodb/random.key")
.withFileFromClasspath("setup.js", "docker/mongodb/setup.js")
.withDockerfileFromBuilder(
builder ->
builder.from(DOCKER_IMAGE_NAME)
.copy(
"setup.js",
"/docker-entrypoint-initdb.d/setup.js")
.copy("random.key", "/data/keyfile/random.key")
.run("chown mongodb /data/keyfile/random.key")
.run("chmod 400 /data/keyfile/random.key")
.env("MONGO_INITDB_ROOT_USERNAME", MONGO_SUPER_USER)
.env(
"MONGO_INITDB_ROOT_PASSWORD",
MONGO_SUPER_PASSWORD)
.env("MONGO_INITDB_DATABASE", "admin")
.build()));
this.clusterRole = clusterRole;
@Override
public MongoDBContainer withSharding() {
return (MongoDBContainer) super.withSharding();
}
withNetwork(network);
withNetworkAliases(clusterRole.hostname);
withExposedPorts(MONGODB_PORT);
withCommand(ShardingClusterRole.startupCommand(clusterRole));
waitingFor(clusterRole.waitStrategy);
@Override
public MongoDBContainer withLogConsumer(Consumer<OutputFrame> consumer) {
return (MongoDBContainer) super.withLogConsumer(consumer);
}
public String getConnectionString(String username, String password) {
return String.format(
"mongodb://%s:%s@%s:%d",
username, password, getContainerIpAddress(), getMappedPort(MONGODB_PORT));
@Override
public MongoDBContainer withNetwork(Network network) {
return (MongoDBContainer) super.withNetwork(network);
}
public String getHostAndPort() {
return String.format("%s:%s", getContainerIpAddress(), getMappedPort(MONGODB_PORT));
@Override
public MongoDBContainer withNetworkAliases(String... aliases) {
return (MongoDBContainer) super.withNetworkAliases(aliases);
}
public void executeCommand(String command) {
try {
LOG.info("Executing mongo command: {}", command);
ExecResult execResult =
execInContainer(
"mongo",
"-u",
MONGO_SUPER_USER,
"-p",
MONGO_SUPER_PASSWORD,
"--eval",
command);
ExecResult execResult = execInContainer("mongosh", "--eval", command);
LOG.info(execResult.getStdout());
if (execResult.getExitCode() != 0) {
throw new IllegalStateException(
"Execute mongo command failed " + execResult.getStdout());
"Execute mongo command failed " + execResult.getStderr());
}
} catch (InterruptedException | IOException e) {
throw new IllegalStateException("Execute mongo command failed", e);
}
}
@Override
protected void containerIsStarted(InspectContainerResponse containerInfo) {
LOG.info("Preparing a MongoDB Container with sharding cluster role {}...", clusterRole);
if (clusterRole != ShardingClusterRole.ROUTER) {
initReplicaSet();
} else {
initShard();
public String executeCommandInDatabase(String command, String databaseName) {
try {
executeCommand(String.format("db = db.getSiblingDB('%s');\n", databaseName) + command);
return databaseName;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected void initReplicaSet() {
LOG.info("Initializing a single node replica set...");
executeCommand(
String.format(
"rs.initiate({ _id : '%s', configsvr: %s, members: [{ _id: 0, host: '%s:%d'}]})",
clusterRole.replicaSetName,
clusterRole == ShardingClusterRole.CONFIG,
clusterRole.hostname,
MONGODB_PORT));
LOG.info("Waiting for single node replica set initialized...");
executeCommand(
String.format(
"var attempt = 0; "
+ "while"
+ "(%s) "
+ "{ "
+ "if (attempt > %d) {quit(1);} "
+ "print('%s ' + attempt); sleep(100); attempt++; "
+ " }",
"db.runCommand( { isMaster: 1 } ).ismaster==false",
60,
"An attempt to await for a single node replica set initialization:"));
}
protected void initShard() {
LOG.info("Initializing a sharded cluster...");
// decrease chunk size from default 64mb to 1mb to make splitter test easier.
executeCommand(
"db.getSiblingDB('config').settings.updateOne(\n"
+ " { _id: \"chunksize\" },\n"
+ " { $set: { _id: \"chunksize\", value: 1 } },\n"
+ " { upsert: true }\n"
+ ");");
executeCommand(
String.format(
"sh.addShard('%s/%s:%d')",
ShardingClusterRole.SHARD.replicaSetName,
ShardingClusterRole.SHARD.hostname,
MONGODB_PORT));
}
/** Executes a mongo command file. */
public String executeCommandFile(String fileNameIgnoreSuffix) {
return executeCommandFileInDatabase(fileNameIgnoreSuffix, fileNameIgnoreSuffix);
/** Executes a mongo command in separate database. */
public String executeCommandInSeparateDatabase(String command, String baseName) {
return executeCommandInDatabase(
command, baseName + "_" + Integer.toUnsignedString(new Random().nextInt(), 36));
}
/** Executes a mongo command file in separate database. */
@ -215,52 +175,12 @@ public class MongoDBContainer extends GenericContainer<MongoDBContainer> {
}
}
/** A MongoDB sharded cluster roles. */
public enum ShardingClusterRole {
// Config servers store metadata and configuration settings for the cluster.
CONFIG("config0", "rs0-config", Wait.forLogMessage(".*[Ww]aiting for connections.*", 2)),
// Each shard contains a subset of the sharded data. Each shard can be deployed as a replica
// set.
SHARD("shard0", "rs0-shard", Wait.forLogMessage(".*[Ww]aiting for connections.*", 2)),
// The mongos acts as a query router, providing an interface between client applications and
// the sharded cluster.
ROUTER("router0", null, Wait.forLogMessage(".*[Ww]aiting for connections.*", 1)),
// None sharded cluster.
NONE("mongo0", "rs0", Wait.forLogMessage(".*Replication has not yet been configured.*", 1));
private final String hostname;
private final String replicaSetName;
private final WaitStrategy waitStrategy;
ShardingClusterRole(String hostname, String replicaSetName, WaitStrategy waitStrategy) {
this.hostname = hostname;
this.replicaSetName = replicaSetName;
this.waitStrategy = waitStrategy;
}
public String getConnectionString() {
return String.format(
"mongodb://%s:%d", getContainerIpAddress(), getMappedPort(MONGODB_PORT));
}
public static String startupCommand(ShardingClusterRole clusterRole) {
switch (clusterRole) {
case CONFIG:
return String.format(
"mongod --configsvr --port %d --replSet %s --keyFile /data/keyfile/random.key",
MONGODB_PORT, clusterRole.replicaSetName);
case SHARD:
return String.format(
"mongod --shardsvr --port %d --replSet %s --keyFile /data/keyfile/random.key",
MONGODB_PORT, clusterRole.replicaSetName);
case ROUTER:
return String.format(
"mongos --configdb %s/%s:%d --bind_ip_all --keyFile /data/keyfile/random.key",
CONFIG.replicaSetName, CONFIG.hostname, MONGODB_PORT);
case NONE:
default:
return String.format(
"mongod --port %d --replSet %s --keyFile /data/keyfile/random.key",
MONGODB_PORT, NONE.replicaSetName);
}
}
public String getHostAndPort() {
return String.format("%s:%s", getContainerIpAddress(), getMappedPort(MONGODB_PORT));
}
}
