[FLINK-35173][cdc][mysql] Debezium custom time serializer for MySQL connector (#3240) (#3264)

pull/3274/head
ConradJam 10 months ago committed by GitHub
parent 34e1388a74
commit 90511b3a65
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.debezium.utils;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.chrono.IsoEra;
/** Convert And Check TimeBce Util. */
public class ConvertTimeBceUtil {
private static final Date ONE_CE = Date.valueOf("0001-01-01");
public static String resolveEra(boolean isBce, String value) {
String mangledValue = value;
if (isBce) {
if (mangledValue.startsWith("-")) {
mangledValue = mangledValue.substring(1);
}
if (!mangledValue.endsWith(" BC")) {
mangledValue += " BC";
}
}
return mangledValue;
}
public static boolean isBce(LocalDate date) {
return date.getEra() == IsoEra.BCE;
}
public static String resolveEra(LocalDate date, String value) {
return resolveEra(isBce(date), value);
}
public static String resolveEra(Date date, String value) {
return resolveEra(date.before(ONE_CE), value);
}
public static String resolveEra(Timestamp timestamp, String value) {
return resolveEra(timestamp.before(ONE_CE), value);
}
}

@ -0,0 +1,326 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.debezium.connector.mysql.converters;
import org.apache.flink.cdc.debezium.utils.ConvertTimeBceUtil;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.time.Conversions;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* Debezium converts the datetime type in MySQL into a UTC timestamp by default ({@link
* io.debezium.time.Timestamp} )The time zone is hard-coded and cannot be changed. causing
* conversion errors part of the time Enable this converter to convert the four times "DATE",
* "DATETIME", "TIME", and "TIMESTAMP" into the format corresponding to the configured time zone
* (for example, yyyy-MM-dd)
*
* @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
*/
public class MysqlDebeziumTimeConverter
implements CustomConverter<SchemaBuilder, RelationalColumn> {
private static final Logger log = LoggerFactory.getLogger(MysqlDebeziumTimeConverter.class);
private static boolean loggedUnknownTimestampClass = false;
private static boolean loggedUnknownDateClass = false;
private static boolean loggedUnknownTimeClass = false;
private static boolean loggedUnknownTimestampWithTimeZoneClass = false;
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMESTAMP"};
protected static final String DATE_FORMAT = "yyyy-MM-dd";
protected static final String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
protected static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
protected ZoneId zoneId;
protected static final String DEFAULT_DATE_FORMAT_PATTERN = "1970-01-01 00:00:00";
protected DateTimeFormatter dateFormatter;
protected DateTimeFormatter timeFormatter;
protected DateTimeFormatter datetimeFormatter;
protected DateTimeFormatter timestampFormatter;
protected String schemaNamePrefix;
protected static DateTimeFormatter originalFormat =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
protected Boolean parseNullDefaultValue = true;
@Override
public void configure(Properties properties) {
String dateFormat = properties.getProperty("format.date", DATE_FORMAT);
String timeFormat = properties.getProperty("format.time", TIME_FORMAT);
String datetimeFormat = properties.getProperty("format.datetime", DATETIME_FORMAT);
String timestampFormat = properties.getProperty("format.timestamp", DATETIME_FORMAT);
this.parseNullDefaultValue =
Boolean.parseBoolean(
properties.getProperty("format.default.value.convert", "true"));
String className = this.getClass().getName();
this.schemaNamePrefix = properties.getProperty("schema.name.prefix", className + ".mysql");
this.dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
this.timeFormatter = DateTimeFormatter.ofPattern(timeFormat);
this.datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat);
this.timestampFormatter = DateTimeFormatter.ofPattern(timestampFormat);
this.zoneId =
ZoneId.of(
properties.getProperty(
"format.timezone", ZoneId.systemDefault().toString()));
}
@Override
public void converterFor(
final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
if (Arrays.stream(DATE_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerDateConverter(field, registration);
}
}
private void registerDateConverter(
final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
String columnType = field.typeName().toUpperCase();
String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();
registration.register(
SchemaBuilder.string().name(schemaName).optional(),
value -> {
log.debug(
"find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field "
+ "default:{}",
field.name(),
columnType,
value == null ? "null" : value,
field.hasDefaultValue() ? field.defaultValue() : "null");
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(
columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l =
Math.multiplyExact(
(Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
}
});
}
private Object convertToTimestampWithTimezone(String columnType, Object timestamp) {
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type.
// Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually
// mangles the value for ancient dates, because leap years weren't applied consistently in
// ye olden days. Additionally, toInstant() (and toLocalDateTime()) actually lose the era
// indicator,
// so we can't rely on their getEra() methods.
// So we have special handling for this case, which sidesteps the toInstant conversion.
if (timestamp instanceof Timestamp) {
Timestamp value = (Timestamp) timestamp;
ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof OffsetDateTime) {
OffsetDateTime value = (OffsetDateTime) timestamp;
return ConvertTimeBceUtil.resolveEra(
value.toLocalDate(), value.format(timestampFormatter));
} else if (timestamp instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp;
return ConvertTimeBceUtil.resolveEra(
zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof Instant) {
OffsetDateTime dateTime = OffsetDateTime.ofInstant((Instant) timestamp, zoneId);
ZonedDateTime timestampZt = ZonedDateTime.from(dateTime);
LocalDate localDate = timestampZt.toLocalDate();
return ConvertTimeBceUtil.resolveEra(localDate, timestampZt.format(timestampFormatter));
} else {
if (!loggedUnknownTimestampWithTimeZoneClass) {
printUnknownDateClassLogs(columnType, timestamp);
loggedUnknownTimestampWithTimeZoneClass = true;
}
// If init 1970-01-01T00:00:00Zd need to change
Instant instant = Instant.parse(timestamp.toString());
OffsetDateTime dateTime = OffsetDateTime.ofInstant(instant, zoneId);
ZonedDateTime timestampZt = ZonedDateTime.from(dateTime);
LocalDate localDate = timestampZt.toLocalDate();
return ConvertTimeBceUtil.resolveEra(localDate, timestampZt.format(timestampFormatter));
}
}
private Object convertToTimestamp(String columnType, Object timestamp) {
if (timestamp instanceof Timestamp) {
// Snapshot mode
LocalDateTime localDateTime = ((Timestamp) timestamp).toLocalDateTime();
return ConvertTimeBceUtil.resolveEra(
((Timestamp) timestamp), localDateTime.format(datetimeFormatter));
} else if (timestamp instanceof Instant) {
// Incremental mode
Instant time = (Instant) timestamp;
ZonedDateTime zonedDateTime = time.atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(
zonedDateTime.toLocalDate(),
time.atOffset(zonedDateTime.getOffset())
.toLocalDateTime()
.format(datetimeFormatter));
} else if (timestamp instanceof LocalDateTime) {
LocalDateTime dateTime = (LocalDateTime) timestamp;
LocalDate localDateTime = dateTime.toLocalDate();
return ConvertTimeBceUtil.resolveEra(localDateTime, dateTime.format(datetimeFormatter));
}
if (!loggedUnknownTimestampClass) {
printUnknownDateClassLogs(columnType, timestamp);
loggedUnknownTimestampClass = true;
}
LocalDateTime localDateTime = LocalDateTime.parse(timestamp.toString());
LocalDate localDate = localDateTime.toLocalDate();
return ConvertTimeBceUtil.resolveEra(localDate, localDateTime.format(datetimeFormatter));
}
private Object convertToTime(String columnType, Object time) {
if (time instanceof Time) {
return formatTime(((Time) time).toLocalTime());
} else if (time instanceof LocalTime) {
return formatTime((LocalTime) time);
} else if (time instanceof java.time.Duration) {
long value = ((java.time.Duration) time).toNanos();
if (value >= 0 && value < TimeUnit.DAYS.toNanos(1)) {
return formatTime(LocalTime.ofNanoOfDay(value));
} else {
long updatedValue = Math.min(Math.abs(value), LocalTime.MAX.toNanoOfDay());
log.debug(
"Time values must use number of nanoseconds greater than 0 and less than 86400000000000 but its {}, "
+ "converting to {} ",
value,
updatedValue);
return formatTime(LocalTime.ofNanoOfDay(updatedValue));
}
} else {
if (!loggedUnknownTimeClass) {
printUnknownDateClassLogs(columnType, time);
loggedUnknownTimeClass = true;
}
String valueAsString = time.toString();
if (valueAsString.startsWith("24")) {
log.debug("Time value {} is above range, converting to 23:59:59", valueAsString);
return LocalTime.MAX.toString();
}
return formatTime(LocalTime.parse(valueAsString));
}
}
private String formatTime(LocalTime localTime) {
return localTime.format(timeFormatter);
}
private int getTimePrecision(final RelationalColumn field) {
return field.length().orElse(-1);
}
private String convertToDate(String columnType, Object date) {
if (date instanceof Date) {
// Snapshot mode
LocalDate localDate = ((Date) date).toLocalDate();
return ConvertTimeBceUtil.resolveEra(localDate, localDate.format(dateFormatter));
} else if (date instanceof LocalDate) {
// Incremental mode
return dateFormatter.format((LocalDate) date);
} else if (date instanceof LocalDateTime) {
return datetimeFormatter.format((LocalDateTime) date);
} else if (date instanceof Integer) {
return LocalDate.ofEpochDay(((Integer) date).longValue()).format(dateFormatter);
} else {
if (!loggedUnknownDateClass) {
printUnknownDateClassLogs(columnType, date);
loggedUnknownDateClass = true;
}
LocalDate localDate = LocalDate.parse(date.toString());
return ConvertTimeBceUtil.resolveEra(localDate, localDate.format(dateFormatter));
}
}
public Object convertDateDefaultValue(RelationalColumn field) {
if (field.isOptional()) {
return null;
} else if (field.hasDefaultValue()) {
// There is an extreme case where the field defaultValue is 0, resulting in a Kafka
// Schema mismatch
if (parseNullDefaultValue) {
LocalDateTime dateTime =
LocalDateTime.parse(DEFAULT_DATE_FORMAT_PATTERN, originalFormat);
String columnType = field.typeName().toUpperCase();
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
return dateTime.format(dateFormatter);
case "DATETIME":
return dateTime.format(datetimeFormatter);
case "TIME":
return dateTime.format(timeFormatter);
case "TIMESTAMP":
return dateTime.format(timestampFormatter);
}
}
}
return null;
}
private static void printUnknownDateClassLogs(String type, Object value) {
log.warn(
"MySql Date Convert Database type : {} Unknown class for Date data type {}",
type,
value.getClass());
}
}

@ -0,0 +1,298 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.mysql.debezium.converters;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.MySqlValidatorTest;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.connector.mysql.converters.MysqlDebeziumTimeConverter;
import io.debezium.data.Envelope;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Stream;
import static java.lang.String.format;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInAnyOrder;
/** Test for {@link MysqlDebeziumTimeConverter}. */
public class MysqlDebeziumTimeConverterITCase {
private static TemporaryFolder tempFolder;
private static File resourceFolder;
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private static final Logger LOG =
LoggerFactory.getLogger(MysqlDebeziumTimeConverterITCase.class);
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
@Before
public void setup() throws Exception {
resourceFolder =
Paths.get(
Objects.requireNonNull(
MySqlValidatorTest.class
.getClassLoader()
.getResource("."))
.toURI())
.toFile();
tempFolder = new TemporaryFolder(resourceFolder);
tempFolder.create();
env.setParallelism(1);
}
@Test
public void testReadDateConvertDataStreamInJvmTime() throws Exception {
testReadDateConvertDataStreamSource(ZoneId.systemDefault().toString());
}
@Test
public void testReadDateConvertDataStreamInAsia() throws Exception {
testReadDateConvertDataStreamSource("Asia/Shanghai");
}
@Test
public void testReadDateConvertDataStreamInBerlin() throws Exception {
testReadDateConvertDataStreamSource("Europe/Berlin");
}
@Test
public void testReadDateConvertSQLSourceInAsia() throws Exception {
testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai");
}
@Test
public void testReadDateConvertSQLSourceInBerlin() throws Exception {
testTemporalTypesWithMySqlServerTimezone("Europe/Berlin");
}
private void testReadDateConvertDataStreamSource(String timezone) throws Exception {
MySqlContainer mySqlContainer = createMySqlContainer(timezone);
startContainers(mySqlContainer, timezone);
UniqueDatabase db = getUniqueDatabase(mySqlContainer);
db.createAndInitialize();
env.enableCheckpointing(1000L);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MySqlSourceBuilder<String> builder =
MySqlSource.<String>builder()
.hostname(db.getHost())
.port(db.getDatabasePort())
.databaseList(db.getDatabaseName())
.tableList(db.getDatabaseName() + ".date_convert_test")
.startupOptions(StartupOptions.initial())
.serverTimeZone(timezone)
.username(db.getUsername())
.password(db.getPassword())
.debeziumProperties(getDebeziumConfigurations(timezone));
builder.deserializer(new JsonDebeziumDeserializationSchema());
DataStreamSource<String> convertDataStreamSource =
env.fromSource(
builder.build(),
WatermarkStrategy.noWatermarks(),
"testDataStreamSourceConvertData");
List<String> result = convertDataStreamSource.executeAndCollect(3);
validTimestampValue(result);
}
private void validTimestampValue(List<String> result) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00"};
for (String after : result) {
JsonNode jsonNode = mapper.readTree(after);
Assert.assertEquals(
timestampValues[jsonNode.get(Envelope.FieldName.AFTER).get("id").asInt() - 1],
jsonNode.get("after").get("test_timestamp").asText());
}
}
private void testTemporalTypesWithMySqlServerTimezone(String timezone) throws Exception {
MySqlContainer mySqlContainer = createMySqlContainer(timezone);
startContainers(mySqlContainer, timezone);
UniqueDatabase db = getUniqueDatabase(mySqlContainer);
db.createAndInitialize();
env.enableCheckpointing(1000L);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sourceDDL =
format(
"CREATE TABLE customers ("
+ " id BIGINT NOT NULL,"
+ " test_timestamp STRING,"
+ " test_datetime STRING,"
+ " test_date STRING,"
+ " test_time STRING, "
+ "primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = 'date_convert_test',"
+ " 'scan.startup.mode' = '%s',"
+ " 'server-time-zone' = '%s',"
+ " 'debezium.converters' = 'datetime',"
+ " 'debezium.datetime.type' = '%s',"
+ " 'debezium.database.connectionTimeZone' = '%s',"
+ " 'debezium.datetime.format.time' = 'HH:mm:ss',"
+ " 'debezium.datetime.format.timezone' = '%s',"
+ " 'debezium.datetime.format.timestamp' = 'HH:mm:ss',"
+ " 'debezium.datetime.format.default.value.convert' = 'true'"
+ ")",
mySqlContainer.getHost(),
mySqlContainer.getDatabasePort(),
db.getUsername(),
db.getPassword(),
db.getDatabaseName(),
"initial",
timezone,
MysqlDebeziumTimeConverter.class.getName(),
timezone,
timezone);
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
checkData(tableResult);
}
private Properties getDebeziumConfigurations(String timezone) {
Properties debeziumProperties = new Properties();
// set properties
debeziumProperties.setProperty("converters", "datetime");
debeziumProperties.setProperty("datetime.type", MysqlDebeziumTimeConverter.class.getName());
debeziumProperties.setProperty("datetime.format.timestamp", "HH:mm:ss");
debeziumProperties.setProperty("datetime.format.default.value.convert", "false");
// If not set time convert maybe error
debeziumProperties.setProperty("database.connectionTimeZone", timezone);
debeziumProperties.setProperty("datetime.format.timezone", timezone);
LOG.info("Supplied debezium properties: {}", debeziumProperties);
return debeziumProperties;
}
private void checkData(TableResult tableResult) {
String[] snapshotForSingleTable =
new String[] {
"+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]",
"+I[3, 00:00:00, null, null, 00:01:20]",
"+I[2, 00:00:00, null, null, 00:00:00]"
};
List<String> expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable));
CloseableIterator<Row> collect = tableResult.collect();
tableResult.getJobClient().get().getJobID();
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(collect, expectedSnapshotData.size()));
}
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
Row row = iter.next();
rows.add(row.toString());
size--;
}
return rows;
}
protected MySqlContainer createMySqlContainer(String timezone) {
return (MySqlContainer)
new MySqlContainer(MySqlVersion.V5_7)
.withConfigurationOverride(buildMySqlConfigWithTimezone(timezone))
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
}
protected void startContainers(MySqlContainer mySqlContainer, String timezone) {
LOG.info("Starting containers with timezone {} ...", timezone);
Startables.deepStart(Stream.of(mySqlContainer)).join();
LOG.info("Containers are started.");
LOG.info("JVM System Clock Zone Id : {}", ZoneId.systemDefault());
}
protected UniqueDatabase getUniqueDatabase(MySqlContainer mySqlContainer) {
return new UniqueDatabase(mySqlContainer, "date_convert_test", "mysqluser", "mysqlpw");
}
private String buildMySqlConfigWithTimezone(String timezone) {
try {
File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));
Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf"));
String mysqldConf =
"[mysqld]\n"
+ "binlog_format = row\n"
+ "log_bin = mysql-bin\n"
+ "server-id = 223344\n"
+ "binlog_row_image = FULL\n";
String timezoneConf = "default-time_zone = '" + timezone + "'\n";
Files.write(
cnf,
Collections.singleton(mysqldConf + timezoneConf),
StandardCharsets.UTF_8,
StandardOpenOption.APPEND);
return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
} catch (Exception e) {
throw new RuntimeException("Failed to create my.cnf file.", e);
}
}
}

@ -0,0 +1,36 @@
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the `License`); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an `AS IS` BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: column_type_test
-- ----------------------------------------------------------------------------------------------------------------
-- datetime NOT NULL
CREATE TABLE date_convert_test
(
`id` bigint NOT NULL AUTO_INCREMENT,
`test_timestamp` timestamp NULL,
`test_datetime` datetime NULL,
`test_date` date DEFAULT NULL,
`test_time` time DEFAULT NULL,
PRIMARY KEY (`id`)
) DEFAULT CHARSET=utf8;
INSERT INTO date_convert_test (id,test_timestamp, test_datetime, test_date, test_time)
VALUES
(1,'2023-04-01 14:23:00', '2023-04-01 14:24:00', '2023-04-01', '14:25:00'),
(2,'2024-04-23 00:00:00', DEFAULT, NULL ,'00:00:00'),
(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120);
Loading…
Cancel
Save