diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/EventProcessorFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/EventProcessorFactory.java
new file mode 100644
index 000000000..9b0b8c8be
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/EventProcessorFactory.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.oracle.source.reader.fetch;
+
+import com.ververica.cdc.common.annotation.Internal;
+import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
+import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
+import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
+import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
+import io.debezium.DebeziumException;
+import io.debezium.connector.oracle.OracleConnection;
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.connector.oracle.OracleDatabaseSchema;
+import io.debezium.connector.oracle.OracleOffsetContext;
+import io.debezium.connector.oracle.OraclePartition;
+import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
+import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
+import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
+import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor;
+import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor;
+import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.source.spi.ChangeEventSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+
+import static com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset.NO_STOPPING_OFFSET;
+
+/**
+ * Factory to produce a {@link LogMinerEventProcessor} with an enhanced processRow method that
+ * detects whether the current read is bounded.
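+ *
+ * <p>For illustration only, a bounded redo-log read would obtain a processor roughly as follows
+ * (parameter names follow {@link #createProcessor}; the split is bounded when its ending offset
+ * is not NO_STOPPING_OFFSET):
+ *
+ * <pre>{@code
+ * LogMinerEventProcessor processor =
+ *         EventProcessorFactory.createProcessor(
+ *                 context,            // ChangeEventSourceContext of the running fetch task
+ *                 connectorConfig,    // OracleConnectorConfig selecting the LogMiner buffer type
+ *                 jdbcConnection,
+ *                 dispatcher,
+ *                 partition,
+ *                 offsetContext,
+ *                 schema,
+ *                 metrics,
+ *                 errorHandler,
+ *                 redoLogSplit);      // bounded snapshot split or unbounded stream split
+ * }</pre>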
+ */
+@Internal
+public class EventProcessorFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(EventProcessorFactory.class);
+
+    private EventProcessorFactory() {}
+
+    public static LogMinerEventProcessor createProcessor(
+            ChangeEventSource.ChangeEventSourceContext context,
+            OracleConnectorConfig connectorConfig,
+            OracleConnection jdbcConnection,
+            JdbcSourceEventDispatcher<OraclePartition> dispatcher,
+            OraclePartition partition,
+            OracleOffsetContext offsetContext,
+            OracleDatabaseSchema schema,
+            OracleStreamingChangeEventSourceMetrics metrics,
+            ErrorHandler errorHandler,
+            StreamSplit redoLogSplit) {
+        final OracleConnectorConfig.LogMiningBufferType bufferType =
+                connectorConfig.getLogMiningBufferType();
+        if (bufferType.equals(OracleConnectorConfig.LogMiningBufferType.MEMORY)) {
+            return new CDCMemoryLogMinerEventProcessor(
+                    context,
+                    connectorConfig,
+                    jdbcConnection,
+                    dispatcher,
+                    partition,
+                    offsetContext,
+                    schema,
+                    metrics,
+                    errorHandler,
+                    redoLogSplit);
+        } else if (bufferType.equals(
+                OracleConnectorConfig.LogMiningBufferType.INFINISPAN_EMBEDDED)) {
+            return new CDCEmbeddedInfinispanLogMinerEventProcessor(
+                    context,
+                    connectorConfig,
+                    jdbcConnection,
+                    dispatcher,
+                    partition,
+                    offsetContext,
+                    schema,
+                    metrics,
+                    errorHandler,
+                    redoLogSplit);
+        } else if (bufferType.equals(OracleConnectorConfig.LogMiningBufferType.INFINISPAN_REMOTE)) {
+            return new CDCRemoteInfinispanLogMinerEventProcessor(
+                    context,
+                    connectorConfig,
+                    jdbcConnection,
+                    dispatcher,
+                    partition,
+                    offsetContext,
+                    schema,
+                    metrics,
+                    errorHandler,
+                    redoLogSplit);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported log mining buffer type: " + bufferType);
+        }
+    }
+
+    /**
+     * A {@link MemoryLogMinerEventProcessor} with an enhanced processRow method that detects
+     * whether the current read is bounded.
+     */
+    public static class CDCMemoryLogMinerEventProcessor extends MemoryLogMinerEventProcessor {
+        private final StreamSplit redoLogSplit;
+        private final ErrorHandler errorHandler;
+
+        private ChangeEventSource.ChangeEventSourceContext context;
+        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
+
+        public CDCMemoryLogMinerEventProcessor(
+                ChangeEventSource.ChangeEventSourceContext context,
+                OracleConnectorConfig connectorConfig,
+                OracleConnection jdbcConnection,
+                JdbcSourceEventDispatcher<OraclePartition> dispatcher,
+                OraclePartition partition,
+                OracleOffsetContext offsetContext,
+                OracleDatabaseSchema schema,
+                OracleStreamingChangeEventSourceMetrics metrics,
+                ErrorHandler errorHandler,
+                StreamSplit redoLogSplit) {
+            super(
+                    context,
+                    connectorConfig,
+                    jdbcConnection,
+                    dispatcher,
+                    partition,
+                    offsetContext,
+                    schema,
+                    metrics);
+            this.redoLogSplit = redoLogSplit;
+            this.errorHandler = errorHandler;
+            this.context = context;
+            this.dispatcher = dispatcher;
+        }
+
+        @Override
+        protected void processRow(OraclePartition partition, LogMinerEventRow row)
+                throws SQLException, InterruptedException {
+            super.processRow(partition, row);
+            afterRowProcess(partition, row, redoLogSplit, errorHandler, dispatcher, context);
+        }
+    }
+
+    /**
+     * An {@link EmbeddedInfinispanLogMinerEventProcessor} with an enhanced processRow method that
+     * detects whether the current read is bounded.
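+     *
+     * <p>Like its sibling processors, it delegates the bounded-read check to {@link
+     * EventProcessorFactory#afterRowProcess}.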
+     */
+    public static class CDCEmbeddedInfinispanLogMinerEventProcessor
+            extends EmbeddedInfinispanLogMinerEventProcessor {
+        private final StreamSplit redoLogSplit;
+        private final ErrorHandler errorHandler;
+
+        private ChangeEventSource.ChangeEventSourceContext context;
+        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
+
+        public CDCEmbeddedInfinispanLogMinerEventProcessor(
+                ChangeEventSource.ChangeEventSourceContext context,
+                OracleConnectorConfig connectorConfig,
+                OracleConnection jdbcConnection,
+                JdbcSourceEventDispatcher<OraclePartition> dispatcher,
+                OraclePartition partition,
+                OracleOffsetContext offsetContext,
+                OracleDatabaseSchema schema,
+                OracleStreamingChangeEventSourceMetrics metrics,
+                ErrorHandler errorHandler,
+                StreamSplit redoLogSplit) {
+            super(
+                    context,
+                    connectorConfig,
+                    jdbcConnection,
+                    dispatcher,
+                    partition,
+                    offsetContext,
+                    schema,
+                    metrics);
+            this.redoLogSplit = redoLogSplit;
+            this.errorHandler = errorHandler;
+            this.context = context;
+            this.dispatcher = dispatcher;
+        }
+
+        @Override
+        protected void processRow(OraclePartition partition, LogMinerEventRow row)
+                throws SQLException, InterruptedException {
+            super.processRow(partition, row);
+            afterRowProcess(partition, row, redoLogSplit, errorHandler, dispatcher, context);
+        }
+    }
+
+    /**
+     * A {@link RemoteInfinispanLogMinerEventProcessor} with an enhanced processRow method that
+     * detects whether the current read is bounded.
+     */
+    public static class CDCRemoteInfinispanLogMinerEventProcessor
+            extends RemoteInfinispanLogMinerEventProcessor {
+        private final StreamSplit redoLogSplit;
+        private final ErrorHandler errorHandler;
+
+        private ChangeEventSource.ChangeEventSourceContext context;
+        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
+
+        public CDCRemoteInfinispanLogMinerEventProcessor(
+                ChangeEventSource.ChangeEventSourceContext context,
+                OracleConnectorConfig connectorConfig,
+                OracleConnection jdbcConnection,
+                JdbcSourceEventDispatcher<OraclePartition> dispatcher,
+                OraclePartition partition,
+                OracleOffsetContext offsetContext,
+                OracleDatabaseSchema schema,
+                OracleStreamingChangeEventSourceMetrics metrics,
+                ErrorHandler errorHandler,
+                StreamSplit redoLogSplit) {
+            super(
+                    context,
+                    connectorConfig,
+                    jdbcConnection,
+                    dispatcher,
+                    partition,
+                    offsetContext,
+                    schema,
+                    metrics);
+            this.redoLogSplit = redoLogSplit;
+            this.errorHandler = errorHandler;
+            this.context = context;
+            this.dispatcher = dispatcher;
+        }
+
+        @Override
+        protected void processRow(OraclePartition partition, LogMinerEventRow row)
+                throws SQLException, InterruptedException {
+            super.processRow(partition, row);
+            afterRowProcess(partition, row, redoLogSplit, errorHandler, dispatcher, context);
+        }
+    }
+
+    public static void afterRowProcess(
+            OraclePartition partition,
+            LogMinerEventRow row,
+            StreamSplit redoLogSplit,
+            ErrorHandler errorHandler,
+            JdbcSourceEventDispatcher<OraclePartition> dispatcher,
+            ChangeEventSource.ChangeEventSourceContext context) {
+        // Check whether we need to stop fetching the redo log for the snapshot split.
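+        // A bounded split ends at the high watermark captured during the snapshot phase; an
+        // unbounded stream split carries NO_STOPPING_OFFSET as its ending offset and never
+        // stops here.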
+        if (isBoundedRead(redoLogSplit)) {
+            final RedoLogOffset currentRedoLogOffset = new RedoLogOffset(row.getScn().longValue());
+            // When the high watermark is reached, the redo log fetcher should finish.
+            if (currentRedoLogOffset.isAtOrAfter(redoLogSplit.getEndingOffset())) {
+                // Send the redo log end event.
+                try {
+                    dispatcher.dispatchWatermarkEvent(
+                            partition.getSourcePartition(),
+                            redoLogSplit,
+                            currentRedoLogOffset,
+                            WatermarkKind.END);
+                } catch (InterruptedException e) {
+                    LOG.error("Send signal event error.", e);
+                    errorHandler.setProducerThrowable(
+                            new DebeziumException("Error processing redo log signal event", e));
+                }
+                // Tell the fetcher that the redo log task has finished.
+                ((OracleScanFetchTask.OracleSnapshotRedoLogSplitChangeEventSourceContext) context)
+                        .finished();
+            }
+        }
+    }
+
+    private static boolean isBoundedRead(StreamSplit redoLogSplit) {
+        return !NO_STOPPING_OFFSET.equals(redoLogSplit.getEndingOffset());
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java
index 097f763e1..3dd45bf2d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java
@@ -16,13 +16,11 @@
 
 package com.ververica.cdc.connectors.oracle.source.reader.fetch;
 
+import com.ververica.cdc.common.annotation.Internal;
 import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
-import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
 import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
-import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
-import io.debezium.DebeziumException;
 import io.debezium.config.Configuration;
 import io.debezium.connector.oracle.OracleConnection;
 import io.debezium.connector.oracle.OracleConnectorConfig;
@@ -31,18 +29,15 @@ import io.debezium.connector.oracle.OracleOffsetContext;
 import io.debezium.connector.oracle.OraclePartition;
 import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
 import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
+import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
 import io.debezium.pipeline.ErrorHandler;
 import io.debezium.pipeline.source.spi.ChangeEventSource;
 import io.debezium.util.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import static com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset.NO_STOPPING_OFFSET;
-
 /** The task to work for fetching data of Oracle table stream split. */
+@Internal
 public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
 
     private final StreamSplit split;
@@ -100,6 +95,12 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
         private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
         private final ErrorHandler errorHandler;
         private ChangeEventSourceContext context;
+        private final OracleConnectorConfig connectorConfig;
+        private final OracleConnection connection;
+
+        private final OracleDatabaseSchema schema;
+
+        private final OracleStreamingChangeEventSourceMetrics metrics;
 
         public RedoLogSplitReadTask(
                 OracleConnectorConfig connectorConfig,
@@ -122,6 +123,10 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
             this.redoLogSplit = redoLogSplit;
             this.dispatcher = dispatcher;
             this.errorHandler = errorHandler;
+            this.connectorConfig = connectorConfig;
+            this.connection = connection;
+            this.metrics = metrics;
+            this.schema = schema;
         }
 
         @Override
@@ -133,48 +138,26 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
             super.execute(context, partition, offsetContext);
         }
 
+        /**
+         * Delegates to {@link EventProcessorFactory} to produce a {@link LogMinerEventProcessor}
+         * with an enhanced processRow method that detects whether the current read is bounded.
+         */
         @Override
-        protected void afterHandleScn(
-                OraclePartition partition, OracleOffsetContext offsetContext) {
-            super.afterHandleScn(partition, offsetContext);
-            // check do we need to stop for fetch redo log for snapshot split.
-            if (isBoundedRead()) {
-                final RedoLogOffset currentRedoLogOffset =
-                        getCurrentRedoLogOffset(offsetContext.getOffset());
-                // reach the high watermark, the redo log fetcher should be finished
-                if (currentRedoLogOffset.isAtOrAfter(redoLogSplit.getEndingOffset())) {
-                    // send redo log end event
-                    try {
-                        dispatcher.dispatchWatermarkEvent(
-                                partition.getSourcePartition(),
-                                redoLogSplit,
-                                currentRedoLogOffset,
-                                WatermarkKind.END);
-                    } catch (InterruptedException e) {
-                        LOG.error("Send signal event error.", e);
-                        errorHandler.setProducerThrowable(
-                                new DebeziumException("Error processing redo log signal event", e));
-                    }
-                    // tell fetcher the redo log task finished
-                    ((OracleScanFetchTask.OracleSnapshotRedoLogSplitChangeEventSourceContext)
-                                    context)
-                            .finished();
-                }
-            }
-        }
-
-        private boolean isBoundedRead() {
-            return !NO_STOPPING_OFFSET.equals(redoLogSplit.getEndingOffset());
-        }
-
-        public static RedoLogOffset getCurrentRedoLogOffset(Map<String, ?> offset) {
-            Map<String, String> offsetStrMap = new HashMap<>();
-            for (Map.Entry<String, ?> entry : offset.entrySet()) {
-                offsetStrMap.put(
-                        entry.getKey(),
-                        entry.getValue() == null ? null : entry.getValue().toString());
-            }
-            return new RedoLogOffset(offsetStrMap);
+        protected LogMinerEventProcessor createProcessor(
+                ChangeEventSourceContext context,
+                OraclePartition partition,
+                OracleOffsetContext offsetContext) {
+            return EventProcessorFactory.createProcessor(
+                    context,
+                    connectorConfig,
+                    connection,
+                    dispatcher,
+                    partition,
+                    offsetContext,
+                    schema,
+                    metrics,
+                    errorHandler,
+                    redoLogSplit);
         }
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
index 216c1e313..ff3c4f288 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -63,9 +63,11 @@ import static io.debezium.connector.oracle.logminer.LogMinerHelper.logError;
 import static io.debezium.connector.oracle.logminer.LogMinerHelper.setLogFilesForMining;
 
 /**
- * Copied from Debezium 1.9.7. Diff: added afterHandleScn() method. A {@link
- * StreamingChangeEventSource} based on Oracle's LogMiner utility. The event handler loop is
- * executed in a separate executor.
+ * Copied from Debezium 1.9.7. A {@link StreamingChangeEventSource} based on Oracle's LogMiner
+ * utility. The event handler loop is executed in a separate executor.
+ *
+ * <p>Diff: Made the createProcessor method protected so that a subclass can produce a
+ * LogMinerEventProcessor with an enhanced processRow method that detects whether the read is
+ * bounded.
  */
 public class LogMinerStreamingChangeEventSource
         implements StreamingChangeEventSource<OraclePartition, OracleOffsetContext> {
@@ -251,8 +253,6 @@ public class LogMinerStreamingChangeEventSource
                     }
                     pauseBetweenMiningSessions();
                 }
-
-                afterHandleScn(partition, offsetContext);
             }
         }
     }
@@ -266,8 +266,6 @@
         }
     }
 
-    protected void afterHandleScn(OraclePartition partition, OracleOffsetContext offsetContext) {}
-
     /**
      * Computes the start SCN for the first mining session.
     *
@@ -361,7 +359,7 @@
                     format.format(sessionProcessGlobalAreaMaxMemory / 1024.f / 1024.f));
         }
 
-    private LogMinerEventProcessor createProcessor(
+    protected LogMinerEventProcessor createProcessor(
             ChangeEventSourceContext context,
             OraclePartition partition,
             OracleOffsetContext offsetContext) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java
index c31b46342..dd5572667 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/source/OracleSourceITCase.java
@@ -65,6 +65,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class OracleSourceITCase extends OracleSourceTestBase {
     private static final int USE_POST_LOWWATERMARK_HOOK = 1;
     private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
+    private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
 
     private static final Logger LOG = LoggerFactory.getLogger(OracleSourceITCase.class);
 
@@ -194,6 +195,39 @@ public class OracleSourceITCase extends OracleSourceTestBase {
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
+    @Test
+    public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
+        List<String> records =
+                testBackfillWhenWritingEvents(false, 21, USE_POST_HIGHWATERMARK_HOOK);
+
+        List<String> expectedRecords =
+                Arrays.asList(
+                        "+I[101, user_1, Shanghai, 123567891234]",
+                        "+I[102, user_2, Shanghai, 123567891234]",
+                        "+I[103, user_3, Shanghai, 123567891234]",
+                        "+I[109, user_4, Shanghai, 123567891234]",
+                        "+I[110, user_5, Shanghai, 123567891234]",
+                        "+I[111, user_6, Shanghai, 123567891234]",
+                        "+I[118, user_7, Shanghai, 123567891234]",
+                        "+I[121, user_8, Shanghai, 123567891234]",
+                        "+I[123, user_9, Shanghai, 123567891234]",
+                        "+I[1009, user_10, Shanghai, 123567891234]",
+                        "+I[1010, user_11, Shanghai, 123567891234]",
+                        "+I[1011, user_12, Shanghai, 123567891234]",
+                        "+I[1012, user_13, Shanghai, 123567891234]",
+                        "+I[1013, user_14, Shanghai, 123567891234]",
+                        "+I[1014, user_15, Shanghai, 123567891234]",
+                        "+I[1015, user_16, Shanghai, 123567891234]",
+                        "+I[1016, user_17, Shanghai, 123567891234]",
+                        "+I[1017, user_18, Shanghai, 123567891234]",
+                        "+I[1018, user_19, Shanghai, 123567891234]",
+                        "+I[1019, user_20, Shanghai, 123567891234]",
+                        "+I[2000, user_21, Shanghai, 123567891234]");
+        // When backfill is enabled, the redo log between [low_watermark, snapshot) will be
+        // applied as the snapshot image.
+        assertEqualsInAnyOrder(expectedRecords, records);
+    }
+
     @Test
     public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
         List<String> records = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK);
@@ -329,10 +363,16 @@ public class OracleSourceITCase extends OracleSourceTestBase {
                     }
                 };
 
-        if (hookType == USE_POST_LOWWATERMARK_HOOK) {
-            hooks.setPostLowWatermarkAction(snapshotPhaseHook);
-        } else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
-            hooks.setPreHighWatermarkAction(snapshotPhaseHook);
+        switch (hookType) {
+            case USE_POST_LOWWATERMARK_HOOK:
+                hooks.setPostLowWatermarkAction(snapshotPhaseHook);
+                break;
+            case USE_PRE_HIGHWATERMARK_HOOK:
+                hooks.setPreHighWatermarkAction(snapshotPhaseHook);
+                break;
+            case USE_POST_HIGHWATERMARK_HOOK:
+                hooks.setPostHighWatermarkAction(snapshotPhaseHook);
+                break;
         }
         source.setSnapshotHooks(hooks);