From 8ca3091805832cb5c06555af0d06bce0044ae4a2 Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Tue, 5 Dec 2023 02:11:24 +0800 Subject: [PATCH] [cdc-pipeline-connector][mysql] Ensure the precision inference of DECIMAL type matches table schema --- .../mysql/source/MySqlFullTypesITCase.java | 24 +++++++------ .../DebeziumEventDeserializationSchema.java | 3 +- .../DebeziumSchemaDataTypeInference.java | 35 ++++++++++++++----- 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlFullTypesITCase.java index e86c0870c..447b557e2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlFullTypesITCase.java @@ -109,7 +109,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { public void testMysql57TimeDataTypes() throws Throwable { RowType recordType = RowType.of( - DataTypes.DECIMAL(1, 0).notNull(), + DataTypes.DECIMAL(20, 0).notNull(), DataTypes.INT(), DataTypes.DATE(), DataTypes.TIME(0), @@ -122,7 +122,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { Object[] expectedSnapshot = new Object[] { - DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0), + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), 2021, 18460, 64822000, @@ -139,7 +139,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { Object[] expectedStreamRecord = new Object[] { - DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0), + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), 2021, 18460, 64822000, @@ -159,7 +159,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { public void testMysql8TimeDataTypes() throws Throwable { RowType recordType = RowType.of( - DataTypes.DECIMAL(1, 0).notNull(), + DataTypes.DECIMAL(20, 0).notNull(), DataTypes.INT(), DataTypes.DATE(), DataTypes.TIME(0), @@ -174,7 +174,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { Object[] expectedSnapshot = new Object[] { - DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0), + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), 2021, 18460, 64822000, @@ -194,7 +194,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { Object[] expectedStreamRecord = new Object[] { - DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0), + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), 2021, 18460, 64822000, @@ -241,7 +241,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { Object[] expectedSnapshot = new Object[] { - DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0), + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), (byte) 127, (short) 255, (short) 255, @@ -270,8 +270,9 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { DecimalData.fromBigDecimal(new BigDecimal("123.4567"), 8, 4), DecimalData.fromBigDecimal(new BigDecimal("123.4568"), 8, 4), DecimalData.fromBigDecimal(new BigDecimal("123.4569"), 8, 4), - DecimalData.fromBigDecimal(new BigDecimal("345.6"), 6, 0), - DecimalData.fromBigDecimal(new BigDecimal("34567892.1"), 9, 1), + DecimalData.fromBigDecimal(new BigDecimal("346"), 6, 0), + // Decimal precision larger than 38 will be treated as string. + BinaryStringData.fromString("34567892.1"), false, true, true, @@ -372,7 +373,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { private static final RowType COMMON_TYPES = RowType.of( - DataTypes.DECIMAL(1, 0).notNull(), + DataTypes.DECIMAL(20, 0).notNull(), DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), @@ -402,7 +403,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { DataTypes.DECIMAL(8, 4), DataTypes.DECIMAL(8, 4), DataTypes.DECIMAL(6, 0), - DataTypes.DECIMAL(9, 1), + // Decimal precision larger than 38 will be treated as string. + DataTypes.STRING(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java index d6798c65d..b6f6ddb06 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -21,6 +21,7 @@ import org.apache.flink.util.Collector; import com.ververica.cdc.common.annotation.Internal; import com.ververica.cdc.common.data.DecimalData; +import com.ververica.cdc.common.data.LocalZonedTimestampData; import com.ververica.cdc.common.data.RecordData; import com.ververica.cdc.common.data.TimestampData; import com.ververica.cdc.common.data.binary.BinaryStringData; @@ -330,7 +331,7 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve String str = (String) dbzObj; // TIMESTAMP_LTZ type is encoded in string type Instant instant = Instant.parse(str); - return TimestampData.fromMillis(instant.toEpochMilli(), instant.getNano()); + return LocalZonedTimestampData.fromInstant(instant); } throw new IllegalArgumentException( "Unable to convert to TimestampData from unexpected value '" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumSchemaDataTypeInference.java index f575fa8ec..460d760a2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumSchemaDataTypeInference.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/event/DebeziumSchemaDataTypeInference.java @@ -20,6 +20,8 @@ import com.ververica.cdc.common.annotation.Internal; import com.ververica.cdc.common.types.DataField; import com.ververica.cdc.common.types.DataType; import com.ververica.cdc.common.types.DataTypes; +import com.ververica.cdc.common.types.DecimalType; +import io.debezium.data.SpecialValueDecimal; import io.debezium.data.VariableScaleDecimal; import io.debezium.time.MicroTime; import io.debezium.time.MicroTimestamp; @@ -38,14 +40,16 @@ import java.math.BigDecimal; import java.time.Instant; import java.util.Optional; -import static com.ververica.cdc.common.types.DecimalType.DEFAULT_PRECISION; - /** {@link DataType} inference for debezium {@link Schema}. */ @Internal public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference, Serializable { private static final long serialVersionUID = 1L; + public static final String PRECISION_PARAMETER_KEY = "connect.decimal.precision"; + + public static final int DEFAULT_DECIMAL_PRECISION = 20; + @Override public DataType infer(Object value, Schema schema) { return schema.isOptional() @@ -154,25 +158,38 @@ public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference, } else { precision = 0; } - return DataTypes.TIMESTAMP(precision); + return DataTypes.TIMESTAMP_LTZ(precision); } return DataTypes.STRING(); } protected DataType inferBytes(Object value, Schema schema) { - if (Decimal.LOGICAL_NAME.equals(schema.name()) - || VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { - if (value instanceof BigDecimal) { - BigDecimal decimal = (BigDecimal) value; - return DataTypes.DECIMAL(decimal.precision(), decimal.scale()); + if (Decimal.LOGICAL_NAME.equals(schema.name())) { + int scale = + Optional.ofNullable(schema.parameters().get(Decimal.SCALE_FIELD)) + .map(Integer::parseInt) + .orElse(DecimalType.DEFAULT_SCALE); + + int precision = + Optional.ofNullable(schema.parameters().get(PRECISION_PARAMETER_KEY)) + .map(Integer::parseInt) + .orElse(DEFAULT_DECIMAL_PRECISION); + + if (precision > DecimalType.MAX_PRECISION) { + return DataTypes.STRING(); } - return DataTypes.DECIMAL(DEFAULT_PRECISION, 0); + return DataTypes.DECIMAL(precision, scale); } return DataTypes.BYTES(); } protected DataType inferStruct(Object value, Schema schema) { Struct struct = (Struct) value; + if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { + SpecialValueDecimal decimal = VariableScaleDecimal.toLogical(struct); + BigDecimal bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO); + return DataTypes.DECIMAL(bigDecimal.precision(), bigDecimal.scale()); + } return DataTypes.ROW( schema.fields().stream() .map(