diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java index 15b52db42..5e3c72846 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java @@ -89,7 +89,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment { super.before(); container = - new MongoDBContainer("mongo:6.0.6") + new MongoDBContainer("mongo:6.0.9") .withSharding() .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS) diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java index 2cc031a93..ecb2a2b2d 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java @@ -70,11 +70,29 @@ public class MongoDBDialect implements DataSourceDialect { return "MongoDB"; } + private static TableId parseTableId(String str) { + return parseTableId(str, true); + } + + private static TableId parseTableId(String str, boolean useCatalogBeforeSchema) { + String[] parts = str.split("[.]", 2); + int numParts = parts.length; + if (numParts == 1) { + return new TableId(null, null, parts[0]); + } else if (numParts == 2) { + return useCatalogBeforeSchema + ? new TableId(parts[0], null, parts[1]) + : new TableId(null, parts[0], parts[1]); + } else { + return null; + } + } + @Override public List discoverDataCollections(MongoDBSourceConfig sourceConfig) { CollectionDiscoveryInfo discoveryInfo = discoverAndCacheDataCollections(sourceConfig); return discoveryInfo.getDiscoveredCollections().stream() - .map(TableId::parse) + .map(MongoDBDialect::parseTableId) .collect(Collectors.toList()); } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java index aaa34c9d3..dd5803c89 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java @@ -173,7 +173,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad checkDatabaseNameValidity(database); checkCollectionNameValidity(collection); databaseList = database; - collectionList = database + "." + collection; + // match dot explicitly since it will be used for regex match later + collectionList = database + "[.]" + collection; } else { databaseList = database; collectionList = collection; diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java index dd25739ad..f8ef429ec 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java @@ -79,7 +79,7 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase { .pollAwaitTimeMillis(500) .create(0); - assertEquals(MongoUtils.getMongoVersion(config), "6.0.6"); + assertEquals(MongoUtils.getMongoVersion(config), "6.0.9"); } @Test diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java index 191aabe10..4e0b71f91 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java @@ -71,7 +71,7 @@ public class MongoDBSourceTestBase { @ClassRule public static final MongoDBContainer CONTAINER = - new MongoDBContainer("mongo:6.0.6") + new MongoDBContainer("mongo:6.0.9") .withSharding() .withLogConsumer(new Slf4jLogConsumer(LOG)); } diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java index 72f4d36cf..8da8ce862 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java @@ -266,6 +266,31 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase { result.getJobClient().get().cancel().get(); } + @Test + public void testMatchCollectionWithDots() throws Exception { + // 1. Given colllections: + // db: [coll.name] + String db = CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted"); + + TableResult result = submitTestCase(db, db + "[.]coll[.]name"); + + // 2. Wait change stream records come + waitForSinkSize("mongodb_sink", 3); + + // 3. Check results + String[] expected = + new String[] { + String.format("+I[%s, coll.name, A101]", db), + String.format("+I[%s, coll.name, A102]", db), + String.format("+I[%s, coll.name, A103]", db) + }; + + List actual = TestValuesTableFactory.getResults("mongodb_sink"); + assertThat(actual, containsInAnyOrder(expected)); + + result.getJobClient().get().cancel().get(); + } + private TableResult submitTestCase(String database, String collection) throws Exception { String sourceDDL = "CREATE TABLE mongodb_source (" diff --git a/flink-connector-mongodb-cdc/src/test/resources/ddl/ns-dotted.js b/flink-connector-mongodb-cdc/src/test/resources/ddl/ns-dotted.js new file mode 100644 index 000000000..e2a28d3cc --- /dev/null +++ b/flink-connector-mongodb-cdc/src/test/resources/ddl/ns-dotted.js @@ -0,0 +1,17 @@ +// Copyright 2023 Ververica Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +db.getCollection('coll.name').insertOne({"seq": "A101"}); +db.getCollection('coll.name').insertOne({"seq": "A102"}); +db.getCollection('coll.name').insertOne({"seq": "A103"}); \ No newline at end of file