diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index cfeafad0f..8837b9c96 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -276,9 +276,9 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { @Override public Optional getNext() { - checkSplitterErrors(); waitTableDiscoveryReady(); synchronized (lock) { + checkSplitterErrors(); if (!remainingSplits.isEmpty()) { // return remaining splits firstly Iterator iterator = remainingSplits.iterator(); @@ -496,7 +496,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { private void splitChunksForRemainingTables() { try { - // restore from a checkpoint and start to split the table from the previous checkpoint + // restore from a checkpoint and start to split the table from the previous + // checkpoint if (chunkSplitter.hasNextChunk()) { LOG.info( "Start splitting remaining chunks for table {}", @@ -509,13 +510,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { splitTable(nextTable); } } catch (Throwable e) { - if (uncaughtSplitterException == null) { - uncaughtSplitterException = e; - } else { - uncaughtSplitterException.addSuppressed(e); - } - // Release the potential waiting getNext() call synchronized (lock) { + if (uncaughtSplitterException == null) { + uncaughtSplitterException = e; + } else { + uncaughtSplitterException.addSuppressed(e); + } + // Release the potential waiting getNext() call lock.notify(); } }