diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java index aa5a8e5f9..b4e32875d 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java @@ -130,10 +130,11 @@ public class MySqlParallelSource public SplitEnumerator createEnumerator( SplitEnumeratorContext enumContext) { MySqlValidator validator = new MySqlValidator(config); + final int currentParallelism = enumContext.currentParallelism(); final MySqlSplitAssigner splitAssigner = startupMode.equals("initial") - ? new MySqlHybridSplitAssigner(config) + ? new MySqlHybridSplitAssigner(config, currentParallelism) : new MySqlBinlogSplitAssigner(config); return new MySqlSourceEnumerator(enumContext, splitAssigner, validator); @@ -144,9 +145,11 @@ public class MySqlParallelSource SplitEnumeratorContext enumContext, PendingSplitsState checkpoint) { MySqlValidator validator = new MySqlValidator(config); final MySqlSplitAssigner splitAssigner; + final int currentParallelism = enumContext.currentParallelism(); if (checkpoint instanceof HybridPendingSplitsState) { splitAssigner = - new MySqlHybridSplitAssigner(config, (HybridPendingSplitsState) checkpoint); + new MySqlHybridSplitAssigner( + config, currentParallelism, (HybridPendingSplitsState) checkpoint); } else if (checkpoint instanceof BinlogPendingSplitsState) { splitAssigner = new MySqlBinlogSplitAssigner(config, (BinlogPendingSplitsState) checkpoint); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java index 1f6f396ef..65a1564d0 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java @@ -50,15 +50,17 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner { private final MySqlSnapshotSplitAssigner snapshotSplitAssigner; - public MySqlHybridSplitAssigner(Configuration configuration) { - this(new MySqlSnapshotSplitAssigner(configuration), false); + public MySqlHybridSplitAssigner(Configuration configuration, int currentParallelism) { + this(new MySqlSnapshotSplitAssigner(configuration, currentParallelism), false); } public MySqlHybridSplitAssigner( - Configuration configuration, HybridPendingSplitsState checkpoint) { + Configuration configuration, + int currentParallelism, + HybridPendingSplitsState checkpoint) { this( new MySqlSnapshotSplitAssigner( - configuration, checkpoint.getSnapshotPendingSplits()), + configuration, currentParallelism, checkpoint.getSnapshotPendingSplits()), checkpoint.isBinlogSplitAssigned()); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 1043945b1..b69da4ee0 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -69,6 +69,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { private boolean assignerFinished; private final Configuration configuration; + private final int currentParallelism; private final LinkedList remainingTables; private final RelationalTableFilters tableFilters; private final int chunkSize; @@ -78,9 +79,10 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { @Nullable private Long checkpointIdToFinish; - public MySqlSnapshotSplitAssigner(Configuration configuration) { + public MySqlSnapshotSplitAssigner(Configuration configuration, int currentParallelism) { this( configuration, + currentParallelism, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), @@ -89,9 +91,12 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { } public MySqlSnapshotSplitAssigner( - Configuration configuration, SnapshotPendingSplitsState checkpoint) { + Configuration configuration, + int currentParallelism, + SnapshotPendingSplitsState checkpoint) { this( configuration, + currentParallelism, checkpoint.getAlreadyProcessedTables(), checkpoint.getRemainingSplits(), checkpoint.getAssignedSplits(), @@ -101,12 +106,14 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { private MySqlSnapshotSplitAssigner( Configuration configuration, + int currentParallelism, List alreadyProcessedTables, List remainingSplits, Map assignedSplits, Map splitFinishedOffsets, boolean assignerFinished) { this.configuration = configuration; + this.currentParallelism = currentParallelism; this.alreadyProcessedTables = alreadyProcessedTables; this.remainingSplits = remainingSplits; this.assignedSplits = assignedSplits; @@ -166,8 +173,17 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { public void onFinishedSplits(Map splitFinishedOffsets) { this.splitFinishedOffsets.putAll(splitFinishedOffsets); if (allSplitsFinished()) { - LOG.info( - "Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished."); + // Skip the waiting checkpoint when current parallelism is 1 which means we do not need + // to care about the global output data order of snapshot splits and binlog split. + if (currentParallelism == 1) { + assignerFinished = true; + LOG.info( + "Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status."); + + } else { + LOG.info( + "Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished."); + } } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 0c74bc579..16686c411 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -75,6 +75,7 @@ import static org.junit.Assert.assertEquals; /** Tests for {@link BinlogSplitReader}. */ public class BinlogSplitReaderTest extends MySqlTestBase { + private static final int currentParallelism = 4; private final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); @@ -586,7 +587,8 @@ public class BinlogSplitReaderTest extends MySqlTestBase { } private List getMySqlSplits(Configuration configuration, RowType pkType) { - final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration); + final MySqlSnapshotSplitAssigner assigner = + new MySqlSnapshotSplitAssigner(configuration, currentParallelism); assigner.open(); List mySqlSplits = new ArrayList<>(); while (true) { diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 90bfb71b0..14cdacc34 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -60,6 +60,7 @@ import static org.junit.Assert.assertEquals; /** Tests for {@link SnapshotSplitReader}. */ public class SnapshotSplitReaderTest extends MySqlTestBase { + private static final int currentParallelism = 4; private static final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); @@ -277,7 +278,8 @@ public class SnapshotSplitReaderTest extends MySqlTestBase { } private List getMySqlSplits(Configuration configuration, RowType pkType) { - final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration); + final MySqlSnapshotSplitAssigner assigner = + new MySqlSnapshotSplitAssigner(configuration, currentParallelism); assigner.open(); List mySqlSplitList = new ArrayList<>(); while (true) { diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java index 229d782dd..381cb4dc3 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java @@ -55,6 +55,7 @@ import static org.junit.Assert.assertEquals; /** Tests for {@link MySqlHybridSplitAssigner}. */ public class MySqlHybridSplitAssignerTest extends MySqlTestBase { + private static final int currentParallelism = 4; private static final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); @@ -114,7 +115,7 @@ public class MySqlHybridSplitAssignerTest extends MySqlTestBase { HybridPendingSplitsState checkpoint = new HybridPendingSplitsState(snapshotPendingSplitsState, false); final MySqlHybridSplitAssigner assigner = - new MySqlHybridSplitAssigner(configuration, checkpoint); + new MySqlHybridSplitAssigner(configuration, currentParallelism, checkpoint); // step 2. Get the MySqlBinlogSplit after all snapshot splits finished Optional binlogSplit = assigner.getNext(); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index a61754991..558bd5cc8 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -48,6 +48,7 @@ import static org.junit.Assert.fail; /** Tests for {@link MySqlSnapshotSplitAssigner}. */ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase { + private static final int currentParallelism = 4; private static final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); @@ -121,7 +122,8 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase { .collect(Collectors.toList()); configuration.setString("table.whitelist", String.join(",", captureTableIds)); - final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration); + final MySqlSnapshotSplitAssigner assigner = + new MySqlSnapshotSplitAssigner(configuration, currentParallelism); assigner.open(); List sqlSplits = new ArrayList<>(); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 99ff010d5..10e72d8ab 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -206,6 +206,94 @@ public class MySqlConnectorITCase extends MySqlTestBase { result.getJobClient().get().cancel().get(); } + @Test + public void testCheckpointIsOptionalUnderSingleParallelism() throws Exception { + if (incrementalSnapshot) { + env.setParallelism(1); + // check the checkpoint is optional when parallelism is 1 + env.getCheckpointConfig().disableCheckpointing(); + } else { + return; + } + inventoryDatabase.createAndInitialize(); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'debezium.internal.implementation' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-id' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '%s'" + + ")", + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + inventoryDatabase.getUsername(), + inventoryDatabase.getPassword(), + inventoryDatabase.getDatabaseName(), + "products", + getDezImplementation(), + incrementalSnapshot, + getServerId(), + getSplitSize()); + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = tEnv.executeSql("SELECT * FROM debezium_source"); + CloseableIterator iterator = result.collect(); + String[] expectedSnapshot = + new String[] { + "+I[101, scooter, Small 2-wheel scooter, 3.140]", + "+I[102, car battery, 12V car battery, 8.100]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]", + "+I[104, hammer, 12oz carpenter's hammer, 0.750]", + "+I[105, hammer, 14oz carpenter's hammer, 0.875]", + "+I[106, hammer, 16oz carpenter's hammer, 1.000]", + "+I[107, rocks, box of assorted rocks, 5.300]", + "+I[108, jacket, water resistent black wind breaker, 0.100]", + "+I[109, spare tire, 24 inch spare tire, 22.200]" + }; + assertThat( + fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot)); + + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM products WHERE id=111;"); + } + + String[] expectedBinlog = + new String[] { + "+I[110, jacket, water resistent white wind breaker, 0.200]", + "+I[111, scooter, Big 2-wheel scooter , 5.180]", + "-U[110, jacket, water resistent white wind breaker, 0.200]", + "+U[110, jacket, new water resistent white wind breaker, 0.500]", + "-U[111, scooter, Big 2-wheel scooter , 5.180]", + "+U[111, scooter, Big 2-wheel scooter , 5.170]", + "-D[111, scooter, Big 2-wheel scooter , 5.170]" + }; + assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog)); + result.getJobClient().get().cancel().get(); + } + @Test public void testAllTypes() throws Throwable { fullTypesDatabase.createAndInitialize();