diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceTest.java index d14607b1b..bdf91b1e1 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlSourceTest.java @@ -44,6 +44,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.Statement; @@ -79,17 +80,18 @@ public class MySqlSourceTest extends MySqlTestBase { private final UniqueDatabase database = new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw"); - private final boolean useLegacyImplementation; + @Override + public String getTempFilePath(String fileName) throws IOException { + return super.getTempFilePath(fileName); + } + + @Parameterized.Parameter public boolean useLegacyImplementation; @Parameterized.Parameters(name = "UseLegacyImplementation: {0}") public static Collection parameters() { return Arrays.asList(false, true); } - public MySqlSourceTest(boolean useLegacyImplementation) { - this.useLegacyImplementation = useLegacyImplementation; - } - @Before public void before() { database.createAndInitialize(); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlValidatorTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlValidatorTest.java index d59dbe642..e3c83b6b0 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlValidatorTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlValidatorTest.java @@ -43,7 +43,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.UUID; import java.util.stream.Stream; @@ -62,15 +64,12 @@ public class MySqlValidatorTest { private static TemporaryFolder tempFolder; private static File resourceFolder; - final boolean runIncrementalSnapshot; - @Parameterized.Parameters(name = "runIncrementalSnapshot = {0}") - public static Object[] parameters() { - return new Object[] {true, false}; - } + @Parameterized.Parameter public boolean runIncrementalSnapshot; - public MySqlValidatorTest(boolean runIncrementalSnapshot) { - this.runIncrementalSnapshot = runIncrementalSnapshot; + @Parameterized.Parameters(name = "runIncrementalSnapshot = {0}") + public static List parameters() { + return Arrays.asList(true, false); } @BeforeClass diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 16686c411..0655cf175 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -94,9 +94,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase { DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())); - final RowType pkType = - (RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType(); - List splits = getMySqlSplits(configuration, pkType); + List splits = getMySqlSplits(configuration); String[] expected = new String[] { "+I[101, user_1, Shanghai, 123567891234]", @@ -115,6 +113,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase { "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]" }; + final RowType pkType = + (RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType(); List actual = readBinlogSplits( splits, @@ -139,9 +139,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase { DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())); - final RowType pkType = - (RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType(); - List splits = getMySqlSplits(configuration, pkType); + List splits = getMySqlSplits(configuration); String[] expected = new String[] { @@ -178,6 +176,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase { "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]" }; + final RowType pkType = + (RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType(); List actual = readBinlogSplits( splits, @@ -202,13 +202,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase { DataTypes.FIELD("level", DataTypes.STRING()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("note", DataTypes.STRING())); - final RowType pkType = - (RowType) - DataTypes.ROW( - DataTypes.FIELD("card_no", DataTypes.BIGINT()), - DataTypes.FIELD("level", DataTypes.STRING())) - .getLogicalType(); - List splits = getMySqlSplits(configuration, pkType); + List splits = getMySqlSplits(configuration); String[] expected = new String[] { @@ -218,6 +212,12 @@ public class BinlogSplitReaderTest extends MySqlTestBase { "+I[20002, LEVEL_3, user_3, user with level 3]" }; + final RowType pkType = + (RowType) + DataTypes.ROW( + DataTypes.FIELD("card_no", DataTypes.BIGINT()), + DataTypes.FIELD("level", DataTypes.STRING())) + .getLogicalType(); List actual = readBinlogSplits( splits, @@ -249,7 +249,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase { DataTypes.FIELD("card_no", DataTypes.BIGINT()), DataTypes.FIELD("level", DataTypes.STRING())) .getLogicalType(); - List splits = getMySqlSplits(configuration, pkType); + List splits = getMySqlSplits(configuration); String[] expected = new String[] { "+I[20000, LEVEL_1, user_1, user with level 1]", @@ -323,13 +323,12 @@ public class BinlogSplitReaderTest extends MySqlTestBase { "+I[2003, user_24, Shanghai, 123567891234]" }; List actual = - readBinlogSplitsFromLatestOffset(dataType, pkType, configuration, expected.length); + readBinlogSplitsFromLatestOffset(dataType, configuration, expected.length); assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); } private List readBinlogSplitsFromLatestOffset( - DataType dataType, RowType pkType, Configuration configuration, int expectedSize) - throws Exception { + DataType dataType, Configuration configuration, int expectedSize) throws Exception { final StatefulTaskContext statefulTaskContext = new StatefulTaskContext(configuration, binaryLogClient, mySqlConnection); @@ -586,7 +585,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase { .collect(Collectors.toList()); } - private List getMySqlSplits(Configuration configuration, RowType pkType) { + private List getMySqlSplits(Configuration configuration) { final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration, currentParallelism); assigner.open(); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 14cdacc34..a747119c9 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -84,9 +84,7 @@ public class SnapshotSplitReaderTest extends MySqlTestBase { DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())); - final RowType pkType = - (RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType(); - List mySqlSplits = getMySqlSplits(configuration, pkType); + List mySqlSplits = getMySqlSplits(configuration); String[] expected = new String[] { @@ -113,9 +111,7 @@ public class SnapshotSplitReaderTest extends MySqlTestBase { DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())); - final RowType pkType = - (RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType(); - List mySqlSplits = getMySqlSplits(configuration, pkType); + List mySqlSplits = getMySqlSplits(configuration); String[] expected = new String[] { @@ -155,13 +151,7 @@ public class SnapshotSplitReaderTest extends MySqlTestBase { DataTypes.FIELD("level", DataTypes.STRING()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("note", DataTypes.STRING())); - final RowType pkType = - (RowType) - DataTypes.ROW( - DataTypes.FIELD("card_no", DataTypes.BIGINT()), - DataTypes.FIELD("level", DataTypes.STRING())) - .getLogicalType(); - List mySqlSplits = getMySqlSplits(configuration, pkType); + List mySqlSplits = getMySqlSplits(configuration); String[] expected = new String[] {"+I[20001, LEVEL_1, user_1, user with level 1]"}; List actual = readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType); @@ -178,13 +168,7 @@ public class SnapshotSplitReaderTest extends MySqlTestBase { DataTypes.FIELD("level", DataTypes.STRING()), DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("note", DataTypes.STRING())); - RowType pkType = - (RowType) - DataTypes.ROW( - DataTypes.FIELD("card_no", DataTypes.BIGINT()), - DataTypes.FIELD("level", DataTypes.STRING())) - .getLogicalType(); - List mySqlSplits = getMySqlSplits(configuration, pkType); + List mySqlSplits = getMySqlSplits(configuration); String[] expected = new String[] { @@ -277,7 +261,7 @@ public class SnapshotSplitReaderTest extends MySqlTestBase { .collect(Collectors.toList()); } - private List getMySqlSplits(Configuration configuration, RowType pkType) { + private List getMySqlSplits(Configuration configuration) { final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration, currentParallelism); assigner.open(); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java index fdf7b524a..6ed60703a 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java @@ -47,6 +47,7 @@ import java.nio.file.StandardOpenOption; import java.sql.Connection; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -72,18 +73,11 @@ public class MysqlTimezoneITCase { env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()); - private final boolean incrementalSnapshot; + @Parameterized.Parameter public Boolean incrementalSnapshot; @Parameterized.Parameters(name = "incrementalSnapshot: {0}") - public static Object[] parameters() { - return new Object[][] {new Object[] {true} - // , - // new Object[] {false} - }; - } - - public MysqlTimezoneITCase(boolean incrementalSnapshot) { - this.incrementalSnapshot = incrementalSnapshot; + public static List parameters() { + return Arrays.asList(true, false); } @Before