From 4bf5a395a5a8f83a5e309e5e985ad7c839b953db Mon Sep 17 00:00:00 2001 From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com> Date: Fri, 9 Aug 2024 15:38:58 +0800 Subject: [PATCH] [FLINK-34688][cdc-connector][mysql] Make scan newly table trigger condition strictly This closes #3519. --- .../assigner/SnapshotSplitAssigner.java | 3 ++- .../assigners/MySqlSnapshotSplitAssigner.java | 3 ++- .../MySqlSnapshotSplitAssignerTest.java | 25 ++++++++++++++++--- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java index cd0e77200..a3a234b73 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java @@ -203,7 +203,8 @@ public class SnapshotSplitAssigner implements SplitAssig private void captureNewlyAddedTables() { if (sourceConfig.isScanNewlyAddedTableEnabled() - && !sourceConfig.getStartupOptions().isSnapshotOnly()) { + && !sourceConfig.getStartupOptions().isSnapshotOnly() + && AssignerStatus.isAssigningFinished(assignerStatus)) { try { // check whether we got newly added tables final List currentCapturedTables = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index e209921b5..9ea69b11a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -215,7 +215,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { private void captureNewlyAddedTables() { // Don't scan newly added table in snapshot mode. if (sourceConfig.isScanNewlyAddedTableEnabled() - && !sourceConfig.getStartupOptions().isSnapshotOnly()) { + && !sourceConfig.getStartupOptions().isSnapshotOnly() + && AssignerStatus.isAssigningFinished(assignerStatus)) { // check whether we got newly added tables try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) { final List currentCapturedTables = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 759827d9d..de875a0ed 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -484,7 +484,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { } @Test - public void testScanNewlyAddedTableStartFromCheckpoint() { + public void testScanNewlyAddedTableStartFromInitialAssigningFinishedCheckpoint() { List expected = Arrays.asList( "customers_sparse_dist [109] null", @@ -492,7 +492,24 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { "customers_even_dist [10] [18]", "customers_even_dist [18] null", "customer_card_single_line null null"); - assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint()); + assertEquals( + expected, + getTestAssignSnapshotSplitsFromCheckpoint( + AssignerStatus.INITIAL_ASSIGNING_FINISHED)); + } + + @Test + public void testScanNewlyAddedTableStartFromNewlyAddedAssigningSnapshotFinishedCheckpoint() { + List expected = + Arrays.asList( + "customers_sparse_dist [109] null", + "customers_even_dist null [10]", + "customers_even_dist [10] [18]", + "customers_even_dist [18] null"); + assertEquals( + expected, + getTestAssignSnapshotSplitsFromCheckpoint( + AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED)); } private List getTestAssignSnapshotSplits( @@ -536,7 +553,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { return getSplitsFromAssigner(assigner); } - private List getTestAssignSnapshotSplitsFromCheckpoint() { + private List getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus assignerStatus) { TableId newTable = TableId.parse(customerDatabase.getDatabaseName() + ".customer_card_single_line"); TableId processedTable = @@ -619,7 +636,7 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { assignedSplits, new HashMap<>(), splitFinishedOffsets, - AssignerStatus.INITIAL_ASSIGNING, + assignerStatus, remainingTables, false, true,