[FLINK-36560][pipeline-connector][paimon] Fix the issue that timestamp_ltz field is not correctly converted

This closes  #3717.
pull/3721/head
Kunni 3 months ago committed by GitHub
parent 8c2c17c3bc
commit 3025d9138a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -21,6 +21,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@ -42,6 +43,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -84,7 +86,11 @@ public class DorisPipelineITCase extends DorisSinkTestBase {
DorisContainer.DORIS_DATABASE_NAME,
DorisContainer.DORIS_TABLE_NAME,
"id",
Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)"));
Arrays.asList(
"id INT NOT NULL",
"number DOUBLE",
"name VARCHAR(51)",
"birthday DATETIMEV2(6)"));
// waiting for table to be created
DORIS_CONTAINER.waitForLog(
@ -135,41 +141,76 @@ public class DorisPipelineITCase extends DorisSinkTestBase {
.column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.column(new PhysicalColumn("birthday", DataTypes.TIMESTAMP_LTZ(6), null))
.primaryKey("id")
.build();
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(
RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
RowType.of(
DataTypes.INT(),
DataTypes.DOUBLE(),
DataTypes.VARCHAR(17),
DataTypes.TIMESTAMP_LTZ(6)));
return Arrays.asList(
new CreateTableEvent(tableId, schema),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")})),
new Object[] {
17,
3.14,
BinaryStringData.fromString("Doris Day"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
19,
2.718,
BinaryStringData.fromString("Que Sera Sera"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
21, 1.732, BinaryStringData.fromString("Disenchanted")
21,
1.732,
BinaryStringData.fromString("Disenchanted"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})),
DataChangeEvent.updateEvent(
tableId,
generator.generate(
new Object[] {17, 3.14, BinaryStringData.fromString("Doris Day")}),
new Object[] {
17,
3.14,
BinaryStringData.fromString("Doris Day"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
}),
generator.generate(
new Object[] {17, 6.28, BinaryStringData.fromString("Doris Day")})),
new Object[] {
17,
6.28,
BinaryStringData.fromString("Doris Day"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})),
DataChangeEvent.deleteEvent(
tableId,
generator.generate(
new Object[] {
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
19,
2.718,
BinaryStringData.fromString("Que Sera Sera"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})));
}
@ -201,9 +242,12 @@ public class DorisPipelineITCase extends DorisSinkTestBase {
env.execute("Values to Doris Sink");
List<String> actual = fetchTableContent(tableId, 3);
List<String> actual = fetchTableContent(tableId, 4);
List<String> expected = Arrays.asList("17 | 6.28 | Doris Day", "21 | 1.732 | Disenchanted");
List<String> expected =
Arrays.asList(
"17 | 6.28 | Doris Day | 2023-01-01 00:00:00",
"21 | 1.732 | Disenchanted | 2023-01-01 00:00:00");
assertEqualsInAnyOrder(expected, actual);
}

@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeChecks;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@ -30,7 +31,6 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
@ -163,14 +163,11 @@ public class TableSchemaInfo {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
fieldGetter =
record ->
TimestampData.fromLocalDateTime(
ZonedDateTime.ofInstant(
record.getLocalZonedTimestampData(
fieldPos,
getPrecision(fieldType))
.toInstant(),
zoneId)
.toLocalDateTime());
TimestampData.fromInstant(
record.getLocalZonedTimestampData(
fieldPos,
DataTypeChecks.getPrecision(fieldType))
.toInstant());
break;
default:
throw new IllegalArgumentException(

@ -156,11 +156,10 @@ public class TableSchemaInfoTest {
Timestamp.valueOf("2023-01-01 00:00:00.000")),
org.apache.flink.table.data.TimestampData.fromTimestamp(
Timestamp.valueOf("2023-01-01 00:00:00")),
// plus 8 hours.
org.apache.flink.table.data.TimestampData.fromInstant(
Instant.parse("2023-01-01T08:00:00.000Z")),
Instant.parse("2023-01-01T00:00:00.000Z")),
org.apache.flink.table.data.TimestampData.fromInstant(
Instant.parse("2023-01-01T08:00:00.000Z")),
Instant.parse("2023-01-01T00:00:00.000Z")),
null),
tableSchemaInfo.getRowDataFromRecordData(recordData, false));
}

