[mysql] Throw underlying debezium reader exception

pull/270/head
Leonard Xu committed by Jark Wu
parent 35c636224d
commit ae54f0b11a
No known key found for this signature in database
GPG Key ID: 85BACB5AEFAE3202

@ -18,6 +18,8 @@
package com.alibaba.ververica.cdc.connectors.mysql.debezium.reader;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
@ -63,8 +65,10 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
private final StatefulTaskContext statefulTaskContext;
private final ExecutorService executor;
private volatile boolean currentTaskRunning;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile boolean currentTaskRunning;
private volatile Throwable readException;
private MySqlBinlogSplitReadTask binlogSplitReadTask;
private MySqlBinlogSplit currentBinlogSplit;
private Map<TableId, List<FinishedSnapshotSplitInfo>> finishedSplitsInfo;
@ -114,7 +118,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
"Execute binlog read task for mysql split %s fail",
currentBinlogSplit),
e);
e.printStackTrace();
readException = e;
}
});
}
@ -135,6 +139,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
@Nullable
@Override
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
checkReadException();
final List<SourceRecord> sourceRecords = new ArrayList<>();
if (currentTaskRunning) {
List<DataChangeEvent> batch = queue.poll();
@ -147,6 +152,16 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
return sourceRecords.iterator();
}
private void checkReadException() {
if (readException != null) {
throw new FlinkRuntimeException(
String.format(
"Read split %s error due to %s.",
currentBinlogSplit, readException.getMessage()),
readException);
}
}
@Override
public void close() {
try {

@ -18,6 +18,8 @@
package com.alibaba.ververica.cdc.connectors.mysql.debezium.reader;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.alibaba.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
@ -63,6 +65,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile boolean currentTaskRunning;
private volatile Throwable readException;
// task to read snapshot for current split
private MySqlSnapshotSplitReadTask splitSnapshotReadTask;
@ -137,10 +140,11 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
splitBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContextImpl());
} else {
throw new IllegalStateException(
String.format(
"Read snapshot for mysql split %s fail",
currentSnapshotSplit));
readException =
new IllegalStateException(
String.format(
"Read snapshot for mysql split %s fail",
currentSnapshotSplit));
}
} catch (Exception e) {
currentTaskRunning = false;
@ -149,6 +153,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
"Execute snapshot read task for mysql split %s fail",
currentSnapshotSplit),
e);
readException = e;
}
});
}
@ -173,6 +178,8 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
@Nullable
@Override
public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
checkReadException();
if (hasNextElement.get()) {
// data input: [low watermark event][snapshot events][high watermark event][binlog
// events][binlog-end event]
@ -199,6 +206,16 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
return null;
}
private void checkReadException() {
if (readException != null) {
throw new FlinkRuntimeException(
String.format(
"Read split %s error due to %s.",
currentSnapshotSplit, readException.getMessage()),
readException);
}
}
@Override
public void close() {
try {

Loading…
Cancel
Save