This closes #1122.
@ -111,6 +111,7 @@ public class JdbcSourceScanFetcher implements Fetcher<SourceRecord, SourceSplitB
boolean reachBinlogEnd = false;
final List<SourceRecord> sourceRecords = new ArrayList<>();
while (!reachBinlogEnd) {
checkReadException();
List<DataChangeEvent> batch = queue.poll();
for (DataChangeEvent event : batch) {
sourceRecords.add(event.getRecord());
@ -77,6 +77,7 @@ public class JdbcSourceStreamFetcher implements Fetcher<SourceRecord, SourceSpli
public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
this.streamFetchTask = fetchTask;
this.currentStreamSplit = fetchTask.getSplit().asStreamSplit();
configureFilter();
taskContext.configure(currentStreamSplit);
this.queue = taskContext.getQueue();
executor.submit(