[tests] Introduce assertEqualsInOrder/assertEqualsInAnyOrder methods for test

this closes #437
pull/459/head
Leonard Xu 3 years ago
parent 49caad1c6e
commit 669b0916f7

@ -61,9 +61,6 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/** Tests for {@link BinlogSplitReader}. */ /** Tests for {@link BinlogSplitReader}. */
public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase { public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
@ -116,7 +113,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
1, 1,
expected.length, expected.length,
splits.get(splits.size() - 1).getTableId()); splits.get(splits.size() - 1).getTableId());
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
} }
@Test @Test
@ -179,7 +176,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
splits.size(), splits.size(),
expected.length, expected.length,
splits.get(splits.size() - 1).getTableId()); splits.get(splits.size() - 1).getTableId());
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
} }
@Test @Test
@ -219,7 +216,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
splits.size(), splits.size(),
expected.length, expected.length,
splits.get(splits.size() - 1).getTableId()); splits.get(splits.size() - 1).getTableId());
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
} }
@Test @Test
@ -281,7 +278,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
customerDatabase.getDatabaseName() customerDatabase.getDatabaseName()
+ "." + "."
+ "customer_card_single_line")); + "customer_card_single_line"));
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
} }
@Test @Test
@ -305,15 +302,15 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
"+I[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]",
"+U[1010, Hangzhou, Shanghai, 123567891234]",
"-U[1010, user_11, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]",
"+U[1010, Hangzhou, Shanghai, 123567891234]",
"+I[2001, user_22, Shanghai, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]",
"+I[2002, user_23, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]",
"+I[2003, user_24, Shanghai, 123567891234]" "+I[2003, user_24, Shanghai, 123567891234]"
}; };
List<String> actual = List<String> actual =
readBinlogSplitsFromLatestOffset(dataType, configuration, expected.length); readBinlogSplitsFromLatestOffset(dataType, configuration, expected.length);
assertThat(actual, containsInAnyOrder(expected)); assertEqualsInOrder(actual, Arrays.asList(expected));
} }
private List<String> readBinlogSplitsFromLatestOffset( private List<String> readBinlogSplitsFromLatestOffset(

@ -46,8 +46,6 @@ import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
/** Tests for {@link SnapshotSplitReader}. */ /** Tests for {@link SnapshotSplitReader}. */
public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase { public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
@ -89,7 +87,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
"+I[123, user_9, Shanghai, 123567891234]" "+I[123, user_9, Shanghai, 123567891234]"
}; };
List<String> actual = readTableSnapshotSplits(mySqlSplits, configuration, 1, dataType); List<String> actual = readTableSnapshotSplits(mySqlSplits, configuration, 1, dataType);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
} }
@Test @Test
@ -129,7 +127,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
}; };
List<String> actual = List<String> actual =
readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType); readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
} }
@Test @Test
@ -145,7 +143,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
String[] expected = new String[] {"+I[20001, LEVEL_1, user_1, user with level 1]"}; String[] expected = new String[] {"+I[20001, LEVEL_1, user_1, user with level 1]"};
List<String> actual = List<String> actual =
readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType); readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
} }
@Test @Test
@ -185,7 +183,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
}; };
List<String> actual = List<String> actual =
readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType); readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
} }
private List<String> readTableSnapshotSplits( private List<String> readTableSnapshotSplits(

@ -45,9 +45,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
/** IT tests for {@link MySqlParallelSource}. */ /** IT tests for {@link MySqlParallelSource}. */
public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase { public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase {
@ -207,10 +204,9 @@ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase {
triggerFailover( triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100)); failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
} }
String[] expectedSnapshot = expectedSnapshotData.toArray(new String[0]);
assertThat( assertEqualsInAnyOrder(
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); fetchRows(iterator, expectedSnapshotData.size()), expectedSnapshotData);
// second step: check the binlog data // second step: check the binlog data
for (String tableId : captureCustomerTables) { for (String tableId : captureCustomerTables) {
@ -244,8 +240,7 @@ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase {
for (int i = 0; i < captureCustomerTables.length; i++) { for (int i = 0; i < captureCustomerTables.length; i++) {
expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable)); expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
} }
String[] expectedBinlog = expectedBinlogData.toArray(new String[0]); assertEqualsInAnyOrder(fetchRows(iterator, expectedBinlogData.size()), expectedBinlogData);
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog));
tableResult.getJobClient().get().cancel().get(); tableResult.getJobClient().get().cancel().get();
} }

@ -31,8 +31,14 @@ import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables; import org.testcontainers.lifecycle.Startables;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Basic class for testing {@link MySqlParallelSource}. */ /** Basic class for testing {@link MySqlParallelSource}. */
public abstract class MySqlParallelSourceTestBase extends TestLogger { public abstract class MySqlParallelSourceTestBase extends TestLogger {
@ -65,4 +71,17 @@ public abstract class MySqlParallelSourceTestBase extends TestLogger {
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
LOG.info("Containers are started."); LOG.info("Containers are started.");
} }
public static void assertEqualsInAnyOrder(List<String> actual, List<String> expected) {
assertTrue(actual != null && expected != null);
assertEqualsInOrder(
actual.stream().sorted().collect(Collectors.toList()),
expected.stream().sorted().collect(Collectors.toList()));
}
public static void assertEqualsInOrder(List<String> actual, List<String> expected) {
assertTrue(actual != null && expected != null);
assertEquals(actual.size(), expected.size());
assertArrayEquals(actual.toArray(new String[0]), expected.toArray(new String[0]));
}
} }

