[mysql] Improve the test to cover table with varbinary primary key (#879)

This closes #879.
pull/780/head
Leonard Xu 3 years ago
parent 18cbde8afb
commit 16ec365523

@ -560,8 +560,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
+ "]",
};
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}
@ -632,8 +631,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[0, 1024, " + getIntegerSeqString(2, tableColumnCount) + "]"
};
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}
@ -896,17 +894,19 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[111, scooter, Big 2-wheel scooter , 5.170]",
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
};
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}
@Test
public void testTableWithVarbinaryPrimaryKey() throws Exception {
public void testPrimaryKeyWithVarbinaryType() throws Exception {
if (!incrementalSnapshot) {
return;
}
inventoryDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE demo_varbinary_test ("
"CREATE TABLE varbinary_pk_table ("
+ " order_id VARBINARY(11),"
+ " order_date DATE,"
+ " quantity INT,"
@ -921,24 +921,21 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = 'latest-offset',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'debezium.internal.implementation' = '%s'"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
TEST_USER,
TEST_PASSWORD,
inventoryDatabase.getDatabaseName(),
"demo_varbinary_test",
incrementalSnapshot,
"varbinary_pk_table",
getServerId(),
getDezImplementation());
getSplitSize());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM demo_varbinary_test");
TableResult result = tEnv.executeSql("SELECT * FROM varbinary_pk_table");
// wait for the source startup, we don't have a better way to wait it, use sleep for now
do {
@ -949,27 +946,32 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO demo_varbinary_test VALUES (b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', 30, 500, 'flink');"); // 110
"INSERT INTO varbinary_pk_table VALUES (b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', 30, 500, 'flink');"); // 110
statement.execute(
"INSERT INTO demo_varbinary_test VALUES (b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', 30, 500, 'flink-sql');");
"INSERT INTO varbinary_pk_table VALUES (b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', 30, 500, 'flink-sql');");
statement.execute(
"UPDATE demo_varbinary_test SET quantity=50 WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000101';");
"UPDATE varbinary_pk_table SET quantity=50 WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000101';");
statement.execute(
"DELETE FROM demo_varbinary_test WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000110';");
"DELETE FROM varbinary_pk_table WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000110';");
}
String[] expected =
new String[] {
// snapshot records
"+I[[4, 4, 4, 4, 4, 4, 4, 0], 2021-03-08, 0, 0, flink]",
"+I[[4, 4, 4, 4, 4, 4, 4, 1], 2021-03-08, 10, 100, flink]",
"+I[[4, 4, 4, 4, 4, 4, 4, 2], 2021-03-08, 20, 200, flink]",
"+I[[4, 4, 4, 4, 4, 4, 4, 3], 2021-03-08, 30, 300, flink]",
"+I[[4, 4, 4, 4, 4, 4, 4, 4], 2021-03-08, 40, 400, flink]",
// binlog records
"+I[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 30, 500, flink]",
"+I[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]",
"-U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 30, 500, flink]",
"+U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 50, 500, flink]",
"-D[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]"
};
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}
@ -1037,8 +1039,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[416927583791428523, China, Hangzhou, West Town address 2]",
"+I[418257940021724075, Germany, Berlin, West Town address 3]"
};
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}
@ -1111,8 +1112,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+U[221, user_221, Shanghai, 123567891234, null, 20]",
};
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}
@ -1419,8 +1419,7 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
"+I[123458.6789, KIND_003, user_3, my shopping cart]",
"+I[123459.1234, KIND_004, user_4, null]"
};
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
}

@ -87,18 +87,24 @@ VALUES (default, '2016-01-16', 1001, 1, 102),
(default, '2016-02-19', 1002, 2, 106),
(default, '16-02-21', 1003, 1, 107);
CREATE TABLE category (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
CREATE TABLE category
(
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
category_name VARCHAR(255)
);
CREATE TABLE `demo_varbinary_test` (
`order_id` varbinary(8) NOT NULL,
`order_date` date NOT NULL,
`quantity` int(11) NOT NULL,
`product_id` int(11) NOT NULL,
`purchaser` varchar(512) NOT NULL,
PRIMARY KEY (`order_id`)
CREATE TABLE `varbinary_pk_table`
(
`order_id` varbinary(8) NOT NULL,
`order_date` date NOT NULL,
`quantity` int(11) NOT NULL,
`product_id` int(11) NOT NULL,
`purchaser` varchar(512) NOT NULL,
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO demo_varbinary_test(order_id,order_date,quantity, product_id, purchaser)
VALUE (b'0000010000000100000001000000010000000100000001000000010000000100','2021-03-08', 30, 500, 'flink');
INSERT INTO varbinary_pk_table
VALUES (b'0000010000000100000001000000010000000100000001000000010000000000', '2021-03-08', 0, 0, 'flink'),
(b'0000010000000100000001000000010000000100000001000000010000000001', '2021-03-08', 10, 100, 'flink'),
(b'0000010000000100000001000000010000000100000001000000010000000010', '2021-03-08', 20, 200, 'flink'),
(b'0000010000000100000001000000010000000100000001000000010000000011', '2021-03-08', 30, 300, 'flink'),
(b'0000010000000100000001000000010000000100000001000000010000000100', '2021-03-08', 40, 400, 'flink');
Loading…
Cancel
Save