diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java index a2d91bbd8..0b02ad7e1 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java @@ -26,12 +26,6 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.StringData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.ZonedTimestampData; -import org.apache.flink.cdc.common.types.DataType; -import org.apache.flink.cdc.common.types.DataTypeRoot; -import org.apache.flink.cdc.common.types.DecimalType; -import org.apache.flink.cdc.common.types.LocalZonedTimestampType; -import org.apache.flink.cdc.common.types.TimestampType; -import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; @@ -72,45 +66,6 @@ public final class BinaryRecordData extends BinarySection implements RecordData, return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8; } - public static int calculateFixPartSizeInBytes(int arity) { - return calculateBitSetWidthInBytes(arity) + 8 * arity; - } - - /** - * If it is a fixed-length field, we can call this BinaryRecordData's setXX method for in-place - * updates. If it is variable-length field, can't use this method, because the underlying data - * is stored continuously. - */ - public static boolean isInFixedLengthPart(DataType type) { - switch (type.getTypeRoot()) { - case BOOLEAN: - case TINYINT: - case SMALLINT: - case INTEGER: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - case BIGINT: - case FLOAT: - case DOUBLE: - return true; - case DECIMAL: - return DecimalData.isCompact(((DecimalType) type).getPrecision()); - case TIMESTAMP_WITHOUT_TIME_ZONE: - return TimestampData.isCompact(((TimestampType) type).getPrecision()); - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return LocalZonedTimestampData.isCompact( - ((LocalZonedTimestampType) type).getPrecision()); - case TIMESTAMP_WITH_TIME_ZONE: - return ZonedTimestampData.isCompact(((ZonedTimestampType) type).getPrecision()); - default: - return false; - } - } - - public static boolean isMutable(DataType type) { - return isInFixedLengthPart(type) || type.getTypeRoot() == DataTypeRoot.DECIMAL; - } - private final int arity; private final int nullBitsSizeInBytes; @@ -213,10 +168,6 @@ public final class BinaryRecordData extends BinarySection implements RecordData, public TimestampData getTimestamp(int pos, int precision) { assertIndexIsValid(pos); - if (TimestampData.isCompact(precision)) { - return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos))); - } - int fieldOffset = getFieldOffset(pos); final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset); return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli); @@ -233,11 +184,6 @@ public final class BinaryRecordData extends BinarySection implements RecordData, public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision) { assertIndexIsValid(pos); - if (LocalZonedTimestampData.isCompact(precision)) { - return LocalZonedTimestampData.fromEpochMillis( - segments[0].getLong(getFieldOffset(pos))); - } - int fieldOffset = getFieldOffset(pos); final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset); return BinarySegmentUtils.readLocalZonedTimestampData( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java index 41613a86e..91351dabf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java @@ -223,6 +223,95 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase { fullTypesMySql8Database, recordType, expectedSnapshot, expectedStreamRecord); } + @Test + public void testMysql57PrecisionTypes() throws Throwable { + testMysqlPrecisionTypes(fullTypesMySql57Database); + } + + @Test + public void testMysql8PrecisionTypes() throws Throwable { + testMysqlPrecisionTypes(fullTypesMySql8Database); + } + + public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable { + RowType recordType = + RowType.of( + DataTypes.DECIMAL(20, 0).notNull(), + DataTypes.DECIMAL(6, 2), + DataTypes.DECIMAL(9, 4), + DataTypes.DECIMAL(20, 4), + DataTypes.TIME(0), + DataTypes.TIME(3), + DataTypes.TIME(6), + DataTypes.TIMESTAMP(0), + DataTypes.TIMESTAMP(3), + DataTypes.TIMESTAMP(6), + DataTypes.TIMESTAMP_LTZ(0), + DataTypes.TIMESTAMP_LTZ(3), + DataTypes.TIMESTAMP_LTZ(6), + DataTypes.TIMESTAMP_LTZ(0)); + + Object[] expectedSnapshot = + new Object[] { + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), + DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2), + DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4), + DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 4), + 64800000, + 64822100, + 64822100, + TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:00")), + TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")), + TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")), + LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")), + LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")), + LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")) + }; + + Object[] expectedStreamRecord = + new Object[] { + DecimalData.fromBigDecimal(new BigDecimal("1"), 20, 0), + DecimalData.fromBigDecimal(new BigDecimal("123.4"), 6, 2), + DecimalData.fromBigDecimal(new BigDecimal("1234.5"), 9, 4), + DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 20, 4), + 64800000, + 64822100, + null, + TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:00")), + TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")), + TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")), + LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:00")), + LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")), + LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")) + }; + + database.createAndInitialize(); + CloseableIterator iterator = + env.fromSource( + getFlinkSourceProvider(new String[] {"precision_types"}, database) + .getSource(), + WatermarkStrategy.noWatermarks(), + "Event-Source") + .executeAndCollect(); + + // skip CreateTableEvent + List snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2); + RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after(); + + Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, recordType)) + .isEqualTo(expectedSnapshot); + + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("UPDATE precision_types SET time_6_c = null WHERE id = 1;"); + } + + List streamResults = MySqSourceTestUtils.fetchResults(iterator, 1); + RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after(); + Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, recordType)) + .isEqualTo(expectedStreamRecord); + } + private void testCommonDataTypes(UniqueDatabase database) throws Exception { database.createAndInitialize(); CloseableIterator iterator = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java index 4b45e1079..3d3f0276b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMetadataAccessorITCase.java @@ -210,7 +210,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase { private void testAccessDatabaseAndTable(UniqueDatabase database) { database.createAndInitialize(); - String[] tables = new String[] {"common_types", "time_types"}; + String[] tables = new String[] {"common_types", "time_types", "precision_types"}; MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database); assertThatThrownBy(metadataAccessor::listNamespaces) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql index a83e8710e..08d25a714 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test.sql @@ -123,4 +123,37 @@ VALUES (DEFAULT, '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22', - NULL); \ No newline at end of file + NULL); + +CREATE TABLE precision_types +( + id SERIAL, + decimal_c0 DECIMAL(6, 2), + decimal_c1 DECIMAL(9, 4), + decimal_c2 DECIMAL(20, 4), + time_c TIME(0), + time_3_c TIME(3), + time_6_c TIME(6), + datetime_c DATETIME(0), + datetime3_c DATETIME(3), + datetime6_c DATETIME(6), + timestamp_c TIMESTAMP(0) NULL, + timestamp3_c TIMESTAMP(3) NULL, + timestamp6_c TIMESTAMP(6) NULL, + PRIMARY KEY (id) +) DEFAULT CHARSET=utf8; + +INSERT INTO precision_types +VALUES (DEFAULT, + 123.4, + 1234.5, + 1234.56, + '18:00', + '18:00:22.1', + '18:00:22.1', + '2020-07-17 18:00', + '2020-07-17 18:00:22', + '2020-07-17 18:00:22', + '2020-07-17 18:00', + '2020-07-17 18:00:22', + '2020-07-17 18:00:22'); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql index 66b75bfef..54c6c7170 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/column_type_test_mysql8.sql @@ -127,4 +127,37 @@ VALUES (DEFAULT, '2020-07-17 18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', - NULL); \ No newline at end of file + NULL); + +CREATE TABLE precision_types +( + id SERIAL, + decimal_c0 DECIMAL(6, 2), + decimal_c1 DECIMAL(9, 4), + decimal_c2 DECIMAL(20, 4), + time_c TIME(0), + time_3_c TIME(3), + time_6_c TIME(6), + datetime_c DATETIME(0), + datetime3_c DATETIME(3), + datetime6_c DATETIME(6), + timestamp_c TIMESTAMP(0), + timestamp3_c TIMESTAMP(3), + timestamp6_c TIMESTAMP(6), + PRIMARY KEY (id) +) DEFAULT CHARSET=utf8; + +INSERT INTO precision_types +VALUES (DEFAULT, + 123.4, + 1234.5, + 1234.56, + '18:00', + '18:00:22.1', + '18:00:22.1', + '2020-07-17 18:00', + '2020-07-17 18:00:22', + '2020-07-17 18:00:22', + '2020-07-17 18:00', + '2020-07-17 18:00:22', + '2020-07-17 18:00:22'); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java index 1422935ea..44636fd7e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java @@ -173,46 +173,38 @@ abstract class AbstractBinaryWriter implements BinaryWriter { @Override public void writeTimestamp(int pos, TimestampData value, int precision) { - if (TimestampData.isCompact(precision)) { - writeLong(pos, value.getMillisecond()); - } else { - // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond - ensureCapacity(8); + // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond + ensureCapacity(8); - if (value == null) { - setNullBit(pos); - // zero-out the bytes - segment.putLong(cursor, 0L); - setOffsetAndSize(pos, cursor, 0); - } else { - segment.putLong(cursor, value.getMillisecond()); - setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond()); - } - - cursor += 8; + if (value == null) { + setNullBit(pos); + // zero-out the bytes + segment.putLong(cursor, 0L); + setOffsetAndSize(pos, cursor, 0); + } else { + segment.putLong(cursor, value.getMillisecond()); + setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond()); } + + cursor += 8; } @Override public void writeLocalZonedTimestamp(int pos, LocalZonedTimestampData value, int precision) { - if (LocalZonedTimestampData.isCompact(precision)) { - writeLong(pos, value.getEpochMillisecond()); - } else { - // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond - ensureCapacity(8); + // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond + ensureCapacity(8); - if (value == null) { - setNullBit(pos); - // zero-out the bytes - segment.putLong(cursor, 0L); - setOffsetAndSize(pos, cursor, 0); - } else { - segment.putLong(cursor, value.getEpochMillisecond()); - setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond()); - } - - cursor += 8; + if (value == null) { + setNullBit(pos); + // zero-out the bytes + segment.putLong(cursor, 0L); + setOffsetAndSize(pos, cursor, 0); + } else { + segment.putLong(cursor, value.getEpochMillisecond()); + setOffsetAndSize(pos, cursor, value.getEpochNanoOfMillisecond()); } + + cursor += 8; } @Override