From 5409ee47228f1b22eee9eb861031b1626ca8fe11 Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Thu, 21 Sep 2023 03:59:56 -0500 Subject: [PATCH] [cdc-base] Fix TM hangs caused by uncaught exception (#2511) (cherry picked from commit 7172695471bc531fd4e22ee8b549111ebf5f65a4) --- .../IncrementalSourceScanFetcher.java | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java index b0553aac8..2e7a0abd6 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -75,6 +75,8 @@ public class IncrementalSourceScanFetcher implements Fetcher setReadException(throwable)) .build(); this.executorService = Executors.newSingleThreadExecutor(threadFactory); this.hasNextElement = new AtomicBoolean(false); @@ -89,17 +91,13 @@ public class IncrementalSourceScanFetcher implements Fetcher { try { snapshotSplitReadTask.execute(taskContext); } catch (Exception e) { - LOG.error( - String.format( - "Execute snapshot read task for snapshot split %s fail", - currentSnapshotSplit), - e); - readException = e; + setReadException(e); } }); } @@ -186,6 +184,19 @@ public class IncrementalSourceScanFetcher implements Fetcher