[FLINK-36351][pipeline-connector/doris] Support the conversion of Flink TIME type to Doris String type

This closes #3620
pull/3868/head
Petrichor 2 weeks ago committed by GitHub
parent 6b13868dd5
commit 177da16fac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -34,6 +34,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
@ -116,6 +117,8 @@ public class DorisRowConverter implements Serializable {
case TIMESTAMP_WITH_TIME_ZONE:
final int zonedP = ((ZonedTimestampType) type).getPrecision();
return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
case TIME_WITHOUT_TIME_ZONE:
return (index, val) -> LocalTime.ofNanoOfDay(val.getLong(index) * 1_000_000L);
case ARRAY:
return (index, val) -> convertArrayData(val.getArray(index), type);
case MAP:

@ -259,10 +259,14 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase {
.column(new PhysicalColumn("string", DataTypes.STRING(), "String"))
.column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal"))
.column(new PhysicalColumn("date", DataTypes.DATE(), "Date"))
// Doris sink doesn't support TIME type yet.
// .column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
// .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With
// Precision"))
// Doris sink doesn't support TIME type thus convert TIME to STRING
.column(new PhysicalColumn("time", DataTypes.TIME(), "Time"))
.column(
new PhysicalColumn(
"time_3", DataTypes.TIME(3), "Time With Precision"))
.column(
new PhysicalColumn(
"time_6", DataTypes.TIME(6), "Time With Precision"))
.column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp"))
.column(
new PhysicalColumn(
@ -321,6 +325,9 @@ public class DorisMetadataApplierITCase extends DorisSinkTestBase {
"string | TEXT | Yes | false | null",
"decimal | DECIMAL(17, 7) | Yes | false | null",
"date | DATE | Yes | false | null",
"time | TEXT | Yes | false | null",
"time_3 | TEXT | Yes | false | null",
"time_6 | TEXT | Yes | false | null",
"timestamp | DATETIME(6) | Yes | false | null",
"timestamp_3 | DATETIME(3) | Yes | false | null",
"timestamptz | DATETIME(6) | Yes | false | null",

@ -62,7 +62,10 @@ public class DorisRowConverterTest {
Column.physicalColumn("f19", DataTypes.TIMESTAMP()),
Column.physicalColumn("f20", DataTypes.TIMESTAMP_LTZ()),
Column.physicalColumn("f21", DataTypes.TIMESTAMP_LTZ()),
Column.physicalColumn("f22", DataTypes.TIMESTAMP_LTZ()));
Column.physicalColumn("f22", DataTypes.TIMESTAMP_LTZ()),
Column.physicalColumn("f23", DataTypes.TIME(0)),
Column.physicalColumn("f24", DataTypes.TIME(3)),
Column.physicalColumn("f24", DataTypes.TIME(6)));
List<DataType> dataTypes =
columns.stream().map(v -> v.getType()).collect(Collectors.toList());
@ -104,6 +107,9 @@ public class DorisRowConverterTest {
LocalZonedTimestampData.fromInstant(f20),
LocalZonedTimestampData.fromInstant(f21),
LocalZonedTimestampData.fromInstant(f22),
3661000,
3661123,
3661123
});
List row = new ArrayList();
for (int i = 0; i < recordData.getArity(); i++) {
@ -115,7 +121,7 @@ public class DorisRowConverterTest {
Assert.assertEquals(
"[true, 1.2, 1.2345, 1, 32, 64, 128, 2021-01-01 08:00:00.000000, 2021-01-01, a, doris, 2021-01-01 "
+ "08:01:11.000000, 2021-01-01 08:01:11.123000, 2021-01-01 08:01:11.123456, 2021-01-01 "
+ "16:01:11.000000, 2021-01-01 16:01:11.123000, 2021-01-01 16:01:11.123456]",
+ "16:01:11.000000, 2021-01-01 16:01:11.123000, 2021-01-01 16:01:11.123456, 01:01:01, 01:01:01.123, 01:01:01.123]",
row.toString());
}
}

Loading…
Cancel
Save