[mysql] Reading a null column with default value defined should return null instead of default value (#555)

pull/605/head
gongzhongqiang 3 years ago committed by GitHub
parent 7ae20a1277
commit dc5a0c20df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -605,7 +605,7 @@ public final class RowDataDebeziumDeserializeSchema
if (field == null) { if (field == null) {
row.setField(i, null); row.setField(i, null);
} else { } else {
Object fieldValue = struct.get(field); Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = schema.field(fieldName).schema(); Schema fieldSchema = schema.field(fieldName).schema();
Object convertedField = Object convertedField =
convertField(fieldConverters[i], fieldValue, fieldSchema); convertField(fieldConverters[i], fieldValue, fieldSchema);

@ -1263,6 +1263,64 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {
result.getJobClient().get().cancel().get(); result.getJobClient().get().cancel().get();
} }
@Test
public void testColumnOptionalWithDefaultValue() throws Exception {
customerDatabase.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
+ " `product_no` DECIMAL(20, 4) NOT NULL,"
+ " product_kind STRING,"
+ " user_id STRING,"
+ " description STRING,"
+ " primary key (`product_no`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s',"
+ " 'debezium.internal.implementation' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
customerDatabase.getUsername(),
customerDatabase.getPassword(),
customerDatabase.getDatabaseName(),
"shopping_cart_dec",
getDezImplementation(),
incrementalSnapshot,
getServerId(),
getSplitSize());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result =
tEnv.executeSql(
"SELECT product_no,\n"
+ "product_kind,\n"
+ "user_id,\n"
+ "description FROM debezium_source");
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);
String[] expected =
new String[] {
"+I[123456.1230, KIND_001, user_1, my shopping cart]",
"+I[123457.4560, KIND_002, user_2, my shopping cart]",
"+I[123458.6789, KIND_003, user_3, my shopping cart]",
"+I[123459.1234, KIND_004, user_4, null]"
};
assertEqualsInAnyOrder(
Arrays.asList(expected), fetchRows(result.collect(), expected.length));
result.getJobClient().get().cancel().get();
}
// ------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------
private String getDezImplementation() { private String getDezImplementation() {

@ -222,14 +222,15 @@ CREATE TABLE shopping_cart_dec (
product_no DECIMAL(10, 4) NOT NULL, product_no DECIMAL(10, 4) NOT NULL,
product_kind VARCHAR(255), product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL, user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL, description VARCHAR(255) DEFAULT 'flink',
PRIMARY KEY(product_no) PRIMARY KEY(product_no)
); );
insert into shopping_cart_dec insert into shopping_cart_dec
VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'), VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'),
(123457.456, 'KIND_002', 'user_2', 'my shopping cart'), (123457.456, 'KIND_002', 'user_2', 'my shopping cart'),
(123458.6789, 'KIND_003', 'user_3', 'my shopping cart'); (123458.6789, 'KIND_003', 'user_3', 'my shopping cart'),
(123459.1234, 'KIND_004', 'user_4', null);
-- create table whose primary key are produced by snowflake algorithm -- create table whose primary key are produced by snowflake algorithm
CREATE TABLE address ( CREATE TABLE address (

Loading…
Cancel
Save