[oracle] Fix the backfill task not running bug in oracle cdc connector (#2218)

pull/2156/head
Hang Ruan 2 years ago committed by GitHub
parent 66556ee501
commit d8c1e335dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -168,11 +168,12 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment {
"108,jacket,water resistent black wind breaker,0.1",
"109,spare tire,24 inch spare tire,22.2",
"111,jacket,new water resistent white wind breaker,0.5");
// Oracle cdc's backfill task will cost much time, increase the timeout here
proxy.checkResultWithTimeout(
expectResult,
"products_sink",
new String[] {"id", "name", "description", "weight"},
150000L);
300000L);
}
private Connection getOracleJdbcConnection() throws SQLException {

@ -130,9 +130,15 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
if (snapshotResult.isCompletedOrSkipped()) {
final RedoLogSplitReadTask backfillBinlogReadTask =
createBackfillRedoLogReadTask(backfillBinlogSplit, sourceFetchContext);
final LogMinerOracleOffsetContextLoader loader =
new LogMinerOracleOffsetContextLoader(
((OracleSourceFetchTaskContext) context).getDbzConnectorConfig());
final OracleOffsetContext oracleOffsetContext =
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContext(),
sourceFetchContext.getOffsetContext());
new SnapshotBinlogSplitChangeEventSourceContext(), oracleOffsetContext);
taskRunning = false;
} else {
taskRunning = false;
throw new IllegalStateException(
@ -280,7 +286,7 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> {
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
snapshotSplit);
((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark);
((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(highWatermark);
dispatcher.dispatchWatermarkEvent(
offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);
return SnapshotResult.completed(ctx.offset);

Loading…
Cancel
Save