From a6eccf5789499166179e696dc7fba7b681f09415 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Fri, 27 Aug 2021 16:17:21 +0800 Subject: [PATCH] [mysql] Add tests to check mysql-cdc works well under various timezones (#367) Co-authored-by: Leonard Xu --- .../mysql/table/MysqlTimezoneITCase.java | 267 ++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java new file mode 100644 index 000000000..fdf7b524a --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MysqlTimezoneITCase.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import com.ververica.cdc.connectors.mysql.MySqlValidatorTest; +import com.ververica.cdc.connectors.mysql.source.utils.MySqlContainer; +import com.ververica.cdc.connectors.mysql.source.utils.UniqueDatabase; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.sql.Connection; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +/** Integration tests to check mysql-cdc works well under different MySQL server timezone. */ +@RunWith(Parameterized.class) +public class MysqlTimezoneITCase { + + private static final Logger LOG = LoggerFactory.getLogger(MysqlTimezoneITCase.class); + private static TemporaryFolder tempFolder; + private static File resourceFolder; + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, + EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()); + + private final boolean incrementalSnapshot; + + @Parameterized.Parameters(name = "incrementalSnapshot: {0}") + public static Object[] parameters() { + return new Object[][] {new Object[] {true} + // , + // new Object[] {false} + }; + } + + public MysqlTimezoneITCase(boolean incrementalSnapshot) { + this.incrementalSnapshot = incrementalSnapshot; + } + + @Before + public void setup() throws Exception { + resourceFolder = + Paths.get( + Objects.requireNonNull( + MySqlValidatorTest.class + .getClassLoader() + .getResource(".")) + .toURI()) + .toFile(); + tempFolder = new TemporaryFolder(resourceFolder); + tempFolder.create(); + if (incrementalSnapshot) { + env.setParallelism(4); + env.enableCheckpointing(200); + } else { + env.setParallelism(1); + } + } + + @Test + public void testMysqlServerInBerlin() throws Exception { + testTemporalTypesWithMySqlServerTimezone("Asia/Shanghai"); + } + + @Test + public void testMysqlServerInShanghai() throws Exception { + testTemporalTypesWithMySqlServerTimezone("Europe/Berlin"); + } + + private void testTemporalTypesWithMySqlServerTimezone(String timezone) throws Exception { + MySqlContainer mySqlContainer = + (MySqlContainer) + new MySqlContainer() + .withConfigurationOverride(buildMySqlConfigWithTimezone(timezone)) + .withSetupSQL("docker/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(mySqlContainer)).join(); + LOG.info("Containers are started."); + + UniqueDatabase fullTypesDatabase = + new UniqueDatabase(mySqlContainer, "column_type_test", "mysqluser", "mysqlpw"); + fullTypesDatabase.createAndInitialize(); + + String sourceDDL = + String.format( + "CREATE TABLE full_types (\n" + + " `id` INT NOT NULL,\n" + + " tiny_c TINYINT,\n" + + " tiny_un_c SMALLINT ,\n" + + " small_c SMALLINT,\n" + + " small_un_c INT,\n" + + " int_c INT ,\n" + + " int_un_c BIGINT,\n" + + " int11_c BIGINT,\n" + + " big_c BIGINT,\n" + + " varchar_c STRING,\n" + + " char_c STRING,\n" + + " float_c FLOAT,\n" + + " double_c DOUBLE,\n" + + " decimal_c DECIMAL(8, 4),\n" + + " numeric_c DECIMAL(6, 0),\n" + + " boolean_c BOOLEAN,\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " datetime3_c TIMESTAMP(3),\n" + + " datetime6_c TIMESTAMP(6),\n" + + " timestamp_c TIMESTAMP(0),\n" + + " file_uuid BYTES,\n" + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-id' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '%s'," + + " 'server-time-zone'='%s'" + + ")", + mySqlContainer.getHost(), + mySqlContainer.getDatabasePort(), + fullTypesDatabase.getUsername(), + fullTypesDatabase.getPassword(), + fullTypesDatabase.getDatabaseName(), + "full_types", + incrementalSnapshot, + getServerId(), + getSplitSize(), + timezone); + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = + tEnv.executeSql( + "SELECT date_c, time_c, datetime3_c, datetime6_c, timestamp_c FROM full_types"); + + CloseableIterator iterator = result.collect(); + String[] expectedSnapshot = + new String[] { + "+I[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]", + }; + assertThat( + fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); + + try (Connection connection = fullTypesDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;"); + } + + String[] expectedBinlog = + new String[] { + "-U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:00:22]", + "+U[2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, 2020-07-17T18:00:22.123456, 2020-07-17T18:33:22]" + }; + + assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog)); + result.getJobClient().get().cancel().get(); + } + + private String getServerId() { + final Random random = new Random(); + int serverId = random.nextInt(100) + 5400; + if (incrementalSnapshot) { + return serverId + "-" + (serverId + env.getParallelism()); + } + return String.valueOf(serverId); + } + + private int getSplitSize() { + if (incrementalSnapshot) { + // test parallel read + return 4; + } + return 0; + } + + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } + + private String buildMySqlConfigWithTimezone(String timezone) { + try { + File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID())); + Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf")); + String mysqldConf = + "[mysqld]\n" + + "binlog_format = row\n" + + "log_bin = mysql-bin\n" + + "server-id = 223344\n" + + "binlog_row_image = FULL\n"; + String timezoneConf = "default-time_zone = '" + timezone + "'\n"; + Files.write( + cnf, + Collections.singleton(mysqldConf + timezoneConf), + StandardCharsets.UTF_8, + StandardOpenOption.APPEND); + return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString(); + } catch (Exception e) { + throw new RuntimeException("Failed to create my.cnf file.", e); + } + } +}