|
|
|
@ -50,7 +50,7 @@ public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSp
|
|
|
|
|
private final Queue<SourceSplitBase> splits;
|
|
|
|
|
private final int subtaskId;
|
|
|
|
|
|
|
|
|
|
@Nullable private Fetcher<SourceRecord, SourceSplitBase> currenFetcher;
|
|
|
|
|
@Nullable private Fetcher<SourceRecord, SourceSplitBase> currentFetcher;
|
|
|
|
|
@Nullable private String currentSplitId;
|
|
|
|
|
private final JdbcDataSourceDialect dataSourceDialect;
|
|
|
|
|
|
|
|
|
@ -65,7 +65,7 @@ public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSp
|
|
|
|
|
checkSplitOrStartNext();
|
|
|
|
|
Iterator<SourceRecord> dataIt = null;
|
|
|
|
|
try {
|
|
|
|
|
dataIt = currenFetcher.pollSplitRecords();
|
|
|
|
|
dataIt = currentFetcher.pollSplitRecords();
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
LOG.warn("fetch data failed.", e);
|
|
|
|
|
throw new IOException(e);
|
|
|
|
@ -93,16 +93,16 @@ public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSp
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void close() throws Exception {
|
|
|
|
|
if (currenFetcher != null) {
|
|
|
|
|
LOG.info("Close current fetcher {}", currenFetcher.getClass().getCanonicalName());
|
|
|
|
|
currenFetcher.close();
|
|
|
|
|
if (currentFetcher != null) {
|
|
|
|
|
LOG.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName());
|
|
|
|
|
currentFetcher.close();
|
|
|
|
|
currentSplitId = null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void checkSplitOrStartNext() throws IOException {
|
|
|
|
|
// the binlog fetcher should keep alive
|
|
|
|
|
if (currenFetcher instanceof JdbcSourceStreamFetcher) {
|
|
|
|
|
if (currentFetcher instanceof JdbcSourceStreamFetcher) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -114,28 +114,28 @@ public class JdbcSourceSplitReader implements SplitReader<SourceRecord, SourceSp
|
|
|
|
|
currentSplitId = nextSplit.splitId();
|
|
|
|
|
|
|
|
|
|
if (nextSplit.isSnapshotSplit()) {
|
|
|
|
|
if (currenFetcher == null) {
|
|
|
|
|
if (currentFetcher == null) {
|
|
|
|
|
final JdbcSourceFetchTaskContext taskContext =
|
|
|
|
|
dataSourceDialect.createFetchTaskContext(nextSplit);
|
|
|
|
|
currenFetcher = new JdbcSourceScanFetcher(taskContext, subtaskId);
|
|
|
|
|
currentFetcher = new JdbcSourceScanFetcher(taskContext, subtaskId);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// point from snapshot split to binlog split
|
|
|
|
|
if (currenFetcher != null) {
|
|
|
|
|
if (currentFetcher != null) {
|
|
|
|
|
LOG.info("It's turn to read binlog split, close current snapshot fetcher.");
|
|
|
|
|
currenFetcher.close();
|
|
|
|
|
currentFetcher.close();
|
|
|
|
|
}
|
|
|
|
|
final JdbcSourceFetchTaskContext taskContext =
|
|
|
|
|
dataSourceDialect.createFetchTaskContext(nextSplit);
|
|
|
|
|
currenFetcher = new JdbcSourceStreamFetcher(taskContext, subtaskId);
|
|
|
|
|
currentFetcher = new JdbcSourceStreamFetcher(taskContext, subtaskId);
|
|
|
|
|
LOG.info("Stream fetcher is created.");
|
|
|
|
|
}
|
|
|
|
|
currenFetcher.submitTask(dataSourceDialect.createFetchTask(nextSplit));
|
|
|
|
|
currentFetcher.submitTask(dataSourceDialect.createFetchTask(nextSplit));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean canAssignNextSplit() {
|
|
|
|
|
return currenFetcher == null || currenFetcher.isFinished();
|
|
|
|
|
return currentFetcher == null || currentFetcher.isFinished();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private ChangeEventRecords finishedSnapshotSplit() {
|
|
|
|
|