From dc5a0c20df87072d5a0b50ab9b2d1e4e028a84d7 Mon Sep 17 00:00:00 2001 From: gongzhongqiang <764629910@qq.com> Date: Sat, 13 Nov 2021 22:48:44 +0800 Subject: [PATCH] [mysql] Reading a null column with default value defined should return null instead of default value (#555) --- .../RowDataDebeziumDeserializeSchema.java | 2 +- .../mysql/table/MySqlConnectorITCase.java | 58 +++++++++++++++++++ .../src/test/resources/ddl/customer.sql | 5 +- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java index 4f430fbd6..4d2c1149f 100644 --- a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java +++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java @@ -605,7 +605,7 @@ public final class RowDataDebeziumDeserializeSchema if (field == null) { row.setField(i, null); } else { - Object fieldValue = struct.get(field); + Object fieldValue = struct.getWithoutDefault(fieldName); Schema fieldSchema = schema.field(fieldName).schema(); Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 05b8e03e0..320838f7f 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -1263,6 +1263,64 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { result.getJobClient().get().cancel().get(); } + @Test + public void testColumnOptionalWithDefaultValue() throws Exception { + customerDatabase.createAndInitialize(); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " `product_no` DECIMAL(20, 4) NOT NULL," + + " product_kind STRING," + + " user_id STRING," + + " description STRING," + + " primary key (`product_no`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'debezium.internal.implementation' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-id' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '%s'" + + ")", + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + customerDatabase.getUsername(), + customerDatabase.getPassword(), + customerDatabase.getDatabaseName(), + "shopping_cart_dec", + getDezImplementation(), + incrementalSnapshot, + getServerId(), + getSplitSize()); + tEnv.executeSql(sourceDDL); + // async submit job + TableResult result = + tEnv.executeSql( + "SELECT product_no,\n" + + "product_kind,\n" + + "user_id,\n" + + "description FROM debezium_source"); + + CloseableIterator iterator = result.collect(); + waitForSnapshotStarted(iterator); + + String[] expected = + new String[] { + "+I[123456.1230, KIND_001, user_1, my shopping cart]", + "+I[123457.4560, KIND_002, user_2, my shopping cart]", + "+I[123458.6789, KIND_003, user_3, my shopping cart]", + "+I[123459.1234, KIND_004, user_4, null]" + }; + assertEqualsInAnyOrder( + Arrays.asList(expected), fetchRows(result.collect(), expected.length)); + result.getJobClient().get().cancel().get(); + } + // ------------------------------------------------------------------------------------ private String getDezImplementation() { diff --git a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql index f627d6c70..7d3fe8922 100644 --- a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql +++ b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql @@ -222,14 +222,15 @@ CREATE TABLE shopping_cart_dec ( product_no DECIMAL(10, 4) NOT NULL, product_kind VARCHAR(255), user_id VARCHAR(255) NOT NULL, - description VARCHAR(255) NOT NULL, + description VARCHAR(255) DEFAULT 'flink', PRIMARY KEY(product_no) ); insert into shopping_cart_dec VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'), (123457.456, 'KIND_002', 'user_2', 'my shopping cart'), - (123458.6789, 'KIND_003', 'user_3', 'my shopping cart'); + (123458.6789, 'KIND_003', 'user_3', 'my shopping cart'), + (123459.1234, 'KIND_004', 'user_4', null); -- create table whose primary key are produced by snowflake algorithm CREATE TABLE address (