[cdc-base] Fix TM hangs caused by uncaught exception (#2511)

pull/2523/head
Jiabao Sun 1 year ago committed by GitHub
parent 07db169765
commit 7172695471
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -75,6 +75,8 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
.setNameFormat("debezium-snapshot-reader-" + subtaskId)
.setUncaughtExceptionHandler(
(thread, throwable) -> setReadException(throwable))
.build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.hasNextElement = new AtomicBoolean(false);
@ -89,17 +91,13 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
this.queue = taskContext.getQueue();
this.hasNextElement.set(true);
this.reachEnd.set(false);
executorService.submit(
executorService.execute(
() -> {
try {
snapshotSplitReadTask.execute(taskContext);
} catch (Exception e) {
LOG.error(
String.format(
"Execute snapshot read task for snapshot split %s fail",
currentSnapshotSplit),
e);
readException = e;
setReadException(e);
}
});
}
@ -186,6 +184,19 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
}
}
private void setReadException(Throwable throwable) {
LOG.error(
String.format(
"Execute snapshot read task for snapshot split %s fail",
currentSnapshotSplit),
throwable);
if (readException == null) {
readException = throwable;
} else {
readException.addSuppressed(throwable);
}
}
@Override
public void close() {
try {

Loading…
Cancel
Save