diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java index 8d8047fa7..eebaacafc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java @@ -90,7 +90,7 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase { @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}") public static Object[] parameters() { List parameterTuples = new ArrayList<>(); - for (String mongoVersion : MONGO_VERSIONS) { + for (String mongoVersion : getMongoVersions()) { parameterTuples.add(new Object[] {mongoVersion, true}); parameterTuples.add(new Object[] {mongoVersion, false}); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java index a285fe4a3..350f96fb5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceExampleTest.java @@ -41,7 +41,7 @@ public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase { @Parameterized.Parameters(name = "mongoVersion: {0} parallelismSnapshot: {1}") public static Object[] parameters() { List parameterTuples = new ArrayList<>(); - for (String mongoVersion : MONGO_VERSIONS) { + for (String mongoVersion : getMongoVersions()) { parameterTuples.add(new Object[] {mongoVersion, true}); parameterTuples.add(new Object[] {mongoVersion, false}); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java index 0595f6b5a..461bf76af 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java @@ -80,7 +80,7 @@ public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase { @Parameterized.Parameters(name = "mongoVersion: {0}") public static Object[] parameters() { - return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray(); + return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray(); } @Rule public final Timeout timeoutPerTest = Timeout.seconds(300); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java index 49a362969..3da217b8b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBSourceTestBase.java @@ -45,7 +45,14 @@ public class MongoDBSourceTestBase { .withLogConsumer(new Slf4jLogConsumer(LOG)); } - public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"}; + public static String[] getMongoVersions() { + String specifiedMongoVersion = System.getProperty("specifiedMongoVersion"); + if (specifiedMongoVersion != null) { + return new String[] {specifiedMongoVersion}; + } else { + return new String[] {"6.0.16", "7.0.12"}; + } + } protected static final int DEFAULT_PARALLELISM = 4; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java index 69900b4e9..1a5943492 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java @@ -79,7 +79,7 @@ public class NewlyAddedTableITCase extends MongoDBSourceTestBase { @Parameterized.Parameters(name = "mongoVersion: {0}") public static Object[] parameters() { - return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray(); + return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray(); } private final ScheduledExecutorService mockChangelogExecutor = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java index b683f303d..8634f96cf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBSnapshotSplitReaderTest.java @@ -81,7 +81,7 @@ public class MongoDBSnapshotSplitReaderTest extends MongoDBSourceTestBase { @Parameterized.Parameters(name = "mongoVersion: {0}") public static Object[] parameters() { - return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray(); + return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray(); } @Before diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java index bf781904b..f71dc6307 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/reader/MongoDBStreamSplitReaderTest.java @@ -95,7 +95,7 @@ public class MongoDBStreamSplitReaderTest extends MongoDBSourceTestBase { @Parameterized.Parameters(name = "mongoVersion: {0}") public static Object[] parameters() { - return Stream.of(MONGO_VERSIONS).map(e -> new Object[] {e}).toArray(); + return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray(); } @Before diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java index f9edc73fa..96ab145a9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTimeZoneITCase.java @@ -66,7 +66,7 @@ public class MongoDBTimeZoneITCase extends MongoDBSourceTestBase { name = "mongoVersion: {0}, localTimeZone: {1}, parallelismSnapshot: {2}") public static Object[] parameters() { List parameterTuples = new ArrayList<>(); - for (String mongoVersion : MONGO_VERSIONS) { + for (String mongoVersion : getMongoVersions()) { for (String timezone : new String[] {"Asia/Shanghai", "Europe/Berlin", "UTC"}) { for (boolean parallelismSnapshot : new boolean[] {true, false}) { parameterTuples.add(new Object[] {mongoVersion, timezone, parallelismSnapshot}); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java index 048e23bd7..093359642 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java @@ -53,6 +53,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -90,7 +91,12 @@ public abstract class PipelineTestEnvironment extends TestLogger { @Parameterized.Parameters(name = "flinkVersion: {0}") public static List getFlinkVersion() { - return Arrays.asList("1.17.2", "1.18.1", "1.19.1", "1.20.0"); + String flinkVersion = System.getProperty("specifiedFlinkVersion"); + if (flinkVersion != null) { + return Collections.singletonList(flinkVersion); + } else { + return Arrays.asList("1.17.2", "1.18.1", "1.19.1", "1.20.0"); + } } @Before diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java index e9cf648f9..352e3e28a 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/MongoE2eITCase.java @@ -44,6 +44,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.stream.Stream; @@ -66,8 +67,6 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment { private MongoClient mongoClient; - public static final String[] MONGO_VERSIONS = {"6.0.16", "7.0.12"}; - @Parameterized.Parameter(1) public String mongoVersion; @@ -77,6 +76,15 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment { @Parameterized.Parameter(3) public boolean scanFullChangelog; + public static List getMongoVersions() { + String specifiedMongoVersion = System.getProperty("specifiedMongoVersion"); + if (specifiedMongoVersion != null) { + return Collections.singletonList(specifiedMongoVersion); + } else { + return Arrays.asList("6.0.16", "7.0.12"); + } + } + @Parameterized.Parameters( name = "flinkVersion: {0}, mongoVersion: {1}, parallelismSnapshot: {2}, scanFullChangelog: {3}") @@ -84,7 +92,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment { final List flinkVersions = getFlinkVersion(); List params = new ArrayList<>(); for (String flinkVersion : flinkVersions) { - for (String mongoVersion : MONGO_VERSIONS) { + for (String mongoVersion : getMongoVersions()) { params.add(new Object[] {flinkVersion, mongoVersion, true, true}); params.add(new Object[] {flinkVersion, mongoVersion, true, false}); params.add(new Object[] {flinkVersion, mongoVersion, false, true}); @@ -99,7 +107,7 @@ public class MongoE2eITCase extends FlinkContainerTestEnvironment { super.before(); container = - new MongoDBContainer("mongo:6.0.9") + new MongoDBContainer("mongo:" + mongoVersion) .withSharding() .withNetwork(NETWORK) .withNetworkAliases(INTER_CONTAINER_MONGO_ALIAS) diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java index 49ef039e0..77aabc2df 100644 --- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/utils/FlinkContainerTestEnvironment.java @@ -61,6 +61,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -120,7 +121,12 @@ public abstract class FlinkContainerTestEnvironment extends TestLogger { @Parameterized.Parameters(name = "flinkVersion: {0}") public static List getFlinkVersion() { - return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.1", "1.20.0"); + String flinkVersion = System.getProperty("specifiedFlinkVersion"); + if (flinkVersion != null) { + return Collections.singletonList(flinkVersion); + } else { + return Arrays.asList("1.16.3", "1.17.2", "1.18.1", "1.19.1", "1.20.0"); + } } @Before diff --git a/tools/mig-test/misc/patch_flink_conf.rb b/tools/mig-test/misc/patch_flink_conf.rb index fe6030188..6241a52c3 100644 --- a/tools/mig-test/misc/patch_flink_conf.rb +++ b/tools/mig-test/misc/patch_flink_conf.rb @@ -26,7 +26,11 @@ parallelism.default: 4 execution.checkpointing.interval: 300 EXTRACONF -File.write("#{FLINK_HOME}/conf/flink-conf.yaml", EXTRA_CONF, mode: 'a+') +if File.file?("#{FLINK_HOME}/conf/flink-conf.yaml") + File.write("#{FLINK_HOME}/conf/flink-conf.yaml", EXTRA_CONF, mode: 'a+') +else + File.write("#{FLINK_HOME}/conf/config.yaml", EXTRA_CONF, mode: 'a+') +end # MySQL connector is not provided `wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.27/mysql-connector-java-8.0.27.jar -O #{FLINK_HOME}/lib/mysql-connector-java-8.0.27.jar` \ No newline at end of file diff --git a/tools/mig-test/run_migration_test.rb b/tools/mig-test/run_migration_test.rb index d7c2fff28..1a454efe8 100644 --- a/tools/mig-test/run_migration_test.rb +++ b/tools/mig-test/run_migration_test.rb @@ -114,7 +114,13 @@ def test_migration(from_version, to_version) end end -version_list = %w[3.0.0 3.0.1 3.1.0 3.1.1 3.3-SNAPSHOT] +version_list = case ARGV[0] + when '1.18.1' then %w[3.0.0 3.0.1 3.1.1 3.3-SNAPSHOT] + when '1.19.1' then %w[3.1.1 3.3-SNAPSHOT] + when '1.20.0' then %w[3.3-SNAPSHOT] + else [] + end + no_savepoint_versions = %w[3.0.0 3.0.1] version_result = Hash.new('❓') @failures = [] @@ -157,6 +163,6 @@ rescue LoadError end puts "✅ - Compatible, ❌ - Not compatible, ❓ - Target version doesn't support `--from-savepoint`" -if @failures.filter { |old_version, new_version| new_version == version_list.last && old_version != '3.1.0' }.any? +if @failures.any? abort 'Some migration to snapshot version tests failed.' end