[mongodb] Support MongoDB 6.0+ & update testcases

pull/2294/head
Authored by yuxiqian 2 years ago, committed by Leonard Xu
parent d3ed1a7714
commit 7f6eaa4b2e

@ -153,6 +153,12 @@ under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

@ -47,11 +47,9 @@ import java.util.Random;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.FLINK_USER;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.FLINK_USER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGODB_PORT;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGO_SUPER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGO_SUPER_USER;
/** End-to-end tests for mongodb-cdc connector uber jar. */
public class MongoE2eITCase extends FlinkContainerTestEnvironment {
@ -62,24 +60,26 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
private static final Path mongoCdcJar = TestUtils.getResource("mongodb-cdc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
private MongoDBContainer config;
private MongoDBContainer shard;
private MongoDBContainer router;
private MongoDBContainer container;
private MongoClient mongoClient;
@Parameterized.Parameter(1)
public boolean parallelismSnapshot;
@Parameterized.Parameters(name = "flinkVersion: {0}, parallelismSnapshot: {1}")
@Parameterized.Parameter(2)
public boolean scanFullChangelog;
@Parameterized.Parameters(
name = "flinkVersion: {0}, parallelismSnapshot: {1}, scanFullChangelog: {2}")
public static List<Object[]> parameters() {
final List<String> flinkVersions = getFlinkVersion();
List<Object[]> params = new ArrayList<>();
for (String flinkVersion : flinkVersions) {
params.add(new Object[] {flinkVersion, true});
params.add(new Object[] {flinkVersion, false});
params.add(new Object[] {flinkVersion, true, true});
params.add(new Object[] {flinkVersion, true, false});
params.add(new Object[] {flinkVersion, false, true});
params.add(new Object[] {flinkVersion, false, false});
}
return params;
}
@ -87,31 +87,25 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
@Before
public void before() {
super.before();
config =
new MongoDBContainer(NETWORK, MongoDBContainer.ShardingClusterRole.CONFIG)
.withLogConsumer(new Slf4jLogConsumer(LOG));
shard =
new MongoDBContainer(NETWORK, MongoDBContainer.ShardingClusterRole.SHARD)
.dependsOn(config)
.withLogConsumer(new Slf4jLogConsumer(LOG));
router =
new MongoDBContainer(NETWORK, MongoDBContainer.ShardingClusterRole.ROUTER)
.dependsOn(shard)
container =
new MongoDBContainer("mongo:6.0.6")
.withSharding()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(LOG));
Startables.deepStart(Stream.of(config)).join();
Startables.deepStart(Stream.of(shard)).join();
Startables.deepStart(Stream.of(router)).join();
Startables.deepStart(Stream.of(container)).join();
if (scanFullChangelog) {
container.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
}
MongoClientSettings settings =
MongoClientSettings.builder()
.applyConnectionString(
new ConnectionString(
router.getConnectionString(
MONGO_SUPER_USER, MONGO_SUPER_PASSWORD)))
new ConnectionString(container.getConnectionString()))
.build();
mongoClient = MongoClients.create(settings);
}
@ -122,23 +116,21 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
if (mongoClient != null) {
mongoClient.close();
}
if (router != null) {
router.stop();
}
if (shard != null) {
shard.stop();
}
if (config != null) {
config.stop();
if (container != null) {
container.stop();
}
}
@Test
public void testMongoDbCDC() throws Exception {
String dbName =
router.executeCommandFileInDatabase(
container.executeCommandFileInDatabase(
"mongo_inventory",
"inventory" + Integer.toUnsignedString(new Random().nextInt(), 36));
container.executeCommandInDatabase(
"db.runCommand({ collMod: 'products', changeStreamPreAndPostImages: { enabled: true } })",
dbName);
List<String> sqlLines =
Arrays.asList(
"SET 'execution.checkpointing.interval' = '3s';",
@ -159,6 +151,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment {
" 'collection' = 'products',",
" 'heartbeat.interval.ms' = '1000',",
" 'scan.incremental.snapshot.enabled' = '" + parallelismSnapshot + "',",
" 'scan.full-changelog' = '" + scanFullChangelog + "',",
" 'scan.incremental.close-idle-reader.enabled' = '"
+ supportCheckpointsAfterTasksFinished()
+ "'",

@ -51,7 +51,7 @@ under the License.
<dependency>
<groupId>org.mongodb.kafka</groupId>
<artifactId>mongo-kafka-connect</artifactId>
<version>1.6.1</version>
<version>1.10.1</version>
<exclusions>
<exclusion>
<artifactId>mongodb-driver-sync</artifactId>
@ -67,7 +67,7 @@ under the License.
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.3.4</version>
<version>4.9.1</version>
</dependency>
<!-- test dependencies on Flink -->
@ -152,6 +152,13 @@ under the License.
</exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>

@ -0,0 +1,261 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mongodb;
import com.github.dockerjava.api.command.InspectContainerResponse;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.ImageFromDockerfile;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
/** Mongodb test container. */
public class LegacyMongoDBContainer extends GenericContainer<LegacyMongoDBContainer> {
private static final Logger LOG = LoggerFactory.getLogger(LegacyMongoDBContainer.class);
private static final String DOCKER_IMAGE_NAME = "mongo:5.0.2";
public static final int MONGODB_PORT = 27017;
public static final String MONGO_SUPER_USER = "superuser";
public static final String MONGO_SUPER_PASSWORD = "superpw";
public static final String FLINK_USER = "flinkuser";
public static final String FLINK_USER_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)//.*$");
private final ShardingClusterRole clusterRole;
public LegacyMongoDBContainer(Network network) {
this(network, ShardingClusterRole.NONE);
}
public LegacyMongoDBContainer(Network network, ShardingClusterRole clusterRole) {
super(
new ImageFromDockerfile()
.withFileFromClasspath("random.key", "docker/mongodb/random.key")
.withFileFromClasspath("setup.js", "docker/mongodb/setup.js")
.withDockerfileFromBuilder(
builder ->
builder.from(DOCKER_IMAGE_NAME)
.copy(
"setup.js",
"/docker-entrypoint-initdb.d/setup.js")
.copy("random.key", "/data/keyfile/random.key")
.run("chown mongodb /data/keyfile/random.key")
.run("chmod 400 /data/keyfile/random.key")
.env("MONGO_INITDB_ROOT_USERNAME", MONGO_SUPER_USER)
.env(
"MONGO_INITDB_ROOT_PASSWORD",
MONGO_SUPER_PASSWORD)
.env("MONGO_INITDB_DATABASE", "admin")
.build()));
this.clusterRole = clusterRole;
withNetwork(network);
withNetworkAliases(clusterRole.hostname);
withExposedPorts(MONGODB_PORT);
withCommand(ShardingClusterRole.startupCommand(clusterRole));
waitingFor(clusterRole.waitStrategy);
}
public String getConnectionString(String username, String password) {
return String.format(
"mongodb://%s:%s@%s:%d",
username, password, getContainerIpAddress(), getMappedPort(MONGODB_PORT));
}
public String getHostAndPort() {
return String.format("%s:%s", getContainerIpAddress(), getMappedPort(MONGODB_PORT));
}
public void executeCommand(String command) {
try {
LOG.info("Executing mongo command: {}", command);
ExecResult execResult =
execInContainer(
"mongo",
"-u",
MONGO_SUPER_USER,
"-p",
MONGO_SUPER_PASSWORD,
"--eval",
command);
LOG.info(execResult.getStdout());
if (execResult.getExitCode() != 0) {
throw new IllegalStateException(
"Execute mongo command failed " + execResult.getStdout());
}
} catch (InterruptedException | IOException e) {
throw new IllegalStateException("Execute mongo command failed", e);
}
}
@Override
protected void containerIsStarted(InspectContainerResponse containerInfo) {
LOG.info("Preparing a MongoDB Container with sharding cluster role {}...", clusterRole);
if (clusterRole != ShardingClusterRole.ROUTER) {
initReplicaSet();
} else {
initShard();
}
}
protected void initReplicaSet() {
LOG.info("Initializing a single node replica set...");
executeCommand(
String.format(
"rs.initiate({ _id : '%s', configsvr: %s, members: [{ _id: 0, host: '%s:%d'}]})",
clusterRole.replicaSetName,
clusterRole == ShardingClusterRole.CONFIG,
clusterRole.hostname,
MONGODB_PORT));
LOG.info("Waiting for single node replica set initialized...");
executeCommand(
String.format(
"var attempt = 0; "
+ "while"
+ "(%s) "
+ "{ "
+ "if (attempt > %d) {quit(1);} "
+ "print('%s ' + attempt); sleep(100); attempt++; "
+ " }",
"db.runCommand( { isMaster: 1 } ).ismaster==false",
60,
"An attempt to await for a single node replica set initialization:"));
}
protected void initShard() {
LOG.info("Initializing a sharded cluster...");
// decrease chunk size from default 64mb to 1mb to make splitter test easier.
executeCommand(
"db.getSiblingDB('config').settings.updateOne(\n"
+ " { _id: \"chunksize\" },\n"
+ " { $set: { _id: \"chunksize\", value: 1 } },\n"
+ " { upsert: true }\n"
+ ");");
executeCommand(
String.format(
"sh.addShard('%s/%s:%d')",
ShardingClusterRole.SHARD.replicaSetName,
ShardingClusterRole.SHARD.hostname,
MONGODB_PORT));
}
/** Executes a mongo command file in separate database. */
public String executeCommandFileInSeparateDatabase(String fileNameIgnoreSuffix) {
return executeCommandFileInDatabase(
fileNameIgnoreSuffix,
fileNameIgnoreSuffix + "_" + Integer.toUnsignedString(new Random().nextInt(), 36));
}
/** Executes a mongo command file, specify a database name. */
public String executeCommandFileInDatabase(String fileNameIgnoreSuffix, String databaseName) {
final String dbName = databaseName != null ? databaseName : fileNameIgnoreSuffix;
final String ddlFile = String.format("ddl/%s.js", fileNameIgnoreSuffix);
final URL ddlTestFile = LegacyMongoDBContainer.class.getClassLoader().getResource(ddlFile);
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
try {
// use database;
String command0 = String.format("db = db.getSiblingDB('%s');\n", dbName);
String command1 =
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
.filter(x -> StringUtils.isNotBlank(x) && !x.trim().startsWith("//"))
.map(
x -> {
final Matcher m = COMMENT_PATTERN.matcher(x);
return m.matches() ? m.group(1) : x;
})
.collect(Collectors.joining("\n"));
executeCommand(command0 + command1);
return dbName;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/** A MongoDB sharded cluster roles. */
public enum ShardingClusterRole {
// Config servers store metadata and configuration settings for the cluster.
CONFIG("config0", "rs0-config", Wait.forLogMessage(".*[Ww]aiting for connections.*", 2)),
// Each shard contains a subset of the sharded data. Each shard can be deployed as a replica
// set.
SHARD("shard0", "rs0-shard", Wait.forLogMessage(".*[Ww]aiting for connections.*", 2)),
// The mongos acts as a query router, providing an interface between client applications and
// the sharded cluster.
ROUTER("router0", null, Wait.forLogMessage(".*[Ww]aiting for connections.*", 1)),
// None sharded cluster.
NONE("mongo0", "rs0", Wait.forLogMessage(".*Replication has not yet been configured.*", 1));
private final String hostname;
private final String replicaSetName;
private final WaitStrategy waitStrategy;
ShardingClusterRole(String hostname, String replicaSetName, WaitStrategy waitStrategy) {
this.hostname = hostname;
this.replicaSetName = replicaSetName;
this.waitStrategy = waitStrategy;
}
public static String startupCommand(ShardingClusterRole clusterRole) {
switch (clusterRole) {
case CONFIG:
return String.format(
"mongod --configsvr --port %d --replSet %s --keyFile /data/keyfile/random.key",
MONGODB_PORT, clusterRole.replicaSetName);
case SHARD:
return String.format(
"mongod --shardsvr --port %d --replSet %s --keyFile /data/keyfile/random.key",
MONGODB_PORT, clusterRole.replicaSetName);
case ROUTER:
return String.format(
"mongos --configdb %s/%s:%d --bind_ip_all --keyFile /data/keyfile/random.key",
CONFIG.replicaSetName, CONFIG.hostname, MONGODB_PORT);
case NONE:
default:
return String.format(
"mongod --port %d --replSet %s --keyFile /data/keyfile/random.key",
MONGODB_PORT, NONE.replicaSetName);
}
}
}
}

@ -19,17 +19,16 @@ package com.ververica.cdc.connectors.mongodb;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.junit.Ignore;
import org.junit.Test;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.FLINK_USER;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.FLINK_USER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBTestBase.MONGODB_CONTAINER;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
/** Example Tests for {@link MongoDBSource}. */
public class LegacyMongoDBSourceExampleTest extends MongoDBSourceTestBase {
public class LegacyMongoDBSourceExampleTest extends LegacyMongoDBSourceTestBase {
@Test
@Ignore("Test ignored because it won't stop and is used for manual test")

@ -57,14 +57,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.FLINK_USER;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.FLINK_USER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertDelete;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertInsert;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertObjectIdEquals;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertReplace;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertUpdate;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@ -0,0 +1,116 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mongodb;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.MONGO_SUPER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.MONGO_SUPER_USER;
/** Basic class for testing {@link MongoDBSource}. */
public abstract class LegacyMongoDBSourceTestBase extends TestLogger {
protected static final Logger LOG = LoggerFactory.getLogger(LegacyMongoDBSourceTestBase.class);
protected static final int DEFAULT_PARALLELISM = 4;
@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.build());
@ClassRule public static final Network NETWORK = Network.newNetwork();
@ClassRule
public static final LegacyMongoDBContainer CONFIG =
new LegacyMongoDBContainer(NETWORK, LegacyMongoDBContainer.ShardingClusterRole.CONFIG)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@ClassRule
public static final LegacyMongoDBContainer SHARD =
new LegacyMongoDBContainer(NETWORK, LegacyMongoDBContainer.ShardingClusterRole.SHARD)
.dependsOn(CONFIG)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@ClassRule
public static final LegacyMongoDBContainer ROUTER =
new LegacyMongoDBContainer(NETWORK, LegacyMongoDBContainer.ShardingClusterRole.ROUTER)
.dependsOn(SHARD)
.withLogConsumer(new Slf4jLogConsumer(LOG));
protected static MongoClient mongodbClient;
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(CONFIG)).join();
Startables.deepStart(Stream.of(SHARD)).join();
Startables.deepStart(Stream.of(ROUTER)).join();
initialClient();
LOG.info("Containers are started.");
}
@AfterClass
public static void closeContainers() {
if (mongodbClient != null) {
mongodbClient.close();
}
if (ROUTER != null) {
ROUTER.close();
}
if (SHARD != null) {
SHARD.close();
}
if (CONFIG != null) {
CONFIG.close();
}
}
private static void initialClient() {
MongoClientSettings settings =
MongoClientSettings.builder()
.applyConnectionString(
new ConnectionString(
ROUTER.getConnectionString(
MONGO_SUPER_USER, MONGO_SUPER_PASSWORD)))
.build();
mongodbClient = MongoClients.create(settings);
}
}

@ -23,7 +23,6 @@ import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.slf4j.Logger;
@ -34,8 +33,8 @@ import org.testcontainers.lifecycle.Startables;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGO_SUPER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGO_SUPER_USER;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.MONGO_SUPER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.LegacyMongoDBContainer.MONGO_SUPER_USER;
/**
* Basic class for testing MongoDB source, this contains a MongoDB container which enables change
@ -47,8 +46,8 @@ public class LegacyMongoDBTestBase extends AbstractTestBase {
@ClassRule public static final Network NETWORK = Network.newNetwork();
protected static final MongoDBContainer MONGODB_CONTAINER =
new MongoDBContainer(NETWORK).withLogConsumer(new Slf4jLogConsumer(LOG));
protected static final LegacyMongoDBContainer MONGODB_CONTAINER =
new LegacyMongoDBContainer(NETWORK).withLogConsumer(new Slf4jLogConsumer(LOG));
protected static MongoClient mongodbClient;

@ -0,0 +1,384 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mongodb.source;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils;
import com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils;
import org.bson.Document;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRows;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.triggerFailover;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
/** Integration tests for MongoDB full document before change info. */
@RunWith(Parameterized.class)
public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
private final boolean parallelismSnapshot;
public MongoDBFullChangelogITCase(boolean parallelismSnapshot) {
this.parallelismSnapshot = parallelismSnapshot;
}
@Parameterized.Parameters(name = "parallelismSnapshot: {0}")
public static Object[] parameters() {
return new Object[][] {new Object[] {false}, new Object[] {true}};
}
@Test
public void testGetMongoDBVersion() {
MongoDBSourceConfig config =
new MongoDBSourceConfigFactory()
.hosts(CONTAINER.getHostAndPort())
.splitSizeMB(1)
.pollAwaitTimeMillis(500)
.create(0);
assertEquals(MongoUtils.getMongoVersion(config), "6.0.6");
}
@Test
public void testReadSingleCollectionWithSingleParallelism() throws Exception {
testMongoDBParallelSource(
1,
MongoDBTestUtils.FailoverType.NONE,
MongoDBTestUtils.FailoverPhase.NEVER,
new String[] {"customers"});
}
@Test
public void testReadSingleCollectionWithMultipleParallelism() throws Exception {
testMongoDBParallelSource(
4,
MongoDBTestUtils.FailoverType.NONE,
MongoDBTestUtils.FailoverPhase.NEVER,
new String[] {"customers"});
}
@Test
public void testReadMultipleCollectionWithSingleParallelism() throws Exception {
testMongoDBParallelSource(
1,
MongoDBTestUtils.FailoverType.NONE,
MongoDBTestUtils.FailoverPhase.NEVER,
new String[] {"customers", "customers_1"});
}
@Test
public void testReadMultipleCollectionWithMultipleParallelism() throws Exception {
testMongoDBParallelSource(
4,
MongoDBTestUtils.FailoverType.NONE,
MongoDBTestUtils.FailoverPhase.NEVER,
new String[] {"customers", "customers_1"});
}
// Failover tests
@Test
public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
if (!parallelismSnapshot) {
return;
}
testMongoDBParallelSource(
MongoDBTestUtils.FailoverType.TM,
MongoDBTestUtils.FailoverPhase.SNAPSHOT,
new String[] {"customers", "customers_1"});
}
@Test
public void testTaskManagerFailoverInStreamPhase() throws Exception {
if (!parallelismSnapshot) {
return;
}
testMongoDBParallelSource(
MongoDBTestUtils.FailoverType.TM,
MongoDBTestUtils.FailoverPhase.STREAM,
new String[] {"customers", "customers_1"});
}
@Test
public void testJobManagerFailoverInSnapshotPhase() throws Exception {
if (!parallelismSnapshot) {
return;
}
testMongoDBParallelSource(
MongoDBTestUtils.FailoverType.JM,
MongoDBTestUtils.FailoverPhase.SNAPSHOT,
new String[] {"customers", "customers_1"});
}
@Test
public void testJobManagerFailoverInStreamPhase() throws Exception {
if (!parallelismSnapshot) {
return;
}
testMongoDBParallelSource(
MongoDBTestUtils.FailoverType.JM,
MongoDBTestUtils.FailoverPhase.STREAM,
new String[] {"customers", "customers_1"});
}
@Test
public void testTaskManagerFailoverSingleParallelism() throws Exception {
if (!parallelismSnapshot) {
return;
}
testMongoDBParallelSource(
1,
MongoDBTestUtils.FailoverType.TM,
MongoDBTestUtils.FailoverPhase.SNAPSHOT,
new String[] {"customers"});
}
@Test
public void testJobManagerFailoverSingleParallelism() throws Exception {
if (!parallelismSnapshot) {
return;
}
testMongoDBParallelSource(
1,
MongoDBTestUtils.FailoverType.JM,
MongoDBTestUtils.FailoverPhase.SNAPSHOT,
new String[] {"customers"});
}
private void testMongoDBParallelSource(
MongoDBTestUtils.FailoverType failoverType,
MongoDBTestUtils.FailoverPhase failoverPhase,
String[] captureCustomerCollections)
throws Exception {
testMongoDBParallelSource(
DEFAULT_PARALLELISM, failoverType, failoverPhase, captureCustomerCollections);
}
private void testMongoDBParallelSource(
int parallelism,
MongoDBTestUtils.FailoverType failoverType,
MongoDBTestUtils.FailoverPhase failoverPhase,
String[] captureCustomerCollections)
throws Exception {
String customerDatabase =
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
// A - enable system-level fulldoc pre & post image feature
CONTAINER.executeCommand(
"use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
// B - enable collection-level fulldoc pre & post image for change capture collection
for (String collectionName : captureCustomerCollections) {
CONTAINER.executeCommandInDatabase(
String.format(
"db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
collectionName, collectionName),
customerDatabase);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(parallelism);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
String.format(
"CREATE TABLE customers ("
+ " _id STRING NOT NULL,"
+ " cid BIGINT NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (_id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mongodb-cdc',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'hosts' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database' = '%s',"
+ " 'collection' = '%s',"
+ " 'heartbeat.interval.ms' = '500',"
+ " 'scan.full-changelog' = 'true'"
+ ")",
parallelismSnapshot ? "true" : "false",
CONTAINER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,
getCollectionNameRegex(customerDatabase, captureCustomerCollections));
CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
// first step: check the snapshot data
String[] snapshotForSingleTable =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]"
};
tEnv.executeSql(sourceDDL);
TableResult tableResult =
tEnv.executeSql("select cid, name, address, phone_number from customers");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = new ArrayList<>();
for (int i = 0; i < captureCustomerCollections.length; i++) {
expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
}
// trigger failover after some snapshot splits read finished
if (failoverPhase == MongoDBTestUtils.FailoverPhase.SNAPSHOT && iterator.hasNext()) {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
}
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
// second step: check the change stream data
for (String collectionName : captureCustomerCollections) {
makeFirstPartChangeStreamEvents(
mongodbClient.getDatabase(customerDatabase), collectionName);
}
if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM) {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200));
}
for (String collectionName : captureCustomerCollections) {
makeSecondPartChangeStreamEvents(
mongodbClient.getDatabase(customerDatabase), collectionName);
}
String[] changeEventsForSingleTable =
new String[] {
"-U[101, user_1, Shanghai, 123567891234]",
"+U[101, user_1, Hangzhou, 123567891234]",
"-D[102, user_2, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Shanghai, 123567891234]",
"+U[103, user_3, Hangzhou, 123567891234]",
"-U[1010, user_11, Shanghai, 123567891234]",
"+U[1010, user_11, Hangzhou, 123567891234]",
"+I[2001, user_22, Shanghai, 123567891234]",
"+I[2002, user_23, Shanghai, 123567891234]",
"+I[2003, user_24, Shanghai, 123567891234]"
};
List<String> expectedChangeStreamData = new ArrayList<>();
for (int i = 0; i < captureCustomerCollections.length; i++) {
expectedChangeStreamData.addAll(Arrays.asList(changeEventsForSingleTable));
}
List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
tableResult.getJobClient().get().cancel().get();
}
private String getCollectionNameRegex(String database, String[] captureCustomerCollections) {
checkState(captureCustomerCollections.length > 0);
if (captureCustomerCollections.length == 1) {
return captureCustomerCollections[0];
} else {
// pattern that matches multiple collections
return Arrays.stream(captureCustomerCollections)
.map(coll -> "^(" + database + "." + coll + ")$")
.collect(Collectors.joining("|"));
}
}
private void sleepMs(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}
private void makeFirstPartChangeStreamEvents(MongoDatabase mongoDatabase, String collection) {
MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collection);
mongoCollection.updateOne(Filters.eq("cid", 101L), Updates.set("address", "Hangzhou"));
mongoCollection.deleteOne(Filters.eq("cid", 102L));
mongoCollection.insertOne(customerDocOf(102L, "user_2", "Shanghai", "123567891234"));
mongoCollection.updateOne(Filters.eq("cid", 103L), Updates.set("address", "Hangzhou"));
}
private void makeSecondPartChangeStreamEvents(MongoDatabase mongoDatabase, String collection) {
MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collection);
mongoCollection.updateOne(Filters.eq("cid", 1010L), Updates.set("address", "Hangzhou"));
mongoCollection.insertMany(
Arrays.asList(
customerDocOf(2001L, "user_22", "Shanghai", "123567891234"),
customerDocOf(2002L, "user_23", "Shanghai", "123567891234"),
customerDocOf(2003L, "user_24", "Shanghai", "123567891234")));
}
private Document customerDocOf(Long cid, String name, String address, String phoneNumber) {
Document document = new Document();
document.put("cid", cid);
document.put("name", name);
document.put("address", address);
document.put("phone_number", phoneNumber);
return document;
}
}

@ -34,11 +34,11 @@ public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {
@Test
@Ignore("Test ignored because it won't stop and is used for manual test")
public void testMongoDBExampleSource() throws Exception {
String database = ROUTER.executeCommandFileInSeparateDatabase("inventory");
String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
MongoDBSource<String> mongoSource =
MongoDBSource.<String>builder()
.hosts(ROUTER.getHostAndPort())
.hosts(CONTAINER.getHostAndPort())
.databaseList(database)
.collectionList(database + ".products")
.username(FLINK_USER)

@ -45,7 +45,6 @@ import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.fetchRows;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.triggerFailover;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkState;
/** IT tests for {@link MongoDBSource}. */
@ -136,7 +135,7 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
String[] captureCustomerCollections)
throws Exception {
String customerDatabase = ROUTER.executeCommandFileInSeparateDatabase("customer");
String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@ -145,7 +144,7 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
format(
String.format(
"CREATE TABLE customers ("
+ " _id STRING NOT NULL,"
+ " cid BIGINT NOT NULL,"
@ -163,7 +162,7 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
+ " 'collection' = '%s',"
+ " 'heartbeat.interval.ms' = '500'"
+ ")",
ROUTER.getHostAndPort(),
CONTAINER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
customerDatabase,

@ -19,32 +19,27 @@ package com.ververica.cdc.connectors.mongodb.source;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.util.stream.Stream;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGO_SUPER_PASSWORD;
import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGO_SUPER_USER;
/** MongoDBSourceTestBase for MongoDB >= 5.0.3. */
public class MongoDBSourceTestBase {
/** Basic class for testing {@link MongoDBSource}. */
public abstract class MongoDBSourceTestBase extends TestLogger {
protected static MongoClient mongodbClient;
protected static final Logger LOG = LoggerFactory.getLogger(MongoDBSourceTestBase.class);
protected static final int DEFAULT_PARALLELISM = 4;
@Rule
@ -57,61 +52,26 @@ public abstract class MongoDBSourceTestBase extends TestLogger {
.withHaLeadershipControl()
.build());
@ClassRule public static final Network NETWORK = Network.newNetwork();
@ClassRule
public static final MongoDBContainer CONFIG =
new MongoDBContainer(NETWORK, MongoDBContainer.ShardingClusterRole.CONFIG)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@ClassRule
public static final MongoDBContainer SHARD =
new MongoDBContainer(NETWORK, MongoDBContainer.ShardingClusterRole.SHARD)
.dependsOn(CONFIG)
.withLogConsumer(new Slf4jLogConsumer(LOG));
@ClassRule
public static final MongoDBContainer ROUTER =
new MongoDBContainer(NETWORK, MongoDBContainer.ShardingClusterRole.ROUTER)
.dependsOn(SHARD)
.withLogConsumer(new Slf4jLogConsumer(LOG));
protected static MongoClient mongodbClient;
@BeforeClass
public static void startContainers() {
LOG.info("Starting containers...");
Startables.deepStart(Stream.of(CONFIG)).join();
Startables.deepStart(Stream.of(SHARD)).join();
Startables.deepStart(Stream.of(ROUTER)).join();
initialClient();
LOG.info("Containers are started.");
}
Startables.deepStart(Stream.of(CONTAINER)).join();
@AfterClass
public static void closeContainers() {
if (mongodbClient != null) {
mongodbClient.close();
}
if (ROUTER != null) {
ROUTER.close();
}
if (SHARD != null) {
SHARD.close();
}
if (CONFIG != null) {
CONFIG.close();
}
}
private static void initialClient() {
MongoClientSettings settings =
MongoClientSettings.builder()
.applyConnectionString(
new ConnectionString(
ROUTER.getConnectionString(
MONGO_SUPER_USER, MONGO_SUPER_PASSWORD)))
new ConnectionString(CONTAINER.getConnectionString()))
.build();
mongodbClient = MongoClients.create(settings);
LOG.info("Containers are started.");
}
private static final Logger LOG = LoggerFactory.getLogger(MongoDBSourceTestBase.class);
@ClassRule
public static final MongoDBContainer CONTAINER =
new MongoDBContainer("mongo:6.0.6")
.withSharding()
.withLogConsumer(new Slf4jLogConsumer(LOG));
}
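This hunk is the core of the migration: the three hand-wired LegacyMongoDBContainer instances (config, shard, router) collapse into a single Testcontainers MongoDBContainer with sharding enabled. A minimal usage sketch, assuming org.testcontainers:mongodb 1.18.x as declared in the updated poms:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.testcontainers.containers.MongoDBContainer;

public class ShardedContainerSketch {
    public static void main(String[] args) {
        // One container brings up the sharded cluster on the "mongo:6.0.6" image.
        try (MongoDBContainer container = new MongoDBContainer("mongo:6.0.6").withSharding()) {
            container.start();
            // getConnectionString() resolves the host and mapped port of the started container.
            try (MongoClient client = MongoClients.create(container.getConnectionString())) {
                client.listDatabaseNames().forEach(System.out::println);
            }
        }
    }
}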

@ -69,11 +69,11 @@ public class MongoDBSnapshotSplitReaderTest extends MongoDBSourceTestBase {
@Before
public void before() {
database = ROUTER.executeCommandFileInSeparateDatabase("chunk_test");
database = CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");
MongoDBSourceConfigFactory configFactory =
new MongoDBSourceConfigFactory()
.hosts(ROUTER.getHostAndPort())
.hosts(CONTAINER.getHostAndPort())
.databaseList(database)
.collectionList(database + ".shopping_cart")
.username(FLINK_USER)

@ -83,11 +83,11 @@ public class MongoDBStreamSplitReaderTest extends MongoDBSourceTestBase {
@Before
public void before() {
database = ROUTER.executeCommandFileInSeparateDatabase("chunk_test");
database = CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");
MongoDBSourceConfigFactory configFactory =
new MongoDBSourceConfigFactory()
.hosts(ROUTER.getHostAndPort())
.hosts(CONTAINER.getHostAndPort())
.databaseList(database)
.collectionList(database + ".shopping_cart")
.username(FLINK_USER)

@ -19,7 +19,6 @@ package com.ververica.cdc.connectors.mongodb.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
@ -53,7 +52,6 @@ import static com.ververica.cdc.connectors.mongodb.utils.MongoDBTestUtils.waitFo
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertThrows;
/** Integration tests for MongoDB change stream event SQL source. */
@RunWith(Parameterized.class)
@ -91,7 +89,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
@Test
public void testConsumingAllEvents() throws ExecutionException, InterruptedException {
String database = ROUTER.executeCommandFileInSeparateDatabase("inventory");
String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
String sourceDDL =
String.format(
@ -112,7 +110,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'heartbeat.interval.ms' = '1000'"
+ ")",
ROUTER.getHostAndPort(),
CONTAINER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
@ -224,7 +222,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
@Test
public void testStartupFromTimestamp() throws Exception {
String database = ROUTER.executeCommandFileInSeparateDatabase("inventory");
String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
// Unfortunately we have to sleep here to differ initial and later-generating changes in
// oplog by timestamp
@ -253,7 +251,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
+ "',"
+ " 'heartbeat.interval.ms' = '1000'"
+ ")",
ROUTER.getHostAndPort(),
CONTAINER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
@ -273,15 +271,6 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);
if (!parallelismSnapshot) {
assertThrows(
ValidationException.class,
() ->
tEnv.executeSql(
"INSERT INTO sink SELECT name, SUM(weight) FROM mongodb_source GROUP BY name"));
return;
}
// async submit job
TableResult result =
tEnv.executeSql(
@ -312,7 +301,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
@Test
public void testAllTypes() throws Throwable {
String database = ROUTER.executeCommandFileInSeparateDatabase("column_type_test");
String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
String sourceDDL =
String.format(
@ -355,7 +344,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
+ " 'database' = '%s',"
+ " 'collection' = '%s'"
+ ")",
ROUTER.getHostAndPort(),
CONTAINER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
@ -475,7 +464,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
@Test
public void testMetadataColumns() throws Exception {
String database = ROUTER.executeCommandFileInSeparateDatabase("inventory");
String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
String sourceDDL =
String.format(
@ -497,7 +486,7 @@ public class MongoDBConnectorITCase extends MongoDBSourceTestBase {
+ " 'collection' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s'"
+ ")",
ROUTER.getHostAndPort(),
CONTAINER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,

@ -76,9 +76,9 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
public void testMatchMultipleDatabasesAndCollections() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
String db0 = ROUTER.executeCommandFileInSeparateDatabase("ns_regex");
String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
String db1 = ROUTER.executeCommandFileInSeparateDatabase("ns_regex");
String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match: collection = ^(db0|db1)\.coll_a\d?$
String collectionRegex = String.format("^(%s|%s)\\.coll_a\\d?$", db0, db1);
@ -119,11 +119,11 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
public void testMatchMultipleDatabases() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
String db0 = ROUTER.executeCommandFileInSeparateDatabase("ns_regex");
String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
String db1 = ROUTER.executeCommandFileInSeparateDatabase("ns_regex");
String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
// db2: [coll_a1, coll_a2, coll_b1, coll_b2]
String db2 = ROUTER.executeCommandFileInSeparateDatabase("ns_regex");
String db2 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match database: ^(db0|db1)$
String databaseRegex = String.format("%s|%s", db0, db1);
@ -173,9 +173,9 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
public void testMatchSingleQualifiedCollectionPattern() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
String db0 = ROUTER.executeCommandFileInSeparateDatabase("ns_regex");
String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
String db1 = ROUTER.executeCommandFileInSeparateDatabase("ns_regex");
String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match: collection ^(db0|db1)\.coll_a\d?$
String collectionRegex = String.format("^%s\\.coll_b\\d?$", db0);
@ -212,9 +212,9 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
public void testMatchSingleDatabaseWithCollectionPattern() throws Exception {
// 1. Given collections:
// db0: [coll_a1, coll_a2, coll_b1, coll_b2]
String db0 = ROUTER.executeCommandFileInSeparateDatabase("ns_regex");
String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
// db1: [coll_a1, coll_a2, coll_b1, coll_b2]
String db1 = ROUTER.executeCommandFileInSeparateDatabase("ns_regex");
String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
// 2. Test match: collection .*coll_b\d?
String collectionRegex = ".*coll_b\\d?";
@ -250,7 +250,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
public void testMatchDatabaseAndCollectionContainsDash() throws Exception {
// 1. Given collections:
// db0: [coll-a1, coll-a2, coll-b1, coll-b2]
String db0 = ROUTER.executeCommandFileInSeparateDatabase("ns-regex");
String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns-regex");
TableResult result = submitTestCase(db0, "coll-a1");
@ -275,7 +275,7 @@ public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {
+ " coll_name STRING METADATA FROM 'collection_name' VIRTUAL,"
+ " PRIMARY KEY (_id) NOT ENFORCED"
+ ") WITH ("
+ ignoreIfNull("hosts", ROUTER.getHostAndPort())
+ ignoreIfNull("hosts", CONTAINER.getHostAndPort())
+ ignoreIfNull("username", FLINK_USER)
+ ignoreIfNull("password", FLINK_USER_PASSWORD)
+ ignoreIfNull("database", database)

@ -49,6 +49,7 @@ import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META
import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.FULL_DOCUMENT_PRE_POST_IMAGE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
@ -106,6 +107,9 @@ public class MongoDBTableFactoryTest {
private static final boolean SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT =
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue();
private static final boolean FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT =
FULL_DOCUMENT_PRE_POST_IMAGE.defaultValue();
@Test
public void testCommonProperties() {
Map<String, String> properties = getAllOptions();
@ -132,7 +136,8 @@ public class MongoDBTableFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT,
CHUNK_META_GROUP_SIZE_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}
@ -143,7 +148,7 @@ public class MongoDBTableFactoryTest {
options.put("connection.options", "replicaSet=test&connectTimeoutMS=300000");
options.put("scan.startup.mode", "timestamp");
options.put("scan.startup.timestamp-millis", "1667232000000");
options.put("copy.existing.queue.size", "100");
options.put("initial.snapshotting.queue.size", "100");
options.put("batch.size", "101");
options.put("poll.max.batch.size", "102");
options.put("poll.await.time.ms", "103");
@ -152,6 +157,7 @@ public class MongoDBTableFactoryTest {
options.put("chunk-meta.group.size", "1001");
options.put("scan.incremental.snapshot.chunk.size.mb", "10");
options.put("scan.incremental.close-idle-reader.enabled", "true");
options.put("scan.full-changelog", "true");
DynamicTableSource actualSource = createTableSource(SCHEMA, options);
MongoDBTableSource expectedSource =
@ -174,6 +180,7 @@ public class MongoDBTableFactoryTest {
true,
1001,
10,
true,
true);
assertEquals(expectedSource, actualSource);
}
@ -210,7 +217,8 @@ public class MongoDBTableFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT,
CHUNK_META_GROUP_SIZE_DEFAULT,
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT,
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
FULL_DOCUMENT_PRE_POST_IMAGE_ENABLED_DEFAULT);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
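To show where the renamed and newly added table options from this factory test land in user-facing DDL, a hedged Flink SQL sketch (connection details and the table schema are placeholders; only 'initial.snapshotting.queue.size' and 'scan.full-changelog' are taken from this change):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FullChangelogDdlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + " _id STRING NOT NULL,"
                        + " name STRING,"
                        + " PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'mongodb-cdc',"
                        + " 'hosts' = 'localhost:27017',"
                        + " 'username' = 'flinkuser',"
                        + " 'password' = 'flinkpw',"
                        + " 'database' = 'mongo_inventory',"
                        + " 'collection' = 'products',"
                        // renamed from 'copy.existing.queue.size'
                        + " 'initial.snapshotting.queue.size' = '100',"
                        // new option backing FULL_DOCUMENT_PRE_POST_IMAGE
                        + " 'scan.full-changelog' = 'true'"
                        + ")");
    }
}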

@ -86,7 +86,7 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase {
public void testTemporalTypesWithTimeZone() throws Exception {
tEnv.getConfig().setLocalTimeZone(ZoneId.of(localTimeZone));
String database = ROUTER.executeCommandFileInSeparateDatabase("column_type_test");
String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
String sourceDDL =
String.format(
@ -107,7 +107,7 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase {
+ " 'database' = '%s',"
+ " 'collection' = '%s'"
+ ")",
ROUTER.getHostAndPort(),
CONTAINER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
@ -159,7 +159,7 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase {
public void testDateAndTimestampToStringWithTimeZone() throws Exception {
tEnv.getConfig().setLocalTimeZone(ZoneId.of(localTimeZone));
String database = ROUTER.executeCommandFileInSeparateDatabase("column_type_test");
String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
String sourceDDL =
String.format(
@ -176,7 +176,7 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase {
+ " 'database' = '%s',"
+ " 'collection' = '%s'"
+ ")",
ROUTER.getHostAndPort(),
CONTAINER.getHostAndPort(),
FLINK_USER,
FLINK_USER_PASSWORD,
database,
