From 18936a36767bc6a33a877dd6aad48fabe6d2df6a Mon Sep 17 00:00:00 2001 From: Shawn Huang Date: Mon, 20 Nov 2023 18:56:48 +0800 Subject: [PATCH] [mongodb] Incremental snapshot source supports comma-separated format parameter for databases and collections (#2724) --- .../mongodb/source/MongoDBSourceBuilder.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java index c6ee4177a..9a4a0baf7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java @@ -22,6 +22,9 @@ import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import java.util.Arrays; +import java.util.stream.Stream; + import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -87,7 +90,10 @@ public class MongoDBSourceBuilder { /** Regular expressions list that match database names to be monitored. */ public MongoDBSourceBuilder databaseList(String... databases) { - this.configFactory.databaseList(databases); + this.configFactory.databaseList( + Arrays.stream(databases) + .flatMap(database -> Stream.of(database.split(","))) + .toArray(String[]::new)); return this; } @@ -96,7 +102,10 @@ public class MongoDBSourceBuilder { * monitored. Each identifier is of the form {@code .}. */ public MongoDBSourceBuilder collectionList(String... collections) { - this.configFactory.collectionList(collections); + this.configFactory.collectionList( + Arrays.stream(collections) + .flatMap(collection -> Stream.of(collection.split(","))) + .toArray(String[]::new)); return this; }