From 669b0916f78c05ea258572718240926c347f3ceb Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Fri, 17 Sep 2021 17:09:28 +0800 Subject: [PATCH] [tests] Introduce assertEqualsInOrder/assertEqualsInAnyOrder methods for test this closes #437 --- .../reader/BinlogSplitReaderTest.java | 15 +++++------ .../reader/SnapshotSplitReaderTest.java | 10 +++---- .../source/MySqlParallelSourceITCase.java | 11 +++----- .../source/MySqlParallelSourceTestBase.java | 19 +++++++++++++ .../source/reader/MySqlSourceReaderTest.java | 8 ++---- .../mysql/table/MySqlConnectorITCase.java | 27 ++++++++++--------- .../mysql/table/MysqlTimezoneITCase.java | 12 +++++---- .../mysql/testutils/RecordsFormatter.java | 1 - 8 files changed, 56 insertions(+), 47 deletions(-) diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 145924453..fd19379cf 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -61,9 +61,6 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; /** Tests for {@link BinlogSplitReader}. */ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase { @@ -116,7 +113,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase { 1, expected.length, splits.get(splits.size() - 1).getTableId()); - assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); } @Test @@ -179,7 +176,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase { splits.size(), expected.length, splits.get(splits.size() - 1).getTableId()); - assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); } @Test @@ -219,7 +216,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase { splits.size(), expected.length, splits.get(splits.size() - 1).getTableId()); - assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); } @Test @@ -281,7 +278,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase { customerDatabase.getDatabaseName() + "." + "customer_card_single_line")); - assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); } @Test @@ -305,15 +302,15 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase { "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", - "+U[1010, Hangzhou, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", + "+U[1010, Hangzhou, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]" }; List actual = readBinlogSplitsFromLatestOffset(dataType, configuration, expected.length); - assertThat(actual, containsInAnyOrder(expected)); + assertEqualsInOrder(actual, Arrays.asList(expected)); } private List readBinlogSplitsFromLatestOffset( diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 108594ae8..cf1a51102 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -46,8 +46,6 @@ import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; -import static org.junit.Assert.assertEquals; - /** Tests for {@link SnapshotSplitReader}. */ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase { @@ -89,7 +87,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase { "+I[123, user_9, Shanghai, 123567891234]" }; List actual = readTableSnapshotSplits(mySqlSplits, configuration, 1, dataType); - assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); } @Test @@ -129,7 +127,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase { }; List actual = readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType); - assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); } @Test @@ -145,7 +143,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase { String[] expected = new String[] {"+I[20001, LEVEL_1, user_1, user with level 1]"}; List actual = readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType); - assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); } @Test @@ -185,7 +183,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase { }; List actual = readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType); - assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); } private List readTableSnapshotSplits( diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceITCase.java index c935898b1..46a926185 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceITCase.java @@ -45,9 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; - /** IT tests for {@link MySqlParallelSource}. */ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase { @@ -207,10 +204,9 @@ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase { triggerFailover( failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100)); } - String[] expectedSnapshot = expectedSnapshotData.toArray(new String[0]); - assertThat( - fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); + assertEqualsInAnyOrder( + fetchRows(iterator, expectedSnapshotData.size()), expectedSnapshotData); // second step: check the binlog data for (String tableId : captureCustomerTables) { @@ -244,8 +240,7 @@ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase { for (int i = 0; i < captureCustomerTables.length; i++) { expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable)); } - String[] expectedBinlog = expectedBinlogData.toArray(new String[0]); - assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog)); + assertEqualsInAnyOrder(fetchRows(iterator, expectedBinlogData.size()), expectedBinlogData); tableResult.getJobClient().get().cancel().get(); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java index 9a7073586..5e4c2835c 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSourceTestBase.java @@ -31,8 +31,14 @@ import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** Basic class for testing {@link MySqlParallelSource}. */ public abstract class MySqlParallelSourceTestBase extends TestLogger { @@ -65,4 +71,17 @@ public abstract class MySqlParallelSourceTestBase extends TestLogger { Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); LOG.info("Containers are started."); } + + public static void assertEqualsInAnyOrder(List actual, List expected) { + assertTrue(actual != null && expected != null); + assertEqualsInOrder( + actual.stream().sorted().collect(Collectors.toList()), + expected.stream().sorted().collect(Collectors.toList())); + } + + public static void assertEqualsInOrder(List actual, List expected) { + assertTrue(actual != null && expected != null); + assertEquals(actual.size(), expected.size()); + assertArrayEquals(actual.toArray(new String[0]), expected.toArray(new String[0])); + } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 94ca6b4e0..17643695c 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -91,9 +91,7 @@ public class MySqlSourceReaderTest extends MySqlParallelSourceTestBase { }; // the 2 records are produced by 1 operations List actualRecords = consumeRecords(reader, dataType, 1); - assertEquals( - Arrays.stream(expectedRecords).sorted().collect(Collectors.toList()), - actualRecords); + assertEqualsInOrder(actualRecords, Arrays.asList(expectedRecords)); List splitsState = reader.snapshotState(1L); // check the binlog split state assertEquals(1, splitsState.size()); @@ -114,9 +112,7 @@ public class MySqlSourceReaderTest extends MySqlParallelSourceTestBase { }; // the 4 records are produced by 3 operations List restRecords = consumeRecords(restartReader, dataType, 3); - assertEquals( - Arrays.stream(expectedRestRecords).sorted().collect(Collectors.toList()), - restRecords); + assertEqualsInOrder(restRecords, Arrays.asList(expectedRestRecords)); restartReader.close(); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 91778e84b..d6ae29e57 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -38,14 +38,13 @@ import org.junit.runners.Parameterized; import java.sql.Connection; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Random; import static com.ververica.cdc.connectors.mysql.MySqlSourceTest.currentMySqlLatestOffset; import static org.apache.flink.api.common.JobStatus.RUNNING; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; /** Integration tests for MySQL binlog SQL source. */ @RunWith(Parameterized.class) @@ -208,7 +207,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { }; List actual = TestValuesTableFactory.getResults("sink"); - assertThat(actual, containsInAnyOrder(expected)); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); result.getJobClient().get().cancel().get(); } @@ -270,8 +269,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { "+I[108, jacket, water resistent black wind breaker, 0.100]", "+I[109, spare tire, 24 inch spare tire, 22.200]" }; - assertThat( - fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); + assertEqualsInAnyOrder( + fetchRows(iterator, expectedSnapshot.length), Arrays.asList(expectedSnapshot)); try (Connection connection = inventoryDatabase.getJdbcConnection(); Statement statement = connection.createStatement()) { @@ -296,7 +295,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { "+U[111, scooter, Big 2-wheel scooter , 5.170]", "-D[111, scooter, Big 2-wheel scooter , 5.170]" }; - assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog)); + assertEqualsInOrder( + fetchRows(iterator, expectedBinlog.length), Arrays.asList(expectedBinlog)); result.getJobClient().get().cancel().get(); } @@ -397,7 +397,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { "+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]" }; - assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected)); + assertEqualsInAnyOrder( + fetchRows(result.collect(), expected.length), Arrays.asList(expected)); result.getJobClient().get().cancel().get(); } @@ -469,7 +470,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { "+U[111, scooter, Big 2-wheel scooter , 5.170]", "-D[111, scooter, Big 2-wheel scooter , 5.170]" }; - assertThat(fetchRows(iterator, expected.length), containsInAnyOrder(expected)); + assertEqualsInAnyOrder( + fetchRows(result.collect(), expected.length), Arrays.asList(expected)); result.getJobClient().get().cancel().get(); } @@ -537,7 +539,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { "+U[416927583791428523, China, Hangzhou, West Town address 2]", "+I[418257940021724075, Germany, Berlin, West Town address 3]", }; - assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected)); + assertEqualsInAnyOrder( + fetchRows(result.collect(), expected.length), Arrays.asList(expected)); result.getJobClient().get().cancel().get(); } @@ -626,7 +629,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; List actual = TestValuesTableFactory.getResults("sink"); - assertThat(actual, containsInAnyOrder(expected)); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); result.getJobClient().get().cancel().get(); } @@ -711,7 +714,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { }; List actual = TestValuesTableFactory.getResults("sink"); - assertThat(actual, containsInAnyOrder(expected)); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); result.getJobClient().get().cancel().get(); } @@ -786,7 +789,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase { new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; List actual = TestValuesTableFactory.getResults("sink"); - assertThat(actual, containsInAnyOrder(expected)); + assertEqualsInAnyOrder(actual, Arrays.asList(expected)); result.getJobClient().get().cancel().get(); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java index d042bef56..ac8d72f71 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java @@ -56,8 +56,8 @@ import java.util.Random; import java.util.UUID; import java.util.stream.Stream; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; +import static com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase.assertEqualsInAnyOrder; +import static com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase.assertEqualsInOrder; /** Integration tests to check mysql-cdc works well under different MySQL server timezone. */ @RunWith(Parameterized.class) @@ -190,8 +190,8 @@ public class MysqlTimezoneITCase { new String[] { "+I[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]", }; - assertThat( - fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); + assertEqualsInAnyOrder( + fetchRows(iterator, expectedSnapshot.length), Arrays.asList(expectedSnapshot)); try (Connection connection = fullTypesDatabase.getJdbcConnection(); Statement statement = connection.createStatement()) { @@ -206,7 +206,9 @@ public class MysqlTimezoneITCase { "+U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22]" }; - assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog)); + assertEqualsInOrder( + fetchRows(iterator, expectedBinlog.length), Arrays.asList(expectedBinlog)); + result.getJobClient().get().cancel().get(); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java index 58094ece5..c5c106062 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/RecordsFormatter.java @@ -87,7 +87,6 @@ public class RecordsFormatter { return collector.list.stream() .map(rowRowConverter::toExternal) .map(Row::toString) - .sorted() .collect(Collectors.toList()); }