[mongodb] Expose batch.size config option for table source. (#1387)

* [mongodb] Expose batch.size config option for table source.
pull/1411/head
Jiabao Sun 3 years ago committed by GitHub
parent 3ce8f824d2
commit 02eca305d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -226,6 +226,13 @@ Connector Options
<td>Integer</td>
<td>The max size of the queue to use when copying data.</td>
</tr>
<tr>
<td>batch.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>Change stream cursor batch size. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. The default is 0, meaning it uses the server's default value.</td>
</tr>
<tr>
<td>poll.max.batch.size</td>
<td>optional</td>

@ -59,6 +59,8 @@ public class MongoDBSource {
public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue();
public static final int BATCH_SIZE_DEFAULT = 0;
public static final int POLL_MAX_BATCH_SIZE_DEFAULT = 1000;
public static final int POLL_AWAIT_TIME_MILLIS_DEFAULT = 1500;
@ -128,7 +130,7 @@ public class MongoDBSource {
private List<String> databaseList;
private List<String> collectionList;
private String connectionOptions;
private Integer batchSize;
private Integer batchSize = BATCH_SIZE_DEFAULT;
private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS_DEFAULT;
private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE_DEFAULT;
private Boolean copyExisting = true;
@ -186,7 +188,9 @@ public class MongoDBSource {
/**
* batch.size
*
* <p>The cursor batch size. Default: 0
* <p>The change stream cursor batch size. Specifies the maximum number of change events to
* return in each batch of the response from the MongoDB cluster. The default is 0 meaning
* it uses the server's default value. Default: 0
*/
public Builder<T> batchSize(int batchSize) {
checkArgument(batchSize >= 0);

@ -69,6 +69,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private final String copyExistingPipeline;
private final Integer copyExistingMaxThreads;
private final Integer copyExistingQueueSize;
private final Integer batchSize;
private final Integer pollMaxBatchSize;
private final Integer pollAwaitTimeMillis;
private final Integer heartbeatIntervalMillis;
@ -98,6 +99,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
@Nullable String copyExistingPipeline,
@Nullable Integer copyExistingMaxThreads,
@Nullable Integer copyExistingQueueSize,
@Nullable Integer batchSize,
@Nullable Integer pollMaxBatchSize,
@Nullable Integer pollAwaitTimeMillis,
@Nullable Integer heartbeatIntervalMillis,
@ -115,6 +117,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
this.copyExistingPipeline = copyExistingPipeline;
this.copyExistingMaxThreads = copyExistingMaxThreads;
this.copyExistingQueueSize = copyExistingQueueSize;
this.batchSize = batchSize;
this.pollMaxBatchSize = pollMaxBatchSize;
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
this.heartbeatIntervalMillis = heartbeatIntervalMillis;
@ -175,6 +178,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
Optional.ofNullable(copyExistingPipeline).ifPresent(builder::copyExistingPipeline);
Optional.ofNullable(copyExistingMaxThreads).ifPresent(builder::copyExistingMaxThreads);
Optional.ofNullable(copyExistingQueueSize).ifPresent(builder::copyExistingQueueSize);
Optional.ofNullable(batchSize).ifPresent(builder::batchSize);
Optional.ofNullable(pollMaxBatchSize).ifPresent(builder::pollMaxBatchSize);
Optional.ofNullable(pollAwaitTimeMillis).ifPresent(builder::pollAwaitTimeMillis);
Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
@ -232,6 +236,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
copyExistingPipeline,
copyExistingMaxThreads,
copyExistingQueueSize,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
heartbeatIntervalMillis,
@ -263,6 +268,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
&& Objects.equals(copyExistingPipeline, that.copyExistingPipeline)
&& Objects.equals(copyExistingMaxThreads, that.copyExistingMaxThreads)
&& Objects.equals(copyExistingQueueSize, that.copyExistingQueueSize)
&& Objects.equals(batchSize, that.batchSize)
&& Objects.equals(pollMaxBatchSize, that.pollMaxBatchSize)
&& Objects.equals(pollAwaitTimeMillis, that.pollAwaitTimeMillis)
&& Objects.equals(heartbeatIntervalMillis, that.heartbeatIntervalMillis)
@ -287,6 +293,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
copyExistingPipeline,
copyExistingMaxThreads,
copyExistingQueueSize,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
heartbeatIntervalMillis,

@ -30,6 +30,7 @@ import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.BATCH_SIZE_DEFAULT;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_NONE;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
@ -148,6 +149,16 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
.withDescription(
"The max size of the queue to use when copying data. Defaults to 16000.");
/**
 * Table option {@code batch.size}: the change stream cursor batch size.
 *
 * <p>Integer option whose default is {@code BATCH_SIZE_DEFAULT}. The description text below is
 * what users see, so each concatenated fragment must end with a separating space.
 */
private static final ConfigOption<Integer> BATCH_SIZE =
        ConfigOptions.key("batch.size")
                .intType()
                .defaultValue(BATCH_SIZE_DEFAULT)
                .withDescription(
                        "Change stream cursor batch size. "
                                + "Specifies the maximum number of change events to return in each batch "
                                // Trailing space added: the sentences previously ran together
                                // as "cluster.Defaults to 0 ...".
                                + "of the response from the MongoDB cluster. "
                                + "Defaults to 0 meaning it uses the server's default value.");
private static final ConfigOption<Integer> POLL_MAX_BATCH_SIZE =
ConfigOptions.key("poll.max.batch.size")
.intType()
@ -196,6 +207,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
String errorsTolerance = config.get(ERRORS_TOLERANCE);
Boolean errorsLogEnable = config.get(ERRORS_LOG_ENABLE);
Integer batchSize = config.get(BATCH_SIZE);
Integer pollMaxBatchSize = config.get(POLL_MAX_BATCH_SIZE);
Integer pollAwaitTimeMillis = config.get(POLL_AWAIT_TIME_MILLIS);
@ -232,6 +244,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
copyExistingPipeline,
copyExistingMaxThreads,
copyExistingQueueSize,
batchSize,
pollMaxBatchSize,
pollAwaitTimeMillis,
heartbeatIntervalMillis,
@ -270,6 +283,7 @@ public class MongoDBTableSourceFactory implements DynamicTableSourceFactory {
options.add(COPY_EXISTING_PIPELINE);
options.add(COPY_EXISTING_MAX_THREADS);
options.add(COPY_EXISTING_QUEUE_SIZE);
options.add(BATCH_SIZE);
options.add(POLL_MAX_BATCH_SIZE);
options.add(POLL_AWAIT_TIME_MILLIS);
options.add(HEARTBEAT_INTERVAL_MILLIS);

@ -44,6 +44,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.BATCH_SIZE_DEFAULT;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.ERROR_TOLERANCE_ALL;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_AWAIT_TIME_MILLIS_DEFAULT;
import static com.ververica.cdc.connectors.mongodb.MongoDBSource.POLL_MAX_BATCH_SIZE_DEFAULT;
@ -110,6 +111,7 @@ public class MongoDBTableFactoryTest {
null,
null,
null,
BATCH_SIZE_DEFAULT,
POLL_MAX_BATCH_SIZE_DEFAULT,
POLL_AWAIT_TIME_MILLIS_DEFAULT,
null,
@ -127,6 +129,7 @@ public class MongoDBTableFactoryTest {
options.put("copy.existing.pipeline", "[ { \"$match\": { \"closed\": \"false\" } } ]");
options.put("copy.existing.max.threads", "1");
options.put("copy.existing.queue.size", "101");
options.put("batch.size", "101");
options.put("poll.max.batch.size", "102");
options.put("poll.await.time.ms", "103");
options.put("heartbeat.interval.ms", "104");
@ -147,6 +150,7 @@ public class MongoDBTableFactoryTest {
"[ { \"$match\": { \"closed\": \"false\" } } ]",
1,
101,
101,
102,
103,
104,
@ -181,6 +185,7 @@ public class MongoDBTableFactoryTest {
null,
null,
null,
BATCH_SIZE_DEFAULT,
POLL_MAX_BATCH_SIZE_DEFAULT,
POLL_AWAIT_TIME_MILLIS_DEFAULT,
null,

Loading…
Cancel
Save