diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java index b0553aac8..2e7a0abd6 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -75,6 +75,8 @@ public class IncrementalSourceScanFetcher implements Fetcher setReadException(throwable)) .build(); this.executorService = Executors.newSingleThreadExecutor(threadFactory); this.hasNextElement = new AtomicBoolean(false); @@ -89,17 +91,13 @@ public class IncrementalSourceScanFetcher implements Fetcher { try { snapshotSplitReadTask.execute(taskContext); } catch (Exception e) { - LOG.error( - String.format( - "Execute snapshot read task for snapshot split %s fail", - currentSnapshotSplit), - e); - readException = e; + setReadException(e); } }); } @@ -186,6 +184,19 @@ public class IncrementalSourceScanFetcher implements Fetcher