|
|
|
@ -22,6 +22,9 @@ import com.ververica.cdc.connectors.base.options.StartupOptions;
|
|
|
|
|
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
|
|
|
|
|
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
|
|
|
|
|
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
|
|
|
|
import static org.apache.flink.util.Preconditions.checkArgument;
|
|
|
|
|
import static org.apache.flink.util.Preconditions.checkNotNull;
|
|
|
|
|
|
|
|
|
@ -87,7 +90,10 @@ public class MongoDBSourceBuilder<T> {
|
|
|
|
|
|
|
|
|
|
/** Regular expressions list that match database names to be monitored. */
|
|
|
|
|
public MongoDBSourceBuilder<T> databaseList(String... databases) {
|
|
|
|
|
this.configFactory.databaseList(databases);
|
|
|
|
|
this.configFactory.databaseList(
|
|
|
|
|
Arrays.stream(databases)
|
|
|
|
|
.flatMap(database -> Stream.of(database.split(",")))
|
|
|
|
|
.toArray(String[]::new));
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -96,7 +102,10 @@ public class MongoDBSourceBuilder<T> {
|
|
|
|
|
* monitored. Each identifier is of the form {@code <databaseName>.<collectionName>}.
|
|
|
|
|
*/
|
|
|
|
|
public MongoDBSourceBuilder<T> collectionList(String... collections) {
|
|
|
|
|
this.configFactory.collectionList(collections);
|
|
|
|
|
this.configFactory.collectionList(
|
|
|
|
|
Arrays.stream(collections)
|
|
|
|
|
.flatMap(collection -> Stream.of(collection.split(",")))
|
|
|
|
|
.toArray(String[]::new));
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|