[FLINK-35592][cdc] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp (#3380)

pull/3423/head
ConradJam 8 months ago committed by GitHub
parent 7287eaceca
commit f476a5b1dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -113,54 +113,73 @@ public class MysqlDebeziumTimeConverter
registration.register(
SchemaBuilder.string().name(schemaName).optional(),
value -> {
log.debug(
"find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field "
+ "default:{}",
field.name(),
columnType,
value == null ? "null" : value,
field.hasDefaultValue() ? field.defaultValue() : "null");
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(
columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l =
Math.multiplyExact(
(Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
try {
return convertDateObject(field, value, columnType);
} catch (Exception e) {
printConvertDateErrorClassLogs(field, registration, value);
throw new RuntimeException("MysqlDebeziumConverter error", e);
}
});
}
private void printConvertDateErrorClassLogs(
RelationalColumn field,
ConverterRegistration<SchemaBuilder> registration,
Object value) {
boolean useDefaultValueConvert = (value == null);
String fieldName = field.name();
String fieldType = field.typeName().toUpperCase();
String defaultValue = "null";
if (field.hasDefaultValue()) {
if (field.defaultValue() != null) {
defaultValue = field.defaultValue().toString();
}
}
log.warn(
"find schema need to change dateType, field name:||{}|| field type:||{}|| is use default "
+ "convert:||{}|| field default value:||{}|| field charge value fail",
fieldName,
fieldType,
useDefaultValueConvert,
defaultValue);
}
private Object convertDateObject(RelationalColumn field, Object value, String columnType) {
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l = Math.multiplyExact((Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
}
}
private Object convertToTimestampWithTimezone(String columnType, Object timestamp) {
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type.
// Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually
@ -174,11 +193,12 @@ public class MysqlDebeziumTimeConverter
ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof OffsetDateTime) {
OffsetDateTime value = (OffsetDateTime) timestamp;
OffsetDateTime value =
((OffsetDateTime) timestamp).toInstant().atZone(zoneId).toOffsetDateTime();
return ConvertTimeBceUtil.resolveEra(
value.toLocalDate(), value.format(timestampFormatter));
} else if (timestamp instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp;
ZonedDateTime zonedDateTime = ((ZonedDateTime) timestamp).toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(
zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof Instant) {

@ -154,7 +154,7 @@ public class MysqlDebeziumTimeConverterITCase {
private void validTimestampValue(List<String> result) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00"};
String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00", "15:04:00"};
for (String after : result) {
JsonNode jsonNode = mapper.readTree(after);
Assert.assertEquals(
@ -232,7 +232,8 @@ public class MysqlDebeziumTimeConverterITCase {
new String[] {
"+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]",
"+I[3, 00:00:00, null, null, 00:01:20]",
"+I[2, 00:00:00, null, null, 00:00:00]"
"+I[2, 00:00:00, null, null, 00:00:00]",
"+I[4, 15:04:00, null, null, 00:01:10]"
};
List<String> expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable));
@ -283,7 +284,8 @@ public class MysqlDebeziumTimeConverterITCase {
+ "binlog_format = row\n"
+ "log_bin = mysql-bin\n"
+ "server-id = 223344\n"
+ "binlog_row_image = FULL\n";
+ "binlog_row_image = FULL\n"
+ "sql_mode = ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION\n";
String timezoneConf = "default-time_zone = '" + timezone + "'\n";
Files.write(
cnf,

@ -33,4 +33,5 @@ INSERT INTO date_convert_test (id,test_timestamp, test_datetime, test_date, test
VALUES
(1,'2023-04-01 14:23:00', '2023-04-01 14:24:00', '2023-04-01', '14:25:00'),
(2,'2024-04-23 00:00:00', DEFAULT, NULL ,'00:00:00'),
(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120);
(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120),
(4,20240612150400, DEFAULT, NULL ,110);
Loading…
Cancel
Save