diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java index 2dd1996dc..d8c0c6afd 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBConnectorSourceTask.java @@ -303,12 +303,16 @@ public class MongoDBConnectorSourceTask extends SourceTask { Bson nsFilter = regex(ADD_NS_FIELD_NAME, namespacesRegex); if (databaseList != null) { - String databaseRegex = - includeListAsPatterns(databaseList).stream() - .map(Pattern::pattern) - .collect(Collectors.joining("|")); - Bson dbFilter = regex("ns.db", databaseRegex); - nsFilter = and(dbFilter, nsFilter); + if (isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) { + props.put(MongoSourceConfig.DATABASE_CONFIG, discoveredDatabases.get(0)); + } else { + String databaseRegex = + includeListAsPatterns(databaseList).stream() + .map(Pattern::pattern) + .collect(Collectors.joining("|")); + Bson dbFilter = regex("ns.db", databaseRegex); + nsFilter = and(dbFilter, nsFilter); + } } pipeline.add(match(nsFilter));