|
|
|
@ -110,17 +110,17 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
|
|
|
|
|
sourceFetchContext.getPartition(),
|
|
|
|
|
sourceFetchContext.getOffsetContext());
|
|
|
|
|
|
|
|
|
|
final StreamSplit backfillBinlogSplit =
|
|
|
|
|
final StreamSplit backfillRedoLogSplit =
|
|
|
|
|
createBackfillRedoLogSplit(changeEventSourceContext);
|
|
|
|
|
// optimization that skip the binlog read when the low watermark equals high
|
|
|
|
|
// optimization that skip the redo log read when the low watermark equals high
|
|
|
|
|
// watermark
|
|
|
|
|
final boolean binlogBackfillRequired =
|
|
|
|
|
backfillBinlogSplit
|
|
|
|
|
final boolean redoLogBackfillRequired =
|
|
|
|
|
backfillRedoLogSplit
|
|
|
|
|
.getEndingOffset()
|
|
|
|
|
.isAfter(backfillBinlogSplit.getStartingOffset());
|
|
|
|
|
if (!binlogBackfillRequired) {
|
|
|
|
|
dispatchBinlogEndEvent(
|
|
|
|
|
backfillBinlogSplit,
|
|
|
|
|
.isAfter(backfillRedoLogSplit.getStartingOffset());
|
|
|
|
|
if (!redoLogBackfillRequired) {
|
|
|
|
|
dispatchRedoLogEndEvent(
|
|
|
|
|
backfillRedoLogSplit,
|
|
|
|
|
sourceFetchContext.getPartition().getSourcePartition(),
|
|
|
|
|
((OracleSourceFetchTaskContext) context).getDispatcher());
|
|
|
|
|
taskRunning = false;
|
|
|
|
@ -128,16 +128,16 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
|
|
|
|
|
}
|
|
|
|
|
// execute redoLog read task
|
|
|
|
|
if (snapshotResult.isCompletedOrSkipped()) {
|
|
|
|
|
final RedoLogSplitReadTask backfillBinlogReadTask =
|
|
|
|
|
createBackfillRedoLogReadTask(backfillBinlogSplit, sourceFetchContext);
|
|
|
|
|
final RedoLogSplitReadTask backfillRedoLogReadTask =
|
|
|
|
|
createBackfillRedoLogReadTask(backfillRedoLogSplit, sourceFetchContext);
|
|
|
|
|
|
|
|
|
|
final LogMinerOracleOffsetContextLoader loader =
|
|
|
|
|
new LogMinerOracleOffsetContextLoader(
|
|
|
|
|
((OracleSourceFetchTaskContext) context).getDbzConnectorConfig());
|
|
|
|
|
final OracleOffsetContext oracleOffsetContext =
|
|
|
|
|
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
|
|
|
|
|
backfillBinlogReadTask.execute(
|
|
|
|
|
new SnapshotBinlogSplitChangeEventSourceContext(),
|
|
|
|
|
loader.load(backfillRedoLogSplit.getStartingOffset().getOffset());
|
|
|
|
|
backfillRedoLogReadTask.execute(
|
|
|
|
|
new SnapshotRedoLogSplitChangeEventSourceContext(),
|
|
|
|
|
sourceFetchContext.getPartition(),
|
|
|
|
|
oracleOffsetContext);
|
|
|
|
|
taskRunning = false;
|
|
|
|
@ -160,7 +160,7 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RedoLogSplitReadTask createBackfillRedoLogReadTask(
|
|
|
|
|
StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) {
|
|
|
|
|
StreamSplit backfillRedoLogSplit, OracleSourceFetchTaskContext context) {
|
|
|
|
|
// we should only capture events for the current table,
|
|
|
|
|
// otherwise, we may can't find corresponding schema
|
|
|
|
|
Configuration dezConf =
|
|
|
|
@ -171,7 +171,7 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
|
|
|
|
|
// Disable heartbeat event in snapshot split fetcher
|
|
|
|
|
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
|
|
|
|
|
.build();
|
|
|
|
|
// task to read binlog and backfill for current split
|
|
|
|
|
// task to read redo log and backfill for current split
|
|
|
|
|
return new RedoLogSplitReadTask(
|
|
|
|
|
new OracleConnectorConfig(dezConf),
|
|
|
|
|
context.getConnection(),
|
|
|
|
@ -180,18 +180,18 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
|
|
|
|
|
context.getDatabaseSchema(),
|
|
|
|
|
context.getSourceConfig().getOriginDbzConnectorConfig(),
|
|
|
|
|
context.getStreamingChangeEventSourceMetrics(),
|
|
|
|
|
backfillBinlogSplit);
|
|
|
|
|
backfillRedoLogSplit);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void dispatchBinlogEndEvent(
|
|
|
|
|
StreamSplit backFillBinlogSplit,
|
|
|
|
|
private void dispatchRedoLogEndEvent(
|
|
|
|
|
StreamSplit backFillRedoLogSplit,
|
|
|
|
|
Map<String, ?> sourcePartition,
|
|
|
|
|
JdbcSourceEventDispatcher<OraclePartition> eventDispatcher)
|
|
|
|
|
throws InterruptedException {
|
|
|
|
|
eventDispatcher.dispatchWatermarkEvent(
|
|
|
|
|
sourcePartition,
|
|
|
|
|
backFillBinlogSplit,
|
|
|
|
|
backFillBinlogSplit.getEndingOffset(),
|
|
|
|
|
backFillRedoLogSplit,
|
|
|
|
|
backFillRedoLogSplit.getEndingOffset(),
|
|
|
|
|
WatermarkKind.END);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -446,10 +446,10 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded binlog task
|
|
|
|
|
* of a snapshot split task.
|
|
|
|
|
* The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded redo log
|
|
|
|
|
* task of a snapshot split task.
|
|
|
|
|
*/
|
|
|
|
|
public class SnapshotBinlogSplitChangeEventSourceContext
|
|
|
|
|
public class SnapshotRedoLogSplitChangeEventSourceContext
|
|
|
|
|
implements ChangeEventSource.ChangeEventSourceContext {
|
|
|
|
|
|
|
|
|
|
public void finished() {
|
|
|
|
|