[tests] Introduce assertEqualsInOrder/assertEqualsInAnyOrder methods for test

this closes #437
pull/443/head
Leonard Xu 3 years ago committed by GitHub
parent d89373b294
commit ee3d7f4584
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -61,9 +61,6 @@ import static com.ververica.cdc.connectors.mysql.source.MySqlSourceOptions.SCAN_
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/** Tests for {@link BinlogSplitReader}. */
public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
@ -116,7 +113,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
1,
expected.length,
splits.get(splits.size() - 1).getTableId());
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
}
@Test
@ -179,7 +176,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
splits.size(),
expected.length,
splits.get(splits.size() - 1).getTableId());
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
}
@Test
@ -219,7 +216,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
splits.size(),
expected.length,
splits.get(splits.size() - 1).getTableId());
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
}
@Test
@ -281,7 +278,7 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
customerDatabase.getDatabaseName()
+ "."
+ "customer_card_single_line"));
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
}
@Test
@ -305,15 +302,15 @@ public class BinlogSplitReaderTest extends MySqlParallelSourceTestBase {
"+I[102, user_2, Shanghai, 123567891234]",
"-U[103, user_3, Hangzhou, 123567891234]",
"+U[103, user_3, Shanghai, 123567891234]",
"+U[1010, Hangzhou, Shanghai, 123567891234]",
"-U[1010, user_11, Shanghai, 123567891234]",
"+U[1010, Hangzhou, Shanghai, 123567891234]",
"+I[2001, user_22, Shanghai, 123567891234]",
"+I[2002, user_23, Shanghai, 123567891234]",
"+I[2003, user_24, Shanghai, 123567891234]"
};
List<String> actual =
readBinlogSplitsFromLatestOffset(dataType, configuration, expected.length);
assertThat(actual, containsInAnyOrder(expected));
assertEqualsInOrder(actual, Arrays.asList(expected));
}
private List<String> readBinlogSplitsFromLatestOffset(

@ -46,8 +46,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
/** Tests for {@link SnapshotSplitReader}. */
public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
@ -89,7 +87,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
"+I[123, user_9, Shanghai, 123567891234]"
};
List<String> actual = readTableSnapshotSplits(mySqlSplits, configuration, 1, dataType);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
}
@Test
@ -129,7 +127,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
}
@Test
@ -145,7 +143,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
String[] expected = new String[] {"+I[20001, LEVEL_1, user_1, user with level 1]"};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
}
@Test
@ -185,7 +183,7 @@ public class SnapshotSplitReaderTest extends MySqlParallelSourceTestBase {
};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, configuration, mySqlSplits.size(), dataType);
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
}
private List<String> readTableSnapshotSplits(

@ -45,9 +45,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
/** IT tests for {@link MySqlParallelSource}. */
public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase {
@ -207,10 +204,9 @@ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase {
triggerFailover(
failoverType, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(100));
}
String[] expectedSnapshot = expectedSnapshotData.toArray(new String[0]);
assertThat(
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot));
assertEqualsInAnyOrder(
fetchRows(iterator, expectedSnapshotData.size()), expectedSnapshotData);
// second step: check the binlog data
for (String tableId : captureCustomerTables) {
@ -244,8 +240,7 @@ public class MySqlParallelSourceITCase extends MySqlParallelSourceTestBase {
for (int i = 0; i < captureCustomerTables.length; i++) {
expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
}
String[] expectedBinlog = expectedBinlogData.toArray(new String[0]);
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog));
assertEqualsInAnyOrder(fetchRows(iterator, expectedBinlogData.size()), expectedBinlogData);
tableResult.getJobClient().get().cancel().get();
}

@ -31,8 +31,14 @@ import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Basic class for testing {@link MySqlParallelSource}. */
public abstract class MySqlParallelSourceTestBase extends TestLogger {
@ -65,4 +71,17 @@ public abstract class MySqlParallelSourceTestBase extends TestLogger {
Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
LOG.info("Containers are started.");
}
public static void assertEqualsInAnyOrder(List<String> actual, List<String> expected) {
assertTrue(actual != null && expected != null);
assertEqualsInOrder(
actual.stream().sorted().collect(Collectors.toList()),
expected.stream().sorted().collect(Collectors.toList()));
}
public static void assertEqualsInOrder(List<String> actual, List<String> expected) {
assertTrue(actual != null && expected != null);
assertEquals(actual.size(), expected.size());
assertArrayEquals(actual.toArray(new String[0]), expected.toArray(new String[0]));
}
}

