[pipeline-connector][mysql] fix timestamp with timezone format (#2952)

* fix ts with tz parser

* test timestamp with default value

* fix related test

* use timestamp string in test cases
pull/2986/head
He Wang 1 year ago committed by GitHub
parent 557944488b
commit f3762a1b30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -45,6 +45,7 @@ import org.testcontainers.lifecycle.Startables;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
@ -118,6 +119,7 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(0));
Object[] expectedSnapshot =
@ -131,10 +133,11 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
// Because Flink's BinaryWriter force write int value for TIME(6).
// See BinaryWriter#write for detail.
64822123,
TimestampData.fromMillis(1595008822000L),
TimestampData.fromMillis(1595008822123L),
TimestampData.fromMillis(1595008822123L, 456000),
LocalZonedTimestampData.fromEpochMillis(1595008822000L, 0)
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
null
};
Object[] expectedStreamRecord =
@ -145,10 +148,11 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
64822000,
64822123,
null,
TimestampData.fromMillis(1595008822000L),
TimestampData.fromMillis(1595008822123L),
TimestampData.fromMillis(1595008822123L, 456000),
LocalZonedTimestampData.fromEpochMillis(1595008822000L, 0)
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
};
testTimeDataTypes(
@ -170,7 +174,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6));
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.TIMESTAMP_LTZ(0));
Object[] expectedSnapshot =
new Object[] {
@ -183,13 +188,13 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
// Because Flink's BinaryWriter force write int value for TIME(6).
// See BinaryWriter#write for detail.
64822123,
TimestampData.fromMillis(1595008822000L),
TimestampData.fromMillis(1595008822123L),
TimestampData.fromMillis(1595008822123L, 456000),
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22Z")),
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22.123Z")),
LocalZonedTimestampData.fromInstant(
Instant.parse("2020-07-17T18:00:22.123456Z"))
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
null
};
Object[] expectedStreamRecord =
@ -200,13 +205,13 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
64822000,
64822123,
null,
TimestampData.fromMillis(1595008822000L),
TimestampData.fromMillis(1595008822123L),
TimestampData.fromMillis(1595008822123L, 456000),
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22Z")),
LocalZonedTimestampData.fromInstant(Instant.parse("2020-07-17T18:00:22.123Z")),
LocalZonedTimestampData.fromInstant(
Instant.parse("2020-07-17T18:00:22.123456Z"))
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123")),
TimestampData.fromTimestamp(Timestamp.valueOf("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123")),
LocalZonedTimestampData.fromInstant(toInstant("2020-07-17 18:00:22.123456")),
LocalZonedTimestampData.fromInstant(toInstant("2000-01-01 00:00:00"))
};
testTimeDataTypes(
@ -317,6 +322,10 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
assertThat(recordFields(streamRecord, COMMON_TYPES)).isEqualTo(expectedStreamRecord);
}
private Instant toInstant(String ts) {
return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of("UTC")).toInstant();
}
private void testTimeDataTypes(
UniqueDatabase database,
RowType recordType,
@ -340,7 +349,8 @@ public class MySqlFullTypesITCase extends MySqlSourceTestBase {
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE time_types SET time_6_c = null WHERE id = 1;");
statement.execute(
"UPDATE time_types SET time_6_c = null, timestamp_def_c = default WHERE id = 1;");
}
List<Event> streamResults = fetchResults(iterator, 1);

@ -130,6 +130,7 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(0)
},
new String[] {
@ -142,7 +143,8 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
"datetime_c",
"datetime3_c",
"datetime6_c",
"timestamp_c"
"timestamp_c",
"timestamp_def_c"
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);
@ -176,7 +178,8 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP_LTZ(0),
DataTypes.TIMESTAMP_LTZ(3),
DataTypes.TIMESTAMP_LTZ(6)
DataTypes.TIMESTAMP_LTZ(6),
DataTypes.TIMESTAMP_LTZ(0)
},
new String[] {
"id",
@ -190,7 +193,8 @@ public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
"datetime6_c",
"timestamp_c",
"timestamp3_c",
"timestamp6_c"
"timestamp6_c",
"timestamp_def_c"
}))
.build();
assertThat(actualSchema).isEqualTo(expectedSchema);

@ -106,6 +106,7 @@ CREATE TABLE time_types
datetime3_c DATETIME(3),
datetime6_c DATETIME(6),
timestamp_c TIMESTAMP NULL,
timestamp_def_c TIMESTAMP NULL DEFAULT '2000-01-01 00:00:00',
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
@ -119,4 +120,5 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22',
'2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
'2020-07-17 18:00:22');
'2020-07-17 18:00:22',
NULL);

@ -108,6 +108,7 @@ CREATE TABLE time_types
timestamp_c TIMESTAMP(0),
timestamp3_c TIMESTAMP(3),
timestamp6_c TIMESTAMP(6),
timestamp_def_c TIMESTAMP NULL DEFAULT '2000-01-01 00:00:00',
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
@ -123,4 +124,5 @@ VALUES (DEFAULT,
'2020-07-17 18:00:22.123456',
'2020-07-17 18:00:22',
'2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456');
'2020-07-17 18:00:22.123456',
NULL);

@ -142,7 +142,7 @@ public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference,
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
int nano =
Optional.ofNullable((String) value)
.map(Instant::parse)
.map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
.map(Instant::getNano)
.orElse(0);

Loading…
Cancel
Save