diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 6ccf305cf..dff21c677 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -45,11 +45,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -206,15 +208,24 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { if (sourceConfig.isScanNewlyAddedTableEnabled()) { // check whether we got newly added tables try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { - final List newlyAddedTables = discoverCapturedTables(jdbc, sourceConfig); + final List currentCapturedTables = + discoverCapturedTables(jdbc, sourceConfig); + final Set previousCapturedTables = new HashSet<>(); + List tablesInRemainingSplits = + remainingSplits.stream() + .map(MySqlSnapshotSplit::getTableId) + .collect(Collectors.toList()); + previousCapturedTables.addAll(tablesInRemainingSplits); + previousCapturedTables.addAll(alreadyProcessedTables); + previousCapturedTables.addAll(remainingTables); // Get the removed tables with the new table filter - List tablesToRemove = new LinkedList<>(alreadyProcessedTables); - tablesToRemove.addAll(remainingTables); - tablesToRemove.removeAll(newlyAddedTables); + Set tablesToRemove = new HashSet<>(previousCapturedTables); + tablesToRemove.removeAll(currentCapturedTables); - newlyAddedTables.removeAll(alreadyProcessedTables); - newlyAddedTables.removeAll(remainingTables); + // Get the newly added tables + currentCapturedTables.removeAll(previousCapturedTables); + List newlyAddedTables = currentCapturedTables; // case 1: there are old tables to remove from state if (!tablesToRemove.isEmpty()) { diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 38a0fa2c3..082f3255d 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -17,14 +17,21 @@ package com.ververica.cdc.connectors.mysql.source.assigners; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.ExceptionUtils; import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase; +import com.ververica.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState; +import com.ververica.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; +import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; +import com.ververica.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; +import com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; +import io.debezium.relational.Column; import io.debezium.relational.TableId; import org.junit.BeforeClass; import org.junit.Test; @@ -41,6 +48,7 @@ import java.util.stream.Collectors; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -416,7 +424,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {"customers_even_dist"}, - "id"); + "id", + false); final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner( @@ -428,6 +437,18 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { assertFalse(assigner.needToDiscoveryTables()); } + @Test + public void testScanNewlyAddedTableStartFromCheckpoint() { + List expected = + Arrays.asList( + "customers_sparse_dist [109] null", + "customers_even_dist null [10]", + "customers_even_dist [10] [18]", + "customers_even_dist [18] null", + "customer_card_single_line null null"); + assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint()); + } + private List getTestAssignSnapshotSplits( int splitSize, double distributionFactorUpper, @@ -456,7 +477,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { distributionFactorUpper, distributionFactorLower, captureTables, - chunkKeyColumn); + chunkKeyColumn, + false); List remainingTables = Arrays.stream(captureTables) .map(t -> database.getDatabaseName() + "." + t) @@ -465,7 +487,103 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner( configuration, DEFAULT_PARALLELISM, remainingTables, false); + return getSplitsFromAssigner(assigner); + } + + private List getTestAssignSnapshotSplitsFromCheckpoint() { + TableId newTable = + TableId.parse(customerDatabase.getDatabaseName() + ".customer_card_single_line"); + TableId processedTable = + TableId.parse(customerDatabase.getDatabaseName() + ".customers_sparse_dist"); + TableId splitTable = + TableId.parse(customerDatabase.getDatabaseName() + ".customers_even_dist"); + String[] captureTables = {newTable.table(), processedTable.table(), splitTable.table()}; + MySqlSourceConfig configuration = + getConfig( + customerDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + captureTables, + null, + true); + List remainingTables = new ArrayList<>(); + List alreadyProcessedTables = new ArrayList<>(); + alreadyProcessedTables.add(processedTable); + + RowType splitKeyType = + ChunkUtils.getChunkKeyColumnType( + Column.editor().name("id").type("INT").jdbcType(4).create()); + List remainingSplits = + Arrays.asList( + new MySqlSchemalessSnapshotSplit( + processedTable, + processedTable + ":2", + splitKeyType, + new Object[] {109}, + null, + null), + new MySqlSchemalessSnapshotSplit( + splitTable, + splitTable + ":0", + splitKeyType, + null, + new Object[] {10}, + null), + new MySqlSchemalessSnapshotSplit( + splitTable, + splitTable + ":1", + splitKeyType, + new Object[] {10}, + new Object[] {18}, + null), + new MySqlSchemalessSnapshotSplit( + splitTable, + splitTable + ":2", + splitKeyType, + new Object[] {18}, + null, + null)); + + Map assignedSplits = new HashMap<>(); + assignedSplits.put( + processedTable + ":0", + new MySqlSchemalessSnapshotSplit( + processedTable, + processedTable + ":0", + splitKeyType, + null, + new Object[] {105}, + null)); + assignedSplits.put( + processedTable + ":1", + new MySqlSchemalessSnapshotSplit( + processedTable, + processedTable + ":1", + splitKeyType, + new Object[] {105}, + new Object[] {109}, + null)); + Map splitFinishedOffsets = new HashMap<>(); + splitFinishedOffsets.put(processedTable + ":0", ofEarliest()); + SnapshotPendingSplitsState checkpoint = + new SnapshotPendingSplitsState( + alreadyProcessedTables, + remainingSplits, + assignedSplits, + new HashMap<>(), + splitFinishedOffsets, + AssignerStatus.INITIAL_ASSIGNING, + remainingTables, + false, + true, + ChunkSplitterState.NO_SPLITTING_TABLE_STATE); + final MySqlSnapshotSplitAssigner assigner = + new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint); + return getSplitsFromAssigner(assigner); + } + private List getSplitsFromAssigner(final MySqlSnapshotSplitAssigner assigner) { assigner.open(); List sqlSplits = new ArrayList<>(); while (true) { @@ -500,7 +618,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { double distributionFactorUpper, double distributionLower, String[] captureTables, - String chunkKeyColumn) { + String chunkKeyColumn, + boolean scanNewlyAddedTableEnabled) { Map chunkKeys = new HashMap<>(); for (String table : captureTables) { chunkKeys.put(new ObjectPath(database.getDatabaseName(), table), chunkKeyColumn); @@ -523,6 +642,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { .password(database.getPassword()) .serverTimeZone(ZoneId.of("UTC").toString()) .chunkKeyColumn(chunkKeys) + .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .createConfig(0); } }