[FLINK-34908][pipeline-connector][doris] Fix MySQL to doris pipeline will lose precision for timestamp type

This closes #3407.
pull/3409/head
Xin Gong 8 months ago committed by GitHub
parent 050c28649c
commit 1112987572
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -56,7 +56,7 @@ public class DorisEventSerializer implements DorisRecordSerializer<Event> {
/** Format timestamp-related type data. */
public static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
/** ZoneId from pipeline config to support timestamp with local time zone. */
public final ZoneId pipelineZoneId;

@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.doris.sink;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
@ -29,6 +30,7 @@ import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.junit.Assert;
import org.junit.Test;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
@ -54,13 +56,32 @@ public class DorisRowConverterTest {
Column.physicalColumn("f12", DataTypes.TIMESTAMP()),
Column.physicalColumn("f14", DataTypes.DATE()),
Column.physicalColumn("f15", DataTypes.CHAR(1)),
Column.physicalColumn("f16", DataTypes.VARCHAR(256)));
Column.physicalColumn("f16", DataTypes.VARCHAR(256)),
Column.physicalColumn("f17", DataTypes.TIMESTAMP()),
Column.physicalColumn("f18", DataTypes.TIMESTAMP()),
Column.physicalColumn("f19", DataTypes.TIMESTAMP()),
Column.physicalColumn("f20", DataTypes.TIMESTAMP_LTZ()),
Column.physicalColumn("f21", DataTypes.TIMESTAMP_LTZ()),
Column.physicalColumn("f22", DataTypes.TIMESTAMP_LTZ()));
List<DataType> dataTypes =
columns.stream().map(v -> v.getType()).collect(Collectors.toList());
LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
LocalDateTime time1 =
LocalDateTime.ofInstant(Instant.parse("2021-01-01T08:00:00Z"), ZoneId.of("Z"));
LocalDate date1 = LocalDate.of(2021, 1, 1);
LocalDateTime f17 =
LocalDateTime.ofInstant(Instant.parse("2021-01-01T08:01:11Z"), ZoneId.of("Z"));
LocalDateTime f18 =
LocalDateTime.ofInstant(Instant.parse("2021-01-01T08:01:11.123Z"), ZoneId.of("Z"));
LocalDateTime f19 =
LocalDateTime.ofInstant(
Instant.parse("2021-01-01T08:01:11.123456Z"), ZoneId.of("Z"));
Instant f20 = Instant.parse("2021-01-01T08:01:11Z");
Instant f21 = Instant.parse("2021-01-01T08:01:11.123Z");
Instant f22 = Instant.parse("2021-01-01T08:01:11.123456Z");
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(dataTypes.toArray(new DataType[] {})));
BinaryRecordData recordData =
@ -76,17 +97,25 @@ public class DorisRowConverterTest {
TimestampData.fromLocalDateTime(time1),
(int) date1.toEpochDay(),
BinaryStringData.fromString("a"),
BinaryStringData.fromString("doris")
BinaryStringData.fromString("doris"),
TimestampData.fromLocalDateTime(f17),
TimestampData.fromLocalDateTime(f18),
TimestampData.fromLocalDateTime(f19),
LocalZonedTimestampData.fromInstant(f20),
LocalZonedTimestampData.fromInstant(f21),
LocalZonedTimestampData.fromInstant(f22),
});
List row = new ArrayList();
for (int i = 0; i < recordData.getArity(); i++) {
DorisRowConverter.SerializationConverter converter =
DorisRowConverter.createNullableExternalConverter(
columns.get(i).getType(), ZoneId.systemDefault());
columns.get(i).getType(), ZoneId.of("GMT+08:00"));
row.add(converter.serialize(i, recordData));
}
Assert.assertEquals(
"[true, 1.2, 1.2345, 1, 32, 64, 128, 2021-01-01 08:00:00, 2021-01-01, a, doris]",
"[true, 1.2, 1.2345, 1, 32, 64, 128, 2021-01-01 08:00:00.000000, 2021-01-01, a, doris, 2021-01-01 "
+ "08:01:11.000000, 2021-01-01 08:01:11.123000, 2021-01-01 08:01:11.123456, 2021-01-01 "
+ "16:01:11.000000, 2021-01-01 16:01:11.123000, 2021-01-01 16:01:11.123456]",
row.toString());
}
}

Loading…
Cancel
Save