[cdc-base] Fix TM hangs caused by uncaught exception (#2511)

(cherry picked from commit 7172695471)
pull/2694/head
Jiabao Sun 1 year ago committed by Leonard Xu
parent d26360010a
commit 5409ee4722

@ -75,6 +75,8 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat("debezium-snapshot-reader-" + subtaskId)
.setUncaughtExceptionHandler(
(thread, throwable) -> setReadException(throwable))
.build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.hasNextElement = new AtomicBoolean(false);
@ -89,17 +91,13 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
this.queue = taskContext.getQueue();
this.hasNextElement.set(true);
this.reachEnd.set(false);
executorService.submit(
executorService.execute(
() -> {
try {
snapshotSplitReadTask.execute(taskContext);
} catch (Exception e) {
LOG.error(
String.format(
"Execute snapshot read task for snapshot split %s fail",
currentSnapshotSplit),
e);
readException = e;
setReadException(e);
}
});
}
@ -186,6 +184,19 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
}
}
private void setReadException(Throwable throwable) {
LOG.error(
String.format(
"Execute snapshot read task for snapshot split %s fail",
currentSnapshotSplit),
throwable);
if (readException == null) {
readException = throwable;
} else {
readException.addSuppressed(throwable);
}
}
@Override
public void close() {
try {

Loading…
Cancel
Save