diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
index 40c774ffc..636ed975f 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -268,6 +268,13 @@ pipeline:
      是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。
      若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。
+
+      scan.newly-added-table.enabled
+      optional
+      false
+      Boolean
+      是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。
+
diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md
index 41eccf939..879701614 100644
--- a/docs/content/docs/connectors/pipeline-connectors/mysql.md
+++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -275,6 +275,13 @@ pipeline:
      so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
+
+      scan.newly-added-table.enabled
+      optional
+      false
+      Boolean
+      Whether to enable the feature of scanning newly added tables or not, false by default. This option is only useful when the job is started from a savepoint/checkpoint.
+
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index b1d3e5966..e8c39ce3d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -61,6 +61,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
@@ -123,6 +124,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);
         int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
         int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
+        boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);

         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -158,7 +160,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
                         .closeIdleReaders(closeIdleReaders)
                         .includeSchemaChanges(includeSchemaChanges)
                         .debeziumProperties(getDebeziumProperties(configMap))
-                        .jdbcProperties(getJdbcProperties(configMap));
+                        .jdbcProperties(getJdbcProperties(configMap))
+                        .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);

         Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
         List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
@@ -216,7 +219,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         options.add(CONNECTION_POOL_SIZE);
         options.add(HEARTBEAT_INTERVAL);
         options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
-
+        options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
         options.add(CHUNK_META_GROUP_SIZE);
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index e852eb3d7..f6f1a671a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -224,6 +224,14 @@ public class MySqlDataSourceOptions {
                                     + "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be "
                                     + "greater than or equal to 1.14 when enabling this feature.");

+    @Experimental
+    public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
+            ConfigOptions.key("scan.newly-added-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to scan newly added tables or not, false by default. This option is only useful when the job is started from a savepoint/checkpoint.");
+
     @Experimental
     public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
             ConfigOptions.key("schema-change.enabled")
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
new file mode 100644
index 000000000..7187e6447
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.factories.FactoryHelper; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.source.FlinkSourceProvider; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.RowType; +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer; +import org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.util.ExceptionUtils; + +import io.debezium.connector.mysql.MySqlConnection; +import io.debezium.jdbc.JdbcConnection; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; +import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.getServerId; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT tests to cover various newly added tables during capture process in pipeline mode. */ +public class MysqlPipelineNewlyAddedTableITCase extends MySqlSourceTestBase { + private final UniqueDatabase customDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); + + private final ScheduledExecutorService mockBinlogExecutor = Executors.newScheduledThreadPool(1); + + @Before + public void before() throws SQLException { + TestValuesTableFactory.clearAllData(); + customDatabase.createAndInitialize(); + + try (MySqlConnection connection = getConnection()) { + connection.setAutoCommit(false); + // prepare initial data for given table + String tableId = customDatabase.getDatabaseName() + ".produce_binlog_table"; + connection.execute( + format("CREATE TABLE %s ( id BIGINT PRIMARY KEY, cnt BIGINT);", tableId)); + connection.execute( + format("INSERT INTO %s VALUES (0, 100), (1, 101), (2, 102);", tableId)); + connection.commit(); + + // mock continuous binlog during the newly added table capturing process + mockBinlogExecutor.schedule( + () -> { + try { + connection.execute( + format("UPDATE %s SET cnt = cnt +1 WHERE id < 2;", tableId)); + connection.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + }, + 500, + TimeUnit.MICROSECONDS); + } + } + + @After + public void after() { + mockBinlogExecutor.shutdown(); + } + + private MySqlConnection getConnection() { + Map properties = new HashMap<>(); + properties.put("database.hostname", MYSQL_CONTAINER.getHost()); + properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + properties.put("database.user", customDatabase.getUsername()); + properties.put("database.password", customDatabase.getPassword()); + properties.put("database.serverTimezone", ZoneId.of("UTC").toString()); + io.debezium.config.Configuration configuration = + io.debezium.config.Configuration.from(properties); + return DebeziumUtils.createMySqlConnection(configuration, new Properties()); + } + + @Test + public void testAddNewTableOneByOneSingleParallelism() throws Exception { + TestParam testParam = + TestParam.newBuilder( + Collections.singletonList("address_hangzhou"), + 4, + Arrays.asList("address_hangzhou", "address_beijing"), + 4) + .setFirstRoundInitTables( + Arrays.asList("address_hangzhou", "address_beijing")) + .build(); + + testAddNewTable(testParam, 1); + } + + @Test + public void testAddNewTableOneByOne() throws Exception { + TestParam testParam = + TestParam.newBuilder( + Collections.singletonList("address_hangzhou"), + 4, + Arrays.asList("address_hangzhou", "address_beijing"), + 4) + 
.setFirstRoundInitTables( + Arrays.asList("address_hangzhou", "address_beijing")) + .build(); + + testAddNewTable(testParam, DEFAULT_PARALLELISM); + } + + @Test + public void testAddNewTableByPatternSingleParallelism() throws Exception { + TestParam testParam = + TestParam.newBuilder( + Collections.singletonList("address_\\.*"), + 8, + Collections.singletonList("address_\\.*"), + 8) + .setFirstRoundInitTables( + Arrays.asList("address_hangzhou", "address_beijing")) + .setSecondRoundInitTables( + Arrays.asList("address_shanghai", "address_suzhou")) + .build(); + + testAddNewTable(testParam, 1); + } + + @Test + public void testAddNewTableByPattern() throws Exception { + TestParam testParam = + TestParam.newBuilder( + Collections.singletonList("address_\\.*"), + 8, + Collections.singletonList("address_\\.*"), + 12) + .setFirstRoundInitTables( + Arrays.asList("address_hangzhou", "address_beijing")) + .setSecondRoundInitTables( + Arrays.asList( + "address_shanghai", "address_suzhou", "address_shenzhen")) + .build(); + + testAddNewTable(testParam, DEFAULT_PARALLELISM); + } + + private void testAddNewTable(TestParam testParam, int parallelism) throws Exception { + // step 1: create mysql tables + if (CollectionUtils.isNotEmpty(testParam.getFirstRoundInitTables())) { + initialAddressTables(getConnection(), testParam.getFirstRoundInitTables()); + } + Path savepointDir = Files.createTempDirectory("add-new-table-test"); + final String savepointDirectory = savepointDir.toAbsolutePath().toString(); + String finishedSavePointPath = null; + StreamExecutionEnvironment env = + getStreamExecutionEnvironment(finishedSavePointPath, parallelism); + // step 2: listen tables first time + List listenTablesFirstRound = testParam.getFirstRoundListenTables(); + + FlinkSourceProvider sourceProvider = + getFlinkSourceProvider(listenTablesFirstRound, parallelism); + DataStreamSource source = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()); + + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + CheckpointedCollectResultBuffer resultBuffer = + new CheckpointedCollectResultBuffer<>(serializer); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectResultIterator iterator = + addCollector(env, source, resultBuffer, serializer, accumulatorName); + JobClient jobClient = env.executeAsync("beforeAddNewTable"); + iterator.setJobClient(jobClient); + + List actual = fetchResults(iterator, testParam.getFirstRoundFetchSize()); + Optional listenByPattern = + listenTablesFirstRound.stream() + .filter(table -> StringUtils.contains(table, "\\.*")) + .findAny(); + multiAssert( + actual, + listenByPattern.isPresent() + ? 
testParam.getFirstRoundInitTables() + : listenTablesFirstRound); + + // step 3: create new tables if needed + if (CollectionUtils.isNotEmpty(testParam.getSecondRoundInitTables())) { + initialAddressTables(getConnection(), testParam.getSecondRoundInitTables()); + } + + // step 4: trigger a savepoint and cancel the job + finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); + jobClient.cancel().get(); + iterator.close(); + + // step 5: restore from savepoint + StreamExecutionEnvironment restoredEnv = + getStreamExecutionEnvironment(finishedSavePointPath, parallelism); + List listenTablesSecondRound = testParam.getSecondRoundListenTables(); + FlinkSourceProvider restoredSourceProvider = + getFlinkSourceProvider(listenTablesSecondRound, parallelism); + DataStreamSource restoreSource = + restoredEnv.fromSource( + restoredSourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()); + CollectResultIterator restoredIterator = + addCollector(restoredEnv, restoreSource, resultBuffer, serializer, accumulatorName); + JobClient restoreClient = restoredEnv.executeAsync("AfterAddNewTable"); + + List newlyAddTables = + listenTablesSecondRound.stream() + .filter(table -> !listenTablesFirstRound.contains(table)) + .collect(Collectors.toList()); + // it means listen by pattern when newlyAddTables is empty + if (CollectionUtils.isEmpty(newlyAddTables)) { + newlyAddTables = testParam.getSecondRoundInitTables(); + } + List newlyTableEvent = + fetchResults(restoredIterator, testParam.getSecondRoundFetchSize()); + multiAssert(newlyTableEvent, newlyAddTables); + restoreClient.cancel().get(); + restoredIterator.close(); + } + + private void multiAssert(List actualEvents, List listenTables) { + List expectedCreateTableEvents = new ArrayList<>(); + List expectedDataChangeEvents = new ArrayList<>(); + for (String table : listenTables) { + expectedCreateTableEvents.add( + getCreateTableEvent(TableId.tableId(customDatabase.getDatabaseName(), table))); + expectedDataChangeEvents.addAll( + getSnapshotExpected(TableId.tableId(customDatabase.getDatabaseName(), table))); + } + // compare create table events + List actualCreateTableEvents = + actualEvents.stream() + .filter(event -> event instanceof CreateTableEvent) + .collect(Collectors.toList()); + assertThat(actualCreateTableEvents) + .containsExactlyInAnyOrder(expectedCreateTableEvents.toArray(new Event[0])); + + // compare data change events + List actualDataChangeEvents = + actualEvents.stream() + .filter(event -> event instanceof DataChangeEvent) + .collect(Collectors.toList()); + assertThat(actualDataChangeEvents) + .containsExactlyInAnyOrder(expectedDataChangeEvents.toArray(new Event[0])); + } + + private CreateTableEvent getCreateTableEvent(TableId tableId) { + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("country", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("city", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("detail_address", DataTypes.VARCHAR(1024)) + .primaryKey(Collections.singletonList("id")) + .build(); + return new CreateTableEvent(tableId, schema); + } + + private List getSnapshotExpected(TableId tableId) { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.BIGINT().notNull(), + DataTypes.VARCHAR(255).notNull(), + DataTypes.VARCHAR(255).notNull(), + DataTypes.VARCHAR(1024) + }, + new String[] {"id", "country", "city", "detail_address"}); + BinaryRecordDataGenerator 
generator = new BinaryRecordDataGenerator(rowType); + String cityName = tableId.getTableName().split("_")[1]; + return Arrays.asList( + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 416874195632735147L, + BinaryStringData.fromString("China"), + BinaryStringData.fromString(cityName), + BinaryStringData.fromString(cityName + " West Town address 1") + })), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 416927583791428523L, + BinaryStringData.fromString("China"), + BinaryStringData.fromString(cityName), + BinaryStringData.fromString(cityName + " West Town address 2") + })), + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 417022095255614379L, + BinaryStringData.fromString("China"), + BinaryStringData.fromString(cityName), + BinaryStringData.fromString(cityName + " West Town address 3") + }))); + } + + private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory) + throws ExecutionException, InterruptedException { + int retryTimes = 0; + // retry 600 times, it takes 100 milliseconds per time, at most retry 1 minute + while (retryTimes < 600) { + try { + return jobClient.triggerSavepoint(savepointDirectory).get(); + } catch (Exception e) { + Optional exception = + ExceptionUtils.findThrowable(e, CheckpointException.class); + if (exception.isPresent() + && exception.get().getMessage().contains("Checkpoint triggering task")) { + Thread.sleep(100); + retryTimes++; + } else { + throw e; + } + } + } + return null; + } + + private void initialAddressTables(JdbcConnection connection, List addressTables) + throws SQLException { + try { + connection.setAutoCommit(false); + for (String tableName : addressTables) { + // make initial data for given table + String tableId = customDatabase.getDatabaseName() + "." + tableName; + String cityName = tableName.split("_")[1]; + connection.execute( + "CREATE TABLE IF NOT EXISTS " + + tableId + + "(" + + " id BIGINT NOT NULL PRIMARY KEY," + + " country VARCHAR(255) NOT NULL," + + " city VARCHAR(255) NOT NULL," + + " detail_address VARCHAR(1024)" + + ");"); + connection.execute( + format( + "INSERT INTO %s " + + "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1')," + + " (416927583791428523, 'China', '%s', '%s West Town address 2')," + + " (417022095255614379, 'China', '%s', '%s West Town address 3');", + tableId, cityName, cityName, cityName, cityName, cityName, + cityName)); + } + connection.commit(); + } finally { + connection.close(); + } + } + + private FlinkSourceProvider getFlinkSourceProvider(List tables, int parallelism) { + List fullTableNames = + tables.stream() + .map(table -> customDatabase.getDatabaseName() + "." 
+ table) + .collect(Collectors.toList()); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(SERVER_TIME_ZONE.key(), "UTC"); + options.put(TABLES.key(), StringUtils.join(fullTableNames, ",")); + options.put(SERVER_ID.key(), getServerId(parallelism)); + options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true"); + Factory.Context context = + new FactoryHelper.DefaultContext( + org.apache.flink.cdc.common.configuration.Configuration.fromMap(options), + null, + this.getClass().getClassLoader()); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + + return (FlinkSourceProvider) dataSource.getEventSourceProvider(); + } + + private CollectResultIterator addCollector( + StreamExecutionEnvironment env, + DataStreamSource source, + AbstractCollectResultBuffer buffer, + TypeSerializer serializer, + String accumulatorName) { + CollectSinkOperatorFactory sinkFactory = + new CollectSinkOperatorFactory<>(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator) sinkFactory.getOperator(); + CollectResultIterator iterator = + new CollectResultIterator<>( + buffer, operator.getOperatorIdFuture(), accumulatorName, 0); + CollectStreamSink sink = new CollectStreamSink<>(source, sinkFactory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + env.registerCollectIterator(iterator); + return iterator; + } + + private StreamExecutionEnvironment getStreamExecutionEnvironment( + String finishedSavePointPath, int parallelism) { + Configuration configuration = new Configuration(); + if (finishedSavePointPath != null) { + configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath); + } + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + env.setParallelism(parallelism); + env.enableCheckpointing(500L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000L)); + return env; + } + + private static class TestParam { + private final List firstRoundInitTables; + private final List firstRoundListenTables; + private final Integer firstRoundFetchSize; + private final List secondRoundInitTables; + private final List secondRoundListenTables; + private final Integer secondRoundFetchSize; + + private TestParam(Builder builder) { + this.firstRoundInitTables = builder.firstRoundInitTables; + this.firstRoundListenTables = builder.firstRoundListenTables; + this.firstRoundFetchSize = builder.firstRoundFetchSize; + this.secondRoundInitTables = builder.secondRoundInitTables; + this.secondRoundListenTables = builder.secondRoundListenTables; + this.secondRoundFetchSize = builder.secondRoundFetchSize; + } + + public static Builder newBuilder( + List firstRoundListenTables, + Integer firstRoundFetchSize, + List secondRoundListenTables, + Integer secondRoundFetchSize) { + return new Builder( + firstRoundListenTables, + firstRoundFetchSize, + secondRoundListenTables, + secondRoundFetchSize); + } + + public static class Builder { + private List firstRoundInitTables; + private final List firstRoundListenTables; + private final Integer firstRoundFetchSize; + + private List secondRoundInitTables; + private final List secondRoundListenTables; + private final Integer 
secondRoundFetchSize; + + public Builder( + List firstRoundListenTables, + Integer firstRoundFetchSize, + List secondRoundListenTables, + Integer secondRoundFetchSize) { + this.firstRoundListenTables = firstRoundListenTables; + this.firstRoundFetchSize = firstRoundFetchSize; + this.secondRoundListenTables = secondRoundListenTables; + this.secondRoundFetchSize = secondRoundFetchSize; + } + + public TestParam build() { + return new TestParam(this); + } + + public Builder setFirstRoundInitTables(List firstRoundInitTables) { + this.firstRoundInitTables = firstRoundInitTables; + return this; + } + + public Builder setSecondRoundInitTables(List secondRoundInitTables) { + this.secondRoundInitTables = secondRoundInitTables; + return this; + } + } + + public List getFirstRoundInitTables() { + return firstRoundInitTables; + } + + public List getFirstRoundListenTables() { + return firstRoundListenTables; + } + + public Integer getFirstRoundFetchSize() { + return firstRoundFetchSize; + } + + public List getSecondRoundInitTables() { + return secondRoundInitTables; + } + + public List getSecondRoundListenTables() { + return secondRoundListenTables; + } + + public Integer getSecondRoundFetchSize() { + return secondRoundFetchSize; + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql new file mode 100644 index 000000000..e4df63f1a --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql @@ -0,0 +1,328 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: customer +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create and populate our users using a single insert with many rows +CREATE TABLE customers ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"), + (110,"user_5","Shanghai","123567891234"), + (111,"user_6","Shanghai","123567891234"), + (118,"user_7","Shanghai","123567891234"), + (121,"user_8","Shanghai","123567891234"), + (123,"user_9","Shanghai","123567891234"), + (1009,"user_10","Shanghai","123567891234"), + (1010,"user_11","Shanghai","123567891234"), + (1011,"user_12","Shanghai","123567891234"), + (1012,"user_13","Shanghai","123567891234"), + (1013,"user_14","Shanghai","123567891234"), + (1014,"user_15","Shanghai","123567891234"), + (1015,"user_16","Shanghai","123567891234"), + (1016,"user_17","Shanghai","123567891234"), + (1017,"user_18","Shanghai","123567891234"), + (1018,"user_19","Shanghai","123567891234"), + (1019,"user_20","Shanghai","123567891234"), + (2000,"user_21","Shanghai","123567891234"); + +-- Create a table will not be read +CREATE TABLE prefix_customers ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO prefix_customers +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"); + +-- table has same name prefix with 'customers.*' +CREATE TABLE customers_1 ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers_1 +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"), + (110,"user_5","Shanghai","123567891234"), + (111,"user_6","Shanghai","123567891234"), + (118,"user_7","Shanghai","123567891234"), + (121,"user_8","Shanghai","123567891234"), + (123,"user_9","Shanghai","123567891234"), + (1009,"user_10","Shanghai","123567891234"), + (1010,"user_11","Shanghai","123567891234"), + (1011,"user_12","Shanghai","123567891234"), + (1012,"user_13","Shanghai","123567891234"), + (1013,"user_14","Shanghai","123567891234"), + (1014,"user_15","Shanghai","123567891234"), + (1015,"user_16","Shanghai","123567891234"), + (1016,"user_17","Shanghai","123567891234"), + (1017,"user_18","Shanghai","123567891234"), + (1018,"user_19","Shanghai","123567891234"), + (1019,"user_20","Shanghai","123567891234"), + (2000,"user_21","Shanghai","123567891234"); + +-- create table whose split key is evenly distributed +CREATE TABLE customers_even_dist ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL , + address VARCHAR(1024), + phone_number VARCHAR(512) +); +INSERT INTO customers_even_dist +VALUES (101,'user_1','Shanghai','123567891234'), + (102,'user_2','Shanghai','123567891234'), + (103,'user_3','Shanghai','123567891234'), + (104,'user_4','Shanghai','123567891234'), + 
(105,'user_5','Shanghai','123567891234'), + (106,'user_6','Shanghai','123567891234'), + (107,'user_7','Shanghai','123567891234'), + (108,'user_8','Shanghai','123567891234'), + (109,'user_9','Shanghai','123567891234'), + (110,'user_10','Shanghai','123567891234'); + +-- create table whose split key is evenly distributed and sparse +CREATE TABLE customers_sparse_dist ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL , + address VARCHAR(1024), + phone_number VARCHAR(512) +); +INSERT INTO customers_sparse_dist +VALUES (2,'user_1','Shanghai','123567891234'), + (4,'user_2','Shanghai','123567891234'), + (6,'user_3','Shanghai','123567891234'), + (8,'user_4','Shanghai','123567891234'), + (10,'user_5','Shanghai','123567891234'), + (16,'user_6','Shanghai','123567891234'), + (17,'user_7','Shanghai','123567891234'), + (18,'user_8','Shanghai','123567891234'), + (20,'user_9','Shanghai','123567891234'), + (22,'user_10','Shanghai','123567891234'); + +-- create table whose split key is evenly distributed and dense +CREATE TABLE customers_dense_dist ( + id1 INTEGER NOT NULL, + id2 VARCHAR(255) NOT NULL , + address VARCHAR(1024), + phone_number VARCHAR(512), + PRIMARY KEY(id1, id2) +); +INSERT INTO customers_dense_dist +VALUES (1,'user_1','Shanghai','123567891234'), + (1,'user_2','Shanghai','123567891234'), + (1,'user_3','Shanghai','123567891234'), + (1,'user_4','Shanghai','123567891234'), + (2,'user_5','Shanghai','123567891234'), + (2,'user_6','Shanghai','123567891234'), + (2,'user_7','Shanghai','123567891234'), + (3,'user_8','Shanghai','123567891234'), + (3,'user_9','Shanghai','123567891234'), + (3,'user_10','Shanghai','123567891234'); + +CREATE TABLE customers_no_pk ( + id INTEGER, + name VARCHAR(255) DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers_no_pk +VALUES (101,"user_1","Shanghai","123567891234"), + (102,"user_2","Shanghai","123567891234"), + (103,"user_3","Shanghai","123567891234"), + (109,"user_4","Shanghai","123567891234"), + (110,"user_5","Shanghai","123567891234"), + (111,"user_6","Shanghai","123567891234"), + (118,"user_7","Shanghai","123567891234"), + (121,"user_8","Shanghai","123567891234"), + (123,"user_9","Shanghai","123567891234"), + (1009,"user_10","Shanghai","123567891234"), + (1010,"user_11","Shanghai","123567891234"), + (1011,"user_12","Shanghai","123567891234"), + (1012,"user_13","Shanghai","123567891234"), + (1013,"user_14","Shanghai","123567891234"), + (1014,"user_15","Shanghai","123567891234"), + (1015,"user_16","Shanghai","123567891234"), + (1016,"user_17","Shanghai","123567891234"), + (1017,"user_18","Shanghai","123567891234"), + (1018,"user_19","Shanghai","123567891234"), + (1019,"user_20","Shanghai","123567891234"), + (2000,"user_21","Shanghai","123567891234"); + +-- table has combined primary key +CREATE TABLE customer_card ( + card_no BIGINT NOT NULL, + level VARCHAR(10) NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + note VARCHAR(1024), + PRIMARY KEY(card_no, level) +); + +insert into customer_card +VALUES (20001, 'LEVEL_4', 'user_1', 'user with level 4'), + (20002, 'LEVEL_4', 'user_2', 'user with level 4'), + (20003, 'LEVEL_4', 'user_3', 'user with level 4'), + (20004, 'LEVEL_4', 'user_4', 'user with level 4'), + (20004, 'LEVEL_1', 'user_4', 'user with level 4'), + (20004, 'LEVEL_2', 'user_4', 'user with level 4'), + (20004, 'LEVEL_3', 'user_4', 'user with level 4'), + (30006, 'LEVEL_3', 'user_5', 'user with level 3'), + (30007, 'LEVEL_3', 'user_6', 'user with level 3'), + (30008, 'LEVEL_3', 
'user_7', 'user with level 3'), + (30009, 'LEVEL_3', 'user_8', 'user with level 3'), + (30009, 'LEVEL_2', 'user_8', 'user with level 3'), + (30009, 'LEVEL_1', 'user_8', 'user with level 3'), + (40001, 'LEVEL_2', 'user_9', 'user with level 2'), + (40002, 'LEVEL_2', 'user_10', 'user with level 2'), + (40003, 'LEVEL_2', 'user_11', 'user with level 2'), + (50001, 'LEVEL_1', 'user_12', 'user with level 1'), + (50002, 'LEVEL_1', 'user_13', 'user with level 1'), + (50003, 'LEVEL_1', 'user_14', 'user with level 1'); + +-- table has single line +CREATE TABLE customer_card_single_line ( + card_no BIGINT NOT NULL, + level VARCHAR(10) NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + note VARCHAR(1024), + PRIMARY KEY(card_no, level) +); + +insert into customer_card_single_line +VALUES (20001, 'LEVEL_1', 'user_1', 'user with level 1'); + + +-- table has combined primary key +CREATE TABLE shopping_cart ( + product_no INT NOT NULL, + product_kind VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + description VARCHAR(255) NOT NULL, + PRIMARY KEY(user_id, product_no, product_kind) +); + +insert into shopping_cart +VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'), + (101, 'KIND_002', 'user_1', 'my shopping cart'), + (102, 'KIND_007', 'user_1', 'my shopping cart'), + (102, 'KIND_008', 'user_1', 'my shopping cart'), + (501, 'KIND_100', 'user_2', 'my shopping list'), + (701, 'KIND_999', 'user_3', 'my shopping list'), + (801, 'KIND_010', 'user_4', 'my shopping list'), + (600, 'KIND_009', 'user_4', 'my shopping list'), + (401, 'KIND_002', 'user_5', 'leo list'), + (401, 'KIND_007', 'user_5', 'leo list'), + (404, 'KIND_008', 'user_5', 'leo list'), + (600, 'KIND_009', 'user_6', 'my shopping cart'); + +-- table has combined primary key and one of the primary key is evenly +CREATE TABLE evenly_shopping_cart ( + product_no INT NOT NULL, + product_kind VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + description VARCHAR(255) NOT NULL, + PRIMARY KEY(product_kind, product_no, user_id) +); + +insert into evenly_shopping_cart +VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'), + (102, 'KIND_002', 'user_1', 'my shopping cart'), + (103, 'KIND_007', 'user_1', 'my shopping cart'), + (104, 'KIND_008', 'user_1', 'my shopping cart'), + (105, 'KIND_100', 'user_2', 'my shopping list'), + (105, 'KIND_999', 'user_3', 'my shopping list'), + (107, 'KIND_010', 'user_4', 'my shopping list'), + (108, 'KIND_009', 'user_4', 'my shopping list'), + (109, 'KIND_002', 'user_5', 'leo list'), + (111, 'KIND_007', 'user_5', 'leo list'), + (111, 'KIND_008', 'user_5', 'leo list'), + (112, 'KIND_009', 'user_6', 'my shopping cart'); + +-- table has bigint unsigned auto increment primary key +CREATE TABLE shopping_cart_big ( + product_no BIGINT UNSIGNED AUTO_INCREMENT NOT NULL, + product_kind VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + description VARCHAR(255) NOT NULL, + PRIMARY KEY(product_no) +); + +insert into shopping_cart_big +VALUES (default, 'KIND_001', 'user_1', 'my shopping cart'), + (default, 'KIND_002', 'user_1', 'my shopping cart'), + (default, 'KIND_003', 'user_1', 'my shopping cart'); + +-- table has decimal primary key +CREATE TABLE shopping_cart_dec ( + product_no DECIMAL(10, 4) NOT NULL, + product_kind VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + description VARCHAR(255) DEFAULT 'flink', + PRIMARY KEY(product_no) +); + +insert into shopping_cart_dec +VALUES (123456.123, 'KIND_001', 'user_1', 'my shopping cart'), + (123457.456, 'KIND_002', 'user_2', 'my shopping cart'), + (123458.6789, 'KIND_003', 'user_3', 'my 
shopping cart'), + (123459.1234, 'KIND_004', 'user_4', null); + +-- create table whose primary key are produced by snowflake algorithm +CREATE TABLE address ( + id BIGINT UNSIGNED NOT NULL PRIMARY KEY, + country VARCHAR(255) NOT NULL, + city VARCHAR(255) NOT NULL, + detail_address VARCHAR(1024) +); + +INSERT INTO address +VALUES (416874195632735147, 'China', 'Beijing', 'West Town address 1'), + (416927583791428523, 'China', 'Beijing', 'West Town address 2'), + (417022095255614379, 'China', 'Beijing', 'West Town address 3'), + (417111867899200427, 'America', 'New York', 'East Town address 1'), + (417271541558096811, 'America', 'New York', 'East Town address 2'), + (417272886855938987, 'America', 'New York', 'East Town address 3'), + (417420106184475563, 'Germany', 'Berlin', 'West Town address 1'), + (418161258277847979, 'Germany', 'Berlin', 'West Town address 2'); + +CREATE TABLE default_value_test ( + id INTEGER NOT NULL PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number INTEGER DEFAULT ' 123 ' +); +INSERT INTO default_value_test +VALUES (1,'user1','Shanghai',123567), + (2,'user2','Shanghai',123567);
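
For readers who want to try the new option end to end, a minimal pipeline definition is sketched below. This is illustrative only and not part of the change: hostname, credentials, the table pattern, and the Doris sink values are placeholders, and `scan.newly-added-table.enabled` keeps its documented behavior of only taking effect when the job is resumed from a savepoint/checkpoint.

```yaml
source:
  type: mysql
  hostname: localhost            # placeholder
  port: 3306
  username: root                 # placeholder
  password: "123456"             # placeholder
  tables: app_db.address_\.*     # pattern; tables created later that match it can be picked up
  server-id: 5400-5404
  server-time-zone: UTC
  # Option added by this change. It is read only when the job is started
  # from a savepoint/checkpoint taken before the new tables existed.
  scan.newly-added-table.enabled: true

sink:
  type: doris                    # any supported pipeline sink; values below are placeholders
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

pipeline:
  name: Sync MySQL with newly added tables
  parallelism: 4
```

The workflow mirrors the IT case above: run the pipeline, stop it with a savepoint, create (or widen the pattern to include) the new tables, and resubmit the same YAML from that savepoint.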
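For embedding the source programmatically, for example in a test, the sketch below condenses the `getFlinkSourceProvider` helper from the IT case in this diff. Connection values and the table pattern are placeholders; the option keys are the static fields of `MySqlDataSourceOptions` used by the test.

```java
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.*;

public class NewlyAddedTableSourceSketch {

    public static FlinkSourceProvider buildSourceProvider() {
        // Flat option map, assembled the same way as in getFlinkSourceProvider above.
        Map<String, String> options = new HashMap<>();
        options.put(HOSTNAME.key(), "localhost");          // placeholder
        options.put(PORT.key(), "3306");
        options.put(USERNAME.key(), "root");                // placeholder
        options.put(PASSWORD.key(), "123456");              // placeholder
        options.put(SERVER_TIME_ZONE.key(), "UTC");
        options.put(TABLES.key(), "app_db.address_\\.*");   // placeholder pattern
        options.put(SERVER_ID.key(), "5400-5404");
        // The option introduced by this change; MySqlDataSourceFactory reads it via
        // config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED) and forwards it to
        // scanNewlyAddedTableEnabled(...) on the source config factory.
        options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");

        Factory.Context context =
                new FactoryHelper.DefaultContext(
                        Configuration.fromMap(options),
                        null,
                        NewlyAddedTableSourceSketch.class.getClassLoader());

        MySqlDataSource dataSource =
                (MySqlDataSource) new MySqlDataSourceFactory().createDataSource(context);
        return (FlinkSourceProvider) dataSource.getEventSourceProvider();
    }
}
```

The returned provider can then be handed to `StreamExecutionEnvironment#fromSource`, exactly as the IT case does when it restores the job from the savepoint with the widened table list.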