@ -91,9 +91,7 @@ public class MySqlSourceReaderTest extends MySqlParallelSourceTestBase {
}; };
// the 2 records are produced by 1 operations // the 2 records are produced by 1 operations
List<String> actualRecords = consumeRecords(reader, dataType, 1); List<String> actualRecords = consumeRecords(reader, dataType, 1);
assertEquals( assertEqualsInOrder(actualRecords, Arrays.asList(expectedRecords));
Arrays.stream(expectedRecords).sorted().collect(Collectors.toList()),
actualRecords);
List<MySqlSplit> splitsState = reader.snapshotState(1L); List<MySqlSplit> splitsState = reader.snapshotState(1L);
// check the binlog split state // check the binlog split state
assertEquals(1, splitsState.size()); assertEquals(1, splitsState.size());
@ -114,9 +112,7 @@ public class MySqlSourceReaderTest extends MySqlParallelSourceTestBase {
}; };
// the 4 records are produced by 3 operations // the 4 records are produced by 3 operations
List<String> restRecords = consumeRecords(restartReader, dataType, 3); List<String> restRecords = consumeRecords(restartReader, dataType, 3);
assertEquals( assertEqualsInOrder(restRecords, Arrays.asList(expectedRestRecords));
Arrays.stream(expectedRestRecords).sorted().collect(Collectors.toList()),
restRecords);
restartReader.close(); restartReader.close();
} }

@ -38,14 +38,13 @@ import org.junit.runners.Parameterized;
import java.sql.Connection; import java.sql.Connection;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import static com.ververica.cdc.connectors.mysql.MySqlSourceTest.currentMySqlLatestOffset; import static com.ververica.cdc.connectors.mysql.MySqlSourceTest.currentMySqlLatestOffset;
import static org.apache.flink.api.common.JobStatus.RUNNING; import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
/** Integration tests for MySQL binlog SQL source. */ /** Integration tests for MySQL binlog SQL source. */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@ -208,7 +207,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
}; };
List<String> actual = TestValuesTableFactory.getResults("sink"); List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected)); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@ -270,8 +269,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+I[108, jacket, water resistent black wind breaker, 0.100]", "+I[108, jacket, water resistent black wind breaker, 0.100]",
"+I[109, spare tire, 24 inch spare tire, 22.200]" "+I[109, spare tire, 24 inch spare tire, 22.200]"
}; };
assertThat( assertEqualsInAnyOrder(
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); fetchRows(iterator, expectedSnapshot.length), Arrays.asList(expectedSnapshot));
try (Connection connection = inventoryDatabase.getJdbcConnection(); try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
@ -296,7 +295,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+U[111, scooter, Big 2-wheel scooter , 5.170]", "+U[111, scooter, Big 2-wheel scooter , 5.170]",
"-D[111, scooter, Big 2-wheel scooter , 5.170]" "-D[111, scooter, Big 2-wheel scooter , 5.170]"
}; };
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog)); assertEqualsInOrder(
fetchRows(iterator, expectedBinlog.length), Arrays.asList(expectedBinlog));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@ -397,7 +397,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]" "+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]"
}; };
assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected)); assertEqualsInAnyOrder(
fetchRows(result.collect(), expected.length), Arrays.asList(expected));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@ -469,7 +470,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+U[111, scooter, Big 2-wheel scooter , 5.170]", "+U[111, scooter, Big 2-wheel scooter , 5.170]",
"-D[111, scooter, Big 2-wheel scooter , 5.170]" "-D[111, scooter, Big 2-wheel scooter , 5.170]"
}; };
assertThat(fetchRows(iterator, expected.length), containsInAnyOrder(expected)); assertEqualsInAnyOrder(
fetchRows(result.collect(), expected.length), Arrays.asList(expected));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@ -537,7 +539,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+U[416927583791428523, China, Hangzhou, West Town address 2]", "+U[416927583791428523, China, Hangzhou, West Town address 2]",
"+I[418257940021724075, Germany, Berlin, West Town address 3]", "+I[418257940021724075, Germany, Berlin, West Town address 3]",
}; };
assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected)); assertEqualsInAnyOrder(
fetchRows(result.collect(), expected.length), Arrays.asList(expected));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@ -626,7 +629,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink"); List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected)); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@ -711,7 +714,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
}; };
List<String> actual = TestValuesTableFactory.getResults("sink"); List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected)); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@ -786,7 +789,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"}; new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink"); List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected)); assertEqualsInAnyOrder(actual, Arrays.asList(expected));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }

@ -56,8 +56,8 @@ import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.hamcrest.Matchers.containsInAnyOrder; import static com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase.assertEqualsInAnyOrder;
import static org.junit.Assert.assertThat; import static com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase.assertEqualsInOrder;
/** Integration tests to check mysql-cdc works well under different MySQL server timezone. */ /** Integration tests to check mysql-cdc works well under different MySQL server timezone. */
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
@ -190,8 +190,8 @@ public class MysqlTimezoneITCase {
new String[] { new String[] {
"+I[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]", "+I[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]",
}; };
assertThat( assertEqualsInAnyOrder(
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); fetchRows(iterator, expectedSnapshot.length), Arrays.asList(expectedSnapshot));
try (Connection connection = fullTypesDatabase.getJdbcConnection(); try (Connection connection = fullTypesDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
@ -206,7 +206,9 @@ public class MysqlTimezoneITCase {
"+U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22]" "+U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22]"
}; };
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog)); assertEqualsInOrder(
fetchRows(iterator, expectedBinlog.length), Arrays.asList(expectedBinlog));
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }

@ -87,7 +87,6 @@ public class RecordsFormatter {
return collector.list.stream() return collector.list.stream()
.map(rowRowConverter::toExternal) .map(rowRowConverter::toExternal)
.map(Row::toString) .map(Row::toString)
.sorted()
.collect(Collectors.toList()); .collect(Collectors.toList());
} }

Loading…
Cancel
Save