From 0a097a9e534e093e452f2cd465659ff6e05192e6 Mon Sep 17 00:00:00 2001 From: ehui <374488688@qq.com> Date: Mon, 1 Aug 2022 17:46:56 +0800 Subject: [PATCH] [cdc-base] Fix NPE during snpashot scan phase (#1339) This closes #1122. --- .../base/source/reader/external/JdbcSourceScanFetcher.java | 1 + .../base/source/reader/external/JdbcSourceStreamFetcher.java | 1 + 2 files changed, 2 insertions(+) diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java index b32458af6..37ad58439 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceScanFetcher.java @@ -111,6 +111,7 @@ public class JdbcSourceScanFetcher implements Fetcher sourceRecords = new ArrayList<>(); while (!reachBinlogEnd) { + checkReadException(); List batch = queue.poll(); for (DataChangeEvent event : batch) { sourceRecords.add(event.getRecord()); diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java index c1f01435c..bdbdc8f8a 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java @@ -77,6 +77,7 @@ public class JdbcSourceStreamFetcher implements Fetcher fetchTask) { this.streamFetchTask = fetchTask; this.currentStreamSplit = fetchTask.getSplit().asStreamSplit(); + configureFilter(); taskContext.configure(currentStreamSplit); this.queue = taskContext.getQueue(); executor.submit(