|
|
|
@ -94,9 +94,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
|
|
|
|
|
DataTypes.FIELD("name", DataTypes.STRING()),
|
|
|
|
|
DataTypes.FIELD("address", DataTypes.STRING()),
|
|
|
|
|
DataTypes.FIELD("phone_number", DataTypes.STRING()));
|
|
|
|
|
final RowType pkType =
|
|
|
|
|
(RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType();
|
|
|
|
|
List<MySqlSnapshotSplit> splits = getMySqlSplits(configuration, pkType);
|
|
|
|
|
List<MySqlSnapshotSplit> splits = getMySqlSplits(configuration);
|
|
|
|
|
String[] expected =
|
|
|
|
|
new String[] {
|
|
|
|
|
"+I[101, user_1, Shanghai, 123567891234]",
|
|
|
|
@ -115,6 +113,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
|
|
|
|
|
"+I[121, user_8, Shanghai, 123567891234]",
|
|
|
|
|
"+I[123, user_9, Shanghai, 123567891234]"
|
|
|
|
|
};
|
|
|
|
|
final RowType pkType =
|
|
|
|
|
(RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType();
|
|
|
|
|
List<String> actual =
|
|
|
|
|
readBinlogSplits(
|
|
|
|
|
splits,
|
|
|
|
@ -139,9 +139,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
|
|
|
|
|
DataTypes.FIELD("name", DataTypes.STRING()),
|
|
|
|
|
DataTypes.FIELD("address", DataTypes.STRING()),
|
|
|
|
|
DataTypes.FIELD("phone_number", DataTypes.STRING()));
|
|
|
|
|
final RowType pkType =
|
|
|
|
|
(RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType();
|
|
|
|
|
List<MySqlSnapshotSplit> splits = getMySqlSplits(configuration, pkType);
|
|
|
|
|
List<MySqlSnapshotSplit> splits = getMySqlSplits(configuration);
|
|
|
|
|
|
|
|
|
|
String[] expected =
|
|
|
|
|
new String[] {
|
|
|
|
@ -178,6 +176,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
|
|
|
|
|
"+I[2002, user_23, Shanghai, 123567891234]",
|
|
|
|
|
"+I[2003, user_24, Shanghai, 123567891234]"
|
|
|
|
|
};
|
|
|
|
|
final RowType pkType =
|
|
|
|
|
(RowType) DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT())).getLogicalType();
|
|
|
|
|
List<String> actual =
|
|
|
|
|
readBinlogSplits(
|
|
|
|
|
splits,
|
|
|
|
@ -202,13 +202,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
|
|
|
|
|
DataTypes.FIELD("level", DataTypes.STRING()),
|
|
|
|
|
DataTypes.FIELD("name", DataTypes.STRING()),
|
|
|
|
|
DataTypes.FIELD("note", DataTypes.STRING()));
|
|
|
|
|
final RowType pkType =
|
|
|
|
|
(RowType)
|
|
|
|
|
DataTypes.ROW(
|
|
|
|
|
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
|
|
|
|
|
DataTypes.FIELD("level", DataTypes.STRING()))
|
|
|
|
|
.getLogicalType();
|
|
|
|
|
List<MySqlSnapshotSplit> splits = getMySqlSplits(configuration, pkType);
|
|
|
|
|
List<MySqlSnapshotSplit> splits = getMySqlSplits(configuration);
|
|
|
|
|
|
|
|
|
|
String[] expected =
|
|
|
|
|
new String[] {
|
|
|
|
@ -218,6 +212,12 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
|
|
|
|
|
"+I[20002, LEVEL_3, user_3, user with level 3]"
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
final RowType pkType =
|
|
|
|
|
(RowType)
|
|
|
|
|
DataTypes.ROW(
|
|
|
|
|
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
|
|
|
|
|
DataTypes.FIELD("level", DataTypes.STRING()))
|
|
|
|
|
.getLogicalType();
|
|
|
|
|
List<String> actual =
|
|
|
|
|
readBinlogSplits(
|
|
|
|
|
splits,
|
|
|
|
@ -249,7 +249,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
|
|
|
|
|
DataTypes.FIELD("card_no", DataTypes.BIGINT()),
|
|
|
|
|
DataTypes.FIELD("level", DataTypes.STRING()))
|
|
|
|
|
.getLogicalType();
|
|
|
|
|
List<MySqlSnapshotSplit> splits = getMySqlSplits(configuration, pkType);
|
|
|
|
|
List<MySqlSnapshotSplit> splits = getMySqlSplits(configuration);
|
|
|
|
|
String[] expected =
|
|
|
|
|
new String[] {
|
|
|
|
|
"+I[20000, LEVEL_1, user_1, user with level 1]",
|
|
|
|
@ -323,13 +323,12 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
|
|
|
|
|
"+I[2003, user_24, Shanghai, 123567891234]"
|
|
|
|
|
};
|
|
|
|
|
List<String> actual =
|
|
|
|
|
readBinlogSplitsFromLatestOffset(dataType, pkType, configuration, expected.length);
|
|
|
|
|
readBinlogSplitsFromLatestOffset(dataType, configuration, expected.length);
|
|
|
|
|
assertEquals(Arrays.stream(expected).sorted().collect(Collectors.toList()), actual);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private List<String> readBinlogSplitsFromLatestOffset(
|
|
|
|
|
DataType dataType, RowType pkType, Configuration configuration, int expectedSize)
|
|
|
|
|
throws Exception {
|
|
|
|
|
DataType dataType, Configuration configuration, int expectedSize) throws Exception {
|
|
|
|
|
final StatefulTaskContext statefulTaskContext =
|
|
|
|
|
new StatefulTaskContext(configuration, binaryLogClient, mySqlConnection);
|
|
|
|
|
|
|
|
|
@ -586,7 +585,7 @@ public class BinlogSplitReaderTest extends MySqlTestBase {
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private List<MySqlSnapshotSplit> getMySqlSplits(Configuration configuration, RowType pkType) {
|
|
|
|
|
private List<MySqlSnapshotSplit> getMySqlSplits(Configuration configuration) {
|
|
|
|
|
final MySqlSnapshotSplitAssigner assigner =
|
|
|
|
|
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
|
|
|
|
|
assigner.open();
|
|
|
|
|