[mysql-cdc] Add handler for catching async exceptions in snapshot reading executor

This closes #2016.
pull/2024/head
Leonard Xu 2 years ago
parent 70db0d2958
commit 7eceafad9b

@ -95,7 +95,11 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
this.statefulTaskContext = statefulTaskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("snapshot-reader-" + subtaskId).build();
new ThreadFactoryBuilder()
.setNameFormat("debezium-reader-" + subtaskId)
.setUncaughtExceptionHandler(
(thread, throwable) -> setReadException(throwable))
.build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = false;
this.hasNextElement = new AtomicBoolean(false);
@ -120,7 +124,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
statefulTaskContext.getSnapshotReceiver(),
StatefulTaskContext.getClock(),
currentSnapshotSplit);
executorService.submit(
executorService.execute(
() -> {
try {
currentTaskRunning = true;
@ -160,20 +164,14 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
new SnapshotBinlogSplitChangeEventSourceContextImpl(),
mySqlOffsetContext);
} else {
readException =
setReadException(
new IllegalStateException(
String.format(
"Read snapshot for mysql split %s fail",
currentSnapshotSplit));
currentSnapshotSplit)));
}
} catch (Exception e) {
currentTaskRunning = false;
LOG.error(
String.format(
"Execute snapshot read task for mysql split %s fail",
currentSnapshotSplit),
e);
readException = e;
setReadException(e);
}
});
}
@ -335,6 +333,19 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
lowWatermark));
}
private void setReadException(Throwable throwable) {
currentTaskRunning = false;
LOG.error(
String.format(
"Execute snapshot read task for mysql split %s fail", currentSnapshotSplit),
throwable);
if (readException == null) {
readException = throwable;
} else {
readException.addSuppressed(throwable);
}
}
@Override
public void close() {
try {

Loading…
Cancel
Save