[mysql] Support primary key with varbinary type (#879)

pull/780/head
fangpc 3 years ago committed by Leonard Xu
parent 3f26906425
commit 18cbde8afb

@ -83,9 +83,11 @@ public class ObjectUtils {
*/
@SuppressWarnings("unchecked")
public static int compare(Object obj1, Object obj2) {
Comparable<Object> c1 = (Comparable<Object>) obj1;
Comparable<Object> c2 = (Comparable<Object>) obj2;
return c1.compareTo(c2);
if (obj1 instanceof Comparable && obj1.getClass().equals(obj2.getClass())) {
return ((Comparable) obj1).compareTo(obj2);
} else {
return obj1.toString().compareTo(obj2.toString());
}
}
/**

@ -901,6 +901,78 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
result.getJobClient().get().cancel().get();
}
@Test
public void testTableWithVarbinaryPrimaryKey() throws Exception {
inventoryDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE demo_varbinary_test ("
+ " order_id VARBINARY(11),"
+ " order_date DATE,"
+ " quantity INT,"
+ " product_id INT,"
+ " purchaser STRING,"
+ " PRIMARY KEY(order_id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'scan.startup.mode' = 'latest-offset',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'debezium.internal.implementation' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
TEST_USER,
TEST_PASSWORD,
inventoryDatabase.getDatabaseName(),
"demo_varbinary_test",
incrementalSnapshot,
getServerId(),
getDezImplementation());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result = tEnv.executeSql("SELECT * FROM demo_varbinary_test");
// wait for the source startup, we don't have a better way to wait it, use sleep for now
do {
Thread.sleep(5000L);
} while (result.getJobClient().get().getJobStatus().get() != RUNNING);
CloseableIterator<Row> iterator = result.collect();
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"INSERT INTO demo_varbinary_test VALUES (b'0000010000000100000001000000010000000100000001000000010000000101','2021-03-08', 30, 500, 'flink');"); // 110
statement.execute(
"INSERT INTO demo_varbinary_test VALUES (b'0000010000000100000001000000010000000100000001000000010000000110','2021-03-08', 30, 500, 'flink-sql');");
statement.execute(
"UPDATE demo_varbinary_test SET quantity=50 WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000101';");
statement.execute(
"DELETE FROM demo_varbinary_test WHERE order_id=b'0000010000000100000001000000010000000100000001000000010000000110';");
}
String[] expected =
new String[] {
"+I[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 30, 500, flink]",
"+I[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]",
"-U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 30, 500, flink]",
"+U[[4, 4, 4, 4, 4, 4, 4, 5], 2021-03-08, 50, 500, flink]",
"-D[[4, 4, 4, 4, 4, 4, 4, 6], 2021-03-08, 30, 500, flink-sql]"
};
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
result.getJobClient().get().cancel().get();
}
@Test
public void testPrimaryKeyWithSnowflakeAlgorithm() throws Exception {
customerDatabase.createAndInitialize();

@ -90,4 +90,15 @@ VALUES (default, '2016-01-16', 1001, 1, 102),
CREATE TABLE category (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
category_name VARCHAR(255)
);
);
CREATE TABLE `demo_varbinary_test` (
`order_id` varbinary(8) NOT NULL,
`order_date` date NOT NULL,
`quantity` int(11) NOT NULL,
`product_id` int(11) NOT NULL,
`purchaser` varchar(512) NOT NULL,
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO demo_varbinary_test(order_id,order_date,quantity, product_id, purchaser)
VALUE (b'0000010000000100000001000000010000000100000001000000010000000100','2021-03-08', 30, 500, 'flink');
Loading…
Cancel
Save