diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java index 51b8d054f..57fcb0bba 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/OracleE2eITCase.java @@ -168,11 +168,12 @@ public class OracleE2eITCase extends FlinkContainerTestEnvironment { "108,jacket,water resistent black wind breaker,0.1", "109,spare tire,24 inch spare tire,22.2", "111,jacket,new water resistent white wind breaker,0.5"); + // Oracle cdc's backfill task will cost much time, increase the timeout here proxy.checkResultWithTimeout( expectResult, "products_sink", new String[] {"id", "name", "description", "weight"}, - 150000L); + 300000L); } private Connection getOracleJdbcConnection() throws SQLException { diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index edc0924d8..9905b5a12 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -130,9 +130,15 @@ public class OracleScanFetchTask implements FetchTask { if (snapshotResult.isCompletedOrSkipped()) { final RedoLogSplitReadTask backfillBinlogReadTask = createBackfillRedoLogReadTask(backfillBinlogSplit, sourceFetchContext); + + final LogMinerOracleOffsetContextLoader loader = + new LogMinerOracleOffsetContextLoader( + ((OracleSourceFetchTaskContext) context).getDbzConnectorConfig()); + final OracleOffsetContext oracleOffsetContext = + loader.load(backfillBinlogSplit.getStartingOffset().getOffset()); backfillBinlogReadTask.execute( - new SnapshotBinlogSplitChangeEventSourceContext(), - sourceFetchContext.getOffsetContext()); + new SnapshotBinlogSplitChangeEventSourceContext(), oracleOffsetContext); + taskRunning = false; } else { taskRunning = false; throw new IllegalStateException( @@ -280,7 +286,7 @@ public class OracleScanFetchTask implements FetchTask { "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit); - ((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark); + ((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(highWatermark); dispatcher.dispatchWatermarkEvent( offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH); return SnapshotResult.completed(ctx.offset);