@ -91,9 +91,7 @@ public class MySqlSourceReaderTest extends MySqlParallelSourceTestBase {
};
// the 2 records are produced by 1 operations
List<String> actualRecords = consumeRecords(reader, dataType, 1);
assertEquals(
Arrays.stream(expectedRecords).sorted().collect(Collectors.toList()),
actualRecords);
assertEqualsInOrder(actualRecords, Arrays.asList(expectedRecords));
List<MySqlSplit> splitsState = reader.snapshotState(1L);
// check the binlog split state
assertEquals(1, splitsState.size());
@ -114,9 +112,7 @@ public class MySqlSourceReaderTest extends MySqlParallelSourceTestBase {
};
// the 4 records are produced by 3 operations
List<String> restRecords = consumeRecords(restartReader, dataType, 3);
assertEquals(
Arrays.stream(expectedRestRecords).sorted().collect(Collectors.toList()),
restRecords);
assertEqualsInOrder(restRecords, Arrays.asList(expectedRestRecords));
restartReader.close();
}

@ -38,14 +38,13 @@ import org.junit.runners.Parameterized;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import static com.ververica.cdc.connectors.mysql.MySqlSourceTest.currentMySqlLatestOffset;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
/** Integration tests for MySQL binlog SQL source. */
@RunWith(Parameterized.class)
@ -208,7 +207,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
result.getJobClient().get().cancel().get();
}
@ -270,8 +269,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+I[108, jacket, water resistent black wind breaker, 0.100]",
"+I[109, spare tire, 24 inch spare tire, 22.200]"
};
assertThat(
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot));
assertEqualsInAnyOrder(
fetchRows(iterator, expectedSnapshot.length), Arrays.asList(expectedSnapshot));
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
@ -296,7 +295,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+U[111, scooter, Big 2-wheel scooter , 5.170]",
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
};
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog));
assertEqualsInOrder(
fetchRows(iterator, expectedBinlog.length), Arrays.asList(expectedBinlog));
result.getJobClient().get().cancel().get();
}
@ -397,7 +397,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22, ZRrvv70IOQ9I77+977+977+9Nu+/vT57dAA=]"
};
assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected));
assertEqualsInAnyOrder(
fetchRows(result.collect(), expected.length), Arrays.asList(expected));
result.getJobClient().get().cancel().get();
}
@ -469,7 +470,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+U[111, scooter, Big 2-wheel scooter , 5.170]",
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
};
assertThat(fetchRows(iterator, expected.length), containsInAnyOrder(expected));
assertEqualsInAnyOrder(
fetchRows(result.collect(), expected.length), Arrays.asList(expected));
result.getJobClient().get().cancel().get();
}
@ -537,7 +539,8 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
"+U[416927583791428523, China, Hangzhou, West Town address 2]",
"+I[418257940021724075, Germany, Berlin, West Town address 3]",
};
assertThat(fetchRows(result.collect(), expected.length), containsInAnyOrder(expected));
assertEqualsInAnyOrder(
fetchRows(result.collect(), expected.length), Arrays.asList(expected));
result.getJobClient().get().cancel().get();
}
@ -626,7 +629,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
result.getJobClient().get().cancel().get();
}
@ -711,7 +714,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
result.getJobClient().get().cancel().get();
}
@ -786,7 +789,7 @@ public class MySqlConnectorITCase extends MySqlParallelSourceTestBase {
new String[] {"+I[110, jacket, new water resistent white wind breaker, 0.500]"};
List<String> actual = TestValuesTableFactory.getResults("sink");
assertThat(actual, containsInAnyOrder(expected));
assertEqualsInAnyOrder(actual, Arrays.asList(expected));
result.getJobClient().get().cancel().get();
}

@ -56,8 +56,8 @@ import java.util.Random;
import java.util.UUID;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase.assertEqualsInAnyOrder;
import static com.ververica.cdc.connectors.mysql.source.MySqlParallelSourceTestBase.assertEqualsInOrder;
/** Integration tests to check mysql-cdc works well under different MySQL server timezone. */
@RunWith(Parameterized.class)
@ -190,8 +190,8 @@ public class MysqlTimezoneITCase {
new String[] {
"+I[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]",
};
assertThat(
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot));
assertEqualsInAnyOrder(
fetchRows(iterator, expectedSnapshot.length), Arrays.asList(expectedSnapshot));
try (Connection connection = fullTypesDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
@ -206,7 +206,9 @@ public class MysqlTimezoneITCase {
"+U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22]"
};
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog));
assertEqualsInOrder(
fetchRows(iterator, expectedBinlog.length), Arrays.asList(expectedBinlog));
result.getJobClient().get().cancel().get();
}

@ -87,7 +87,6 @@ public class RecordsFormatter {
return collector.list.stream()
.map(rowRowConverter::toExternal)
.map(Row::toString)
.sorted()
.collect(Collectors.toList());
}

Loading…
Cancel
Save