[mongodb][hotfix] Mongo CDC fails to capture collections with `.` in names (#2488)

pull/2399/head
yuxiqian 1 year ago committed by GitHub
parent 1ff3bdddbf
commit 44cd46e227
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -89,7 +89,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
super.before();
container =
new MongoDBContainer("mongo:6.0.6")
new MongoDBContainer("mongo:6.0.9")
.withSharding()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS)

@ -70,11 +70,29 @@ public class MongoDBDialect implements DataSourceDialect<MongoDBSourceConfig> {
return "MongoDB";
}
private static TableId parseTableId(String str) {
return parseTableId(str, true);
}
private static TableId parseTableId(String str, boolean useCatalogBeforeSchema) {
String[] parts = str.split("[.]", 2);
int numParts = parts.length;
if (numParts == 1) {
return new TableId(null, null, parts[0]);
} else if (numParts == 2) {
return useCatalogBeforeSchema
? new TableId(parts[0], null, parts[1])
: new TableId(null, parts[0], parts[1]);
} else {
return null;
}
}
@Override
public List<TableId> discoverDataCollections(MongoDBSourceConfig sourceConfig) {
CollectionDiscoveryInfo discoveryInfo = discoverAndCacheDataCollections(sourceConfig);
return discoveryInfo.getDiscoveredCollections().stream()
.map(TableId::parse)
.map(MongoDBDialect::parseTableId)
.collect(Collectors.toList());
}

@ -173,7 +173,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
checkDatabaseNameValidity(database);
checkCollectionNameValidity(collection);
databaseList = database;
collectionList = database + "." + collection;
// match dot explicitly since it will be used for regex match later
collectionList = database + "[.]" + collection;
} else {
databaseList = database;
collectionList = collection;

@ -79,7 +79,7 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
.pollAwaitTimeMillis(500)
.create(0);
assertEquals(MongoUtils.getMongoVersion(config), "6.0.6");
assertEquals(MongoUtils.getMongoVersion(config), "6.0.9");
}
@Test

@ -71,7 +71,7 @@ public class MongoDBSourceTestBase {
@ClassRule
public static final MongoDBContainer CONTAINER =
new MongoDBContainer("mongo:6.0.6")
new MongoDBContainer("mongo:6.0.9")
.withSharding()
.withLogConsumer(new Slf4jLogConsumer(LOG));
}

@ -266,6 +266,31 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testMatchCollectionWithDots() throws Exception {
// 1. Given colllections:
// db: [coll.name]
String db = CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted");
TableResult result = submitTestCase(db, db + "[.]coll[.]name");
// 2. Wait change stream records come
waitForSinkSize("mongodb_sink", 3);
// 3. Check results
String[] expected =
new String[] {
String.format("+I[%s, coll.name, A101]", db),
String.format("+I[%s, coll.name, A102]", db),
String.format("+I[%s, coll.name, A103]", db)
};
List<String> actual = TestValuesTableFactory.getResults("mongodb_sink");
assertThat(actual, containsInAnyOrder(expected));
result.getJobClient().get().cancel().get();
}
private TableResult submitTestCase(String database, String collection) throws Exception {
String sourceDDL =
"CREATE TABLE mongodb_source ("

@ -0,0 +1,17 @@
// Copyright 2023 Ververica Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
db.getCollection('coll.name').insertOne({"seq": "A101"});
db.getCollection('coll.name').insertOne({"seq": "A102"});
db.getCollection('coll.name').insertOne({"seq": "A103"});
Loading…
Cancel
Save