[cdc-pipeline-connector][mysql] Ensure the precision inference of DECIMAL type matches table schema

pull/2810/head
Jiabao Sun 1 year ago committed by Hang Ruan
parent 1766e82d7c
commit 8ca3091805

@ -109,7 +109,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
public void testMysql57TimeDataTypes() throws Throwable {
RowType recordType =
RowType.of(
DataTypes.DECIMAL(1, 0).notNull(),
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.INT(),
DataTypes.DATE(),
DataTypes.TIME(0),
@ -122,7 +122,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
Object[] expectedSnapshot =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0),
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
2021,
18460,
64822000,
@ -139,7 +139,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
Object[] expectedStreamRecord =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0),
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
2021,
18460,
64822000,
@ -159,7 +159,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
public void testMysql8TimeDataTypes() throws Throwable {
RowType recordType =
RowType.of(
DataTypes.DECIMAL(1, 0).notNull(),
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.INT(),
DataTypes.DATE(),
DataTypes.TIME(0),
@ -174,7 +174,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
Object[] expectedSnapshot =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0),
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
2021,
18460,
64822000,
@ -194,7 +194,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
Object[] expectedStreamRecord =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0),
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
2021,
18460,
64822000,
@ -241,7 +241,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
Object[] expectedSnapshot =
new Object[] {
DecimalData.fromBigDecimal(new BigDecimal("1"), 1, 0),
DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0),
(byte) 127,
(short) 255,
(short) 255,
@ -270,8 +270,9 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
DecimalData.fromBigDecimal(new BigDecimal("123.4567"), 8, 4),
DecimalData.fromBigDecimal(new BigDecimal("123.4568"), 8, 4),
DecimalData.fromBigDecimal(new BigDecimal("123.4569"), 8, 4),
DecimalData.fromBigDecimal(new BigDecimal("345.6"), 6, 0),
DecimalData.fromBigDecimal(new BigDecimal("34567892.1"), 9, 1),
DecimalData.fromBigDecimal(new BigDecimal("346"), 6, 0),
// Decimal precision larger than 38 will be treated as string.
BinaryStringData.fromString("34567892.1"),
false,
true,
true,
@ -372,7 +373,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
private static final RowType COMMON_TYPES =
RowType.of(
DataTypes.DECIMAL(1, 0).notNull(),
DataTypes.DECIMAL(20, 0).notNull(),
DataTypes.TINYINT(),
DataTypes.SMALLINT(),
DataTypes.SMALLINT(),
@ -402,7 +403,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
DataTypes.DECIMAL(8, 4),
DataTypes.DECIMAL(8, 4),
DataTypes.DECIMAL(6, 0),
DataTypes.DECIMAL(9, 1),
// Decimal precision larger than 38 will be treated as string.
DataTypes.STRING(),
DataTypes.BOOLEAN(),
DataTypes.BOOLEAN(),
DataTypes.BOOLEAN(),

@ -21,6 +21,7 @@ import org.apache.flink.util.Collector;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.data.DecimalData;
import com.ververica.cdc.common.data.LocalZonedTimestampData;
import com.ververica.cdc.common.data.RecordData;
import com.ververica.cdc.common.data.TimestampData;
import com.ververica.cdc.common.data.binary.BinaryStringData;
@ -330,7 +331,7 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
String str = (String) dbzObj;
// TIMESTAMP_LTZ type is encoded in string type
Instant instant = Instant.parse(str);
return TimestampData.fromMillis(instant.toEpochMilli(), instant.getNano());
return LocalZonedTimestampData.fromInstant(instant);
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"

@ -20,6 +20,8 @@ import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.types.DataField;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import com.ververica.cdc.common.types.DecimalType;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
@ -38,14 +40,16 @@ import java.math.BigDecimal;
import java.time.Instant;
import java.util.Optional;
import static com.ververica.cdc.common.types.DecimalType.DEFAULT_PRECISION;
/** {@link DataType} inference for debezium {@link Schema}. */
@Internal
public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference, Serializable {
private static final long serialVersionUID = 1L;
public static final String PRECISION_PARAMETER_KEY = "connect.decimal.precision";
public static final int DEFAULT_DECIMAL_PRECISION = 20;
@Override
public DataType infer(Object value, Schema schema) {
return schema.isOptional()
@ -154,25 +158,38 @@ public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference,
} else {
precision = 0;
}
return DataTypes.TIMESTAMP(precision);
return DataTypes.TIMESTAMP_LTZ(precision);
}
return DataTypes.STRING();
}
protected DataType inferBytes(Object value, Schema schema) {
if (Decimal.LOGICAL_NAME.equals(schema.name())
|| VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
if (value instanceof BigDecimal) {
BigDecimal decimal = (BigDecimal) value;
return DataTypes.DECIMAL(decimal.precision(), decimal.scale());
if (Decimal.LOGICAL_NAME.equals(schema.name())) {
int scale =
Optional.ofNullable(schema.parameters().get(Decimal.SCALE_FIELD))
.map(Integer::parseInt)
.orElse(DecimalType.DEFAULT_SCALE);
int precision =
Optional.ofNullable(schema.parameters().get(PRECISION_PARAMETER_KEY))
.map(Integer::parseInt)
.orElse(DEFAULT_DECIMAL_PRECISION);
if (precision > DecimalType.MAX_PRECISION) {
return DataTypes.STRING();
}
return DataTypes.DECIMAL(DEFAULT_PRECISION, 0);
return DataTypes.DECIMAL(precision, scale);
}
return DataTypes.BYTES();
}
protected DataType inferStruct(Object value, Schema schema) {
Struct struct = (Struct) value;
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
SpecialValueDecimal decimal = VariableScaleDecimal.toLogical(struct);
BigDecimal bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
return DataTypes.DECIMAL(bigDecimal.precision(), bigDecimal.scale());
}
return DataTypes.ROW(
schema.fields().stream()
.map(

Loading…
Cancel
Save