diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java index b32458af6..37ad58439 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java @@ -111,6 +111,7 @@ public class JdbcSourceScanFetcher implements Fetcher sourceRecords = new ArrayList<>(); while (!reachBinlogEnd) { + checkReadException(); List batch = queue.poll(); for (DataChangeEvent event : batch) { sourceRecords.add(event.getRecord()); diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java index c1f01435c..bdbdc8f8a 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java @@ -77,6 +77,7 @@ public class JdbcSourceStreamFetcher implements Fetcher fetchTask) { this.streamFetchTask = fetchTask; this.currentStreamSplit = fetchTask.getSplit().asStreamSplit(); + configureFilter(); taskContext.configure(currentStreamSplit); this.queue = taskContext.getQueue(); executor.submit(