@ -32,7 +32,6 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.RowKind;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
@ -111,15 +110,11 @@ public class PaimonWriterHelper {
case TIMESTAMP_WITH_TIME_ZONE:
fieldGetter =
row ->
Timestamp.fromLocalDateTime(
ZonedDateTime.ofInstant(
row.getLocalZonedTimestampData(
fieldPos,
DataTypeChecks.getPrecision(
fieldType))
.toInstant(),
zoneId)
.toLocalDateTime());
Timestamp.fromInstant(
row.getLocalZonedTimestampData(
fieldPos,
DataTypeChecks.getPrecision(fieldType))
.toInstant());
break;
case ROW:
final int rowFieldCount = getFieldCount(fieldType);

@ -130,9 +130,8 @@ public class PaimonWriterHelperTest {
java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
Timestamp.fromSQLTimestamp(
java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
// plus 8 hours.
Timestamp.fromInstant(Instant.parse("2023-01-01T08:00:00.000Z")),
Timestamp.fromInstant(Instant.parse("2023-01-01T08:00:00.000Z")),
Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
Timestamp.fromInstant(Instant.parse("2023-01-01T00:00:00.000Z")),
null),
genericRow);
}

@ -21,6 +21,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@ -42,6 +43,7 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
@ -71,7 +73,12 @@ public class StarRocksPipelineITCase extends StarRocksSinkTestBase {
LOG.info("Database {} created.", StarRocksContainer.STARROCKS_DATABASE_NAME);
List<String> schema = Arrays.asList("id INT NOT NULL", "number DOUBLE", "name VARCHAR(51)");
List<String> schema =
Arrays.asList(
"id INT NOT NULL",
"number DOUBLE",
"name VARCHAR(51)",
"birthday DATETIME");
executeSql(
String.format(
@ -107,43 +114,76 @@ public class StarRocksPipelineITCase extends StarRocksSinkTestBase {
.column(new PhysicalColumn("id", DataTypes.INT(), null))
.column(new PhysicalColumn("number", DataTypes.DOUBLE(), null))
.column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null))
.column(new PhysicalColumn("birthday", DataTypes.TIMESTAMP_LTZ(6), null))
.primaryKey("id")
.build();
BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(
RowType.of(DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.VARCHAR(17)));
RowType.of(
DataTypes.INT(),
DataTypes.DOUBLE(),
DataTypes.VARCHAR(17),
DataTypes.TIMESTAMP_LTZ(6)));
return Arrays.asList(
new CreateTableEvent(tableId, schema),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")})),
new Object[] {
17,
3.14,
BinaryStringData.fromString("StarRocks"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
19,
2.718,
BinaryStringData.fromString("Que Sera Sera"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})),
DataChangeEvent.insertEvent(
tableId,
generator.generate(
new Object[] {
21, 1.732, BinaryStringData.fromString("Disenchanted")
21,
1.732,
BinaryStringData.fromString("Disenchanted"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})),
DataChangeEvent.deleteEvent(
tableId,
generator.generate(
new Object[] {
19, 2.718, BinaryStringData.fromString("Que Sera Sera")
19,
2.718,
BinaryStringData.fromString("Que Sera Sera"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})),
DataChangeEvent.updateEvent(
tableId,
generator.generate(
new Object[] {17, 3.14, BinaryStringData.fromString("StarRocks")}),
new Object[] {
17,
3.14,
BinaryStringData.fromString("StarRocks"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
}),
generator.generate(
new Object[] {
17, 6.28, BinaryStringData.fromString("StarRocks")
17,
6.28,
BinaryStringData.fromString("StarRocks"),
LocalZonedTimestampData.fromInstant(
Instant.parse("2023-01-01T00:00:00.000Z"))
})));
}
@ -170,8 +210,11 @@ public class StarRocksPipelineITCase extends StarRocksSinkTestBase {
env.execute("Values to StarRocks Sink");
List<String> actual = fetchTableContent(tableId, 3);
List<String> expected = Arrays.asList("17 | 6.28 | StarRocks", "21 | 1.732 | Disenchanted");
List<String> actual = fetchTableContent(tableId, 4);
List<String> expected =
Arrays.asList(
"17 | 6.28 | StarRocks | 2023-01-01 00:00:00.0",
"21 | 1.732 | Disenchanted | 2023-01-01 00:00:00.0");
assertEqualsInAnyOrder(expected, actual);
}

Loading…
Cancel
Save