From 02eca305d2682fcc0bfd626adb6db1d8b3213a9d Mon Sep 17 00:00:00 2001
From: Jiabao Sun
Date: Fri, 22 Jul 2022 09:30:27 +0800
Subject: [PATCH] [mongodb] Expose batch.size config option for table source. (#1387)

* [mongodb] Expose batch.size config option for table source.
---
 docs/content/connectors/mongodb-cdc.md           |  7 +++++++
 .../cdc/connectors/mongodb/MongoDBSource.java    |  8 ++++++--
 .../mongodb/table/MongoDBTableSource.java        |  7 +++++++
 .../mongodb/table/MongoDBTableSourceFactory.java | 14 ++++++++++++++
 .../mongodb/table/MongoDBTableFactoryTest.java   |  5 +++++
 5 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/docs/content/connectors/mongodb-cdc.md b/docs/content/connectors/mongodb-cdc.md
index 3b3aab056..0e30a877a 100644
--- a/docs/content/connectors/mongodb-cdc.md
+++ b/docs/content/connectors/mongodb-cdc.md
@@ -226,6 +226,13 @@ Connector Options
       <td>Integer</td>
       <td>The max size of the queue to use when copying data.</td>
     </tr>
+    <tr>
+      <td>batch.size</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">0</td>
+      <td>Integer</td>
+      <td>Change stream cursor batch size. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. The default is 0 meaning it uses the server's default value.</td>
+    </tr>
     <tr>
       <td>poll.max.batch.size</td>
       <td>optional</td>
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java
index 3ce933fdc..5b505af1e 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java
@@ -59,6 +59,8 @@ public class MongoDBSource {
 
     public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue();
 
+    public static final int BATCH_SIZE_DEFAULT = 0;
+
     public static final int POLL_MAX_BATCH_SIZE_DEFAULT = 1000;
 
     public static final int POLL_AWAIT_TIME_MILLIS_DEFAULT = 1500;
@@ -128,7 +130,7 @@ public class MongoDBSource {
         private List<String> databaseList;
         private List<String> collectionList;
         private String connectionOptions;
-        private Integer batchSize;
+        private Integer batchSize = BATCH_SIZE_DEFAULT;
         private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS_DEFAULT;
         private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE_DEFAULT;
         private Boolean copyExisting = true;
@@ -186,7 +188,9 @@ public class MongoDBSource {
         /**
          * batch.size
          *
-         * <p>The cursor batch size. Default: 0
+         * <p>The change stream cursor batch size. Specifies the maximum number of change events to
+         * return in each batch of the response from the MongoDB cluster. The default is 0 meaning
+         * it uses the server's default value. Default: 0
          */
         public Builder batchSize(int batchSize) {
             checkArgument(batchSize >= 0);
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java
index 9ba708010..19ce75baf 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java
@@ -69,6 +69,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
     private final String copyExistingPipeline;
     private final Integer copyExistingMaxThreads;
    private final Integer copyExistingQueueSize;
+    private final Integer batchSize;
     private final Integer pollMaxBatchSize;
     private final Integer pollAwaitTimeMillis;
     private final Integer heartbeatIntervalMillis;
@@ -98,6 +99,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
             @Nullable String copyExistingPipeline,
             @Nullable Integer copyExistingMaxThreads,
             @Nullable Integer copyExistingQueueSize,
+            @Nullable Integer batchSize,
             @Nullable Integer pollMaxBatchSize,
             @Nullable Integer pollAwaitTimeMillis,
             @Nullable Integer heartbeatIntervalMillis,
@@ -115,6 +117,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         this.copyExistingPipeline = copyExistingPipeline;
         this.copyExistingMaxThreads = copyExistingMaxThreads;
         this.copyExistingQueueSize = copyExistingQueueSize;
+        this.batchSize = batchSize;
         this.pollMaxBatchSize = pollMaxBatchSize;
         this.pollAwaitTimeMillis = pollAwaitTimeMillis;
         this.heartbeatIntervalMillis = heartbeatIntervalMillis;
@@ -175,6 +178,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
         Optional.ofNullable(copyExistingPipeline).ifPresent(builder::copyExistingPipeline);
         Optional.ofNullable(copyExistingMaxThreads).ifPresent(builder::copyExistingMaxThreads);
         Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize);
+        Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
         Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
         Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
         Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
@@ -232,6 +236,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 copyExistingPipeline,
                 copyExistingMaxThreads,
                 copyExistingQueueSize,
+                batchSize,
                 pollMaxBatchSize,
                 pollAwaitTimeMillis,
                 heartbeatIntervalMillis,
@@ -263,6 +268,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 && Objects.equals(copyExistingPipeline, that.copyExistingPipeline)
                 && Objects.equals(copyExistingMaxThreads, that.copyExistingMaxThreads)
                 && Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize)
+                && Objects.equals(batchSize, that.batchSize)
                 && Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
                 && Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
                 && Objects.equals(heartbeatIntervalMillis, that.heartbeatIntervalMillis)
@@ -287,6 +293,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
                 copyExistingPipeline,
                 copyExistingMaxThreads,
                 copyExistingQueueSize,
+                batchSize,
                 pollMaxBatchSize,
                 pollAwaitTimeMillis,
                 heartbeatIntervalMillis,
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
index 7428f5ee5..8879a0840 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
@@ -30,6 +30,7 @@ import java.time.ZoneId;
 import java.util.HashSet;
 import java.util.Set;
 
+import static com.ververica.cdc.connectors.mongodb.MongoDBSource.BATCH_SIZE_DEFAULT;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_NONE;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
@@ -148,6 +149,16 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                     .withDescription(
                             "The max size of the queue to use when copying data. Defaults to 16000.");
 
+    private static final ConfigOption<Integer> BATCH_SIZE =
+            ConfigOptions.key("batch.size")
+                    .intType()
+                    .defaultValue(BATCH_SIZE_DEFAULT)
+                    .withDescription(
+                            "Change stream cursor batch size. "
+                                    + "Specifies the maximum number of change events to return in each batch "
+                                    + "of the response from the MongoDB cluster."
+                                    + "Defaults to 0 meaning it uses the server's default value.");
+
     private static final ConfigOption<Integer> POLL_MAX_BATCH_SIZE =
             ConfigOptions.key("poll.max.batch.size")
                     .intType()
@@ -196,6 +207,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         String errorsTolerance = config.get(ERRORS_TOLERANCE);
         Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
 
+        Integer batchSize = config.get(BATCH_SIZE);
         Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
         Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
 
@@ -232,6 +244,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
                 copyExistingPipeline,
                 copyExistingMaxThreads,
                 copyExistingQueueSize,
+                batchSize,
                 pollMaxBatchSize,
                 pollAwaitTimeMillis,
                 heartbeatIntervalMillis,
@@ -270,6 +283,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
         options.add(COPY_EXISTING_PIPELINE);
         options.add(COPY_EXISTING_MAX_THREADS);
         options.add(COPY_EXISTING_QUEUE_SIZE);
+        options.add(BATCH_SIZE);
         options.add(POLL_MAX_BATCH_SIZE);
         options.add(POLL_AWAIT_TIME_MILLIS);
         options.add(HEARTBEAT_INTERVAL_MILLIS);
diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index 033ac0855..13337edd8 100644
--- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static com.ververica.cdc.connectors.mongodb.MongoDBSource.BATCH_SIZE_DEFAULT;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_ALL;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
 import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
@@ -110,6 +111,7 @@ public class MongoDBTableFactoryTest {
                         null,
                         null,
                         null,
+                        BATCH_SIZE_DEFAULT,
                         POLL_MAX_BATCH_SIZE_DEFAULT,
                         POLL_AWAIT_TIME_MILLIS_DEFAULT,
                         null,
@@ -127,6 +129,7 @@ public class MongoDBTableFactoryTest {
         options.put("copy.existing.pipeline", "[ { \"$match\": { \"closed\": \"false\" } } ]");
         options.put("copy.existing.max.threads", "1");
         options.put("copy.existing.queue.size", "101");
+        options.put("batch.size", "101");
         options.put("poll.max.batch.size", "102");
         options.put("poll.await.time.ms", "103");
         options.put("heartbeat.interval.ms", "104");
@@ -147,6 +150,7 @@ public class MongoDBTableFactoryTest {
                        "[ { \"$match\": { \"closed\": \"false\" } } ]",
                        1,
                        101,
+                       101,
                        102,
                        103,
                        104,
@@ -181,6 +185,7 @@ public class MongoDBTableFactoryTest {
                         null,
                         null,
                         null,
+                        BATCH_SIZE_DEFAULT,
                         POLL_MAX_BATCH_SIZE_DEFAULT,
                         POLL_AWAIT_TIME_MILLIS_DEFAULT,
                         null,
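
For reference, the new option is set like any other connector option in the table DDL. Below is a minimal sketch of how `batch.size` might be used once this patch is applied; the table name, schema, connection values, and the chosen batch size are illustrative only and are not part of the patch.

```sql
-- Hypothetical MongoDB CDC source table overriding the change stream cursor batch size.
CREATE TABLE orders (
    _id STRING,
    order_id INT,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'mydb',
    'collection' = 'orders',
    -- Option exposed by this patch: max change events returned per change stream batch.
    -- Leaving it unset (or 0) keeps the MongoDB server's default cursor batch size.
    'batch.size' = '1024'
);
```

Because the option defaults to 0 (server default), existing table definitions that do not set `batch.size` keep their previous behavior.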