From ae54f0b11a0d561b07e9604e8dca8614448614df Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Fri, 23 Jul 2021 09:24:14 +0800 Subject: [PATCH] [mysql] Throw underlying debezium reader exception --- .../debezium/reader/BinlogSplitReader.java | 19 ++++++++++++-- .../debezium/reader/SnapshotSplitReader.java | 25 ++++++++++++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 5fe6a5a3e..428a43b1c 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -18,6 +18,8 @@ package com.alibaba.ververica.cdc.connectors.mysql.debezium.reader; +import org.apache.flink.util.FlinkRuntimeException; + import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask; @@ -63,8 +65,10 @@ public class BinlogSplitReader implements DebeziumReader queue; + private volatile boolean currentTaskRunning; + private volatile Throwable readException; + private MySqlBinlogSplitReadTask binlogSplitReadTask; private MySqlBinlogSplit currentBinlogSplit; private Map> finishedSplitsInfo; @@ -114,7 +118,7 @@ public class BinlogSplitReader implements DebeziumReader pollSplitRecords() throws InterruptedException { + checkReadException(); final List sourceRecords = new ArrayList<>(); if (currentTaskRunning) { List batch = queue.poll(); @@ -147,6 +152,16 @@ public class BinlogSplitReader implements DebeziumReader queue; private volatile boolean currentTaskRunning; + private volatile Throwable readException; // task to read snapshot for current split private MySqlSnapshotSplitReadTask splitSnapshotReadTask; @@ -137,10 +140,11 @@ public class SnapshotSplitReader implements DebeziumReader pollSplitRecords() throws InterruptedException { + checkReadException(); + if (hasNextElement.get()) { // data input: [low watermark event][snapshot events][high watermark event][binlog // events][binlog-end event] @@ -199,6 +206,16 @@ public class SnapshotSplitReader implements DebeziumReader