[cdc-connector][oracledb] Fix Oracle reads exceeding high_watermark in backfill phase

pull/2963/head
Hongshun Wang 1 year ago committed by gongzhongqiang
parent 1b643026fe
commit 15a734efa0

@ -0,0 +1,282 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.oracle.source.reader.fetch;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import static com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset.NO_STOPPING_OFFSET;
/**
 * Factory to produce a {@link LogMinerEventProcessor} with an enhanced {@code processRow} method
 * that detects when a bounded (backfill) read has reached the split's ending offset, so the redo
 * log fetch does not run past the high watermark.
 */
@Internal
public class EventProcessorFactory {

    private static final Logger LOG = LoggerFactory.getLogger(EventProcessorFactory.class);

    /** Non-instantiable: this class only exposes static factory helpers. */
    private EventProcessorFactory() {}

    /**
     * Creates the {@link LogMinerEventProcessor} matching the configured log-mining buffer type.
     *
     * @param redoLogSplit the stream split being read; an ending offset other than
     *     NO_STOPPING_OFFSET makes the read bounded
     * @throws IllegalArgumentException if the configured buffer type is not supported
     */
    public static LogMinerEventProcessor createProcessor(
            ChangeEventSource.ChangeEventSourceContext context,
            OracleConnectorConfig connectorConfig,
            OracleConnection jdbcConnection,
            JdbcSourceEventDispatcher<OraclePartition> dispatcher,
            OraclePartition partition,
            OracleOffsetContext offsetContext,
            OracleDatabaseSchema schema,
            OracleStreamingChangeEventSourceMetrics metrics,
            ErrorHandler errorHandler,
            StreamSplit redoLogSplit) {
        final OracleConnectorConfig.LogMiningBufferType bufferType =
                connectorConfig.getLogMiningBufferType();
        // Switch over the enum instead of chained equals() calls; the default branch preserves
        // the original failure behavior for any unsupported buffer type.
        switch (bufferType) {
            case MEMORY:
                return new CDCMemoryLogMinerEventProcessor(
                        context,
                        connectorConfig,
                        jdbcConnection,
                        dispatcher,
                        partition,
                        offsetContext,
                        schema,
                        metrics,
                        errorHandler,
                        redoLogSplit);
            case INFINISPAN_EMBEDDED:
                return new CDCEmbeddedInfinispanLogMinerEventProcessor(
                        context,
                        connectorConfig,
                        jdbcConnection,
                        dispatcher,
                        partition,
                        offsetContext,
                        schema,
                        metrics,
                        errorHandler,
                        redoLogSplit);
            case INFINISPAN_REMOTE:
                return new CDCRemoteInfinispanLogMinerEventProcessor(
                        context,
                        connectorConfig,
                        jdbcConnection,
                        dispatcher,
                        partition,
                        offsetContext,
                        schema,
                        metrics,
                        errorHandler,
                        redoLogSplit);
            default:
                throw new IllegalArgumentException(
                        "not support this type of bufferType: " + bufferType);
        }
    }

    /**
     * A {@link MemoryLogMinerEventProcessor} whose {@code processRow} additionally checks whether
     * a bounded read has reached the split's ending offset.
     */
    public static class CDCMemoryLogMinerEventProcessor extends MemoryLogMinerEventProcessor {

        private final StreamSplit redoLogSplit;
        private final ErrorHandler errorHandler;
        private final ChangeEventSource.ChangeEventSourceContext context;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;

        public CDCMemoryLogMinerEventProcessor(
                ChangeEventSource.ChangeEventSourceContext context,
                OracleConnectorConfig connectorConfig,
                OracleConnection jdbcConnection,
                JdbcSourceEventDispatcher<OraclePartition> dispatcher,
                OraclePartition partition,
                OracleOffsetContext offsetContext,
                OracleDatabaseSchema schema,
                OracleStreamingChangeEventSourceMetrics metrics,
                ErrorHandler errorHandler,
                StreamSplit redoLogSplit) {
            super(
                    context,
                    connectorConfig,
                    jdbcConnection,
                    dispatcher,
                    partition,
                    offsetContext,
                    schema,
                    metrics);
            this.redoLogSplit = redoLogSplit;
            this.errorHandler = errorHandler;
            this.context = context;
            this.dispatcher = dispatcher;
        }

        @Override
        protected void processRow(OraclePartition partition, LogMinerEventRow row)
                throws SQLException, InterruptedException {
            super.processRow(partition, row);
            afterRowProcess(partition, row, redoLogSplit, errorHandler, dispatcher, context);
        }
    }

    /**
     * An {@link EmbeddedInfinispanLogMinerEventProcessor} whose {@code processRow} additionally
     * checks whether a bounded read has reached the split's ending offset.
     */
    public static class CDCEmbeddedInfinispanLogMinerEventProcessor
            extends EmbeddedInfinispanLogMinerEventProcessor {

        private final StreamSplit redoLogSplit;
        private final ErrorHandler errorHandler;
        private final ChangeEventSource.ChangeEventSourceContext context;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;

        public CDCEmbeddedInfinispanLogMinerEventProcessor(
                ChangeEventSource.ChangeEventSourceContext context,
                OracleConnectorConfig connectorConfig,
                OracleConnection jdbcConnection,
                JdbcSourceEventDispatcher<OraclePartition> dispatcher,
                OraclePartition partition,
                OracleOffsetContext offsetContext,
                OracleDatabaseSchema schema,
                OracleStreamingChangeEventSourceMetrics metrics,
                ErrorHandler errorHandler,
                StreamSplit redoLogSplit) {
            super(
                    context,
                    connectorConfig,
                    jdbcConnection,
                    dispatcher,
                    partition,
                    offsetContext,
                    schema,
                    metrics);
            this.redoLogSplit = redoLogSplit;
            this.errorHandler = errorHandler;
            this.context = context;
            this.dispatcher = dispatcher;
        }

        @Override
        protected void processRow(OraclePartition partition, LogMinerEventRow row)
                throws SQLException, InterruptedException {
            super.processRow(partition, row);
            afterRowProcess(partition, row, redoLogSplit, errorHandler, dispatcher, context);
        }
    }

    /**
     * A {@link RemoteInfinispanLogMinerEventProcessor} whose {@code processRow} additionally
     * checks whether a bounded read has reached the split's ending offset.
     */
    public static class CDCRemoteInfinispanLogMinerEventProcessor
            extends RemoteInfinispanLogMinerEventProcessor {

        private final StreamSplit redoLogSplit;
        private final ErrorHandler errorHandler;
        private final ChangeEventSource.ChangeEventSourceContext context;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;

        public CDCRemoteInfinispanLogMinerEventProcessor(
                ChangeEventSource.ChangeEventSourceContext context,
                OracleConnectorConfig connectorConfig,
                OracleConnection jdbcConnection,
                JdbcSourceEventDispatcher<OraclePartition> dispatcher,
                OraclePartition partition,
                OracleOffsetContext offsetContext,
                OracleDatabaseSchema schema,
                OracleStreamingChangeEventSourceMetrics metrics,
                ErrorHandler errorHandler,
                StreamSplit redoLogSplit) {
            super(
                    context,
                    connectorConfig,
                    jdbcConnection,
                    dispatcher,
                    partition,
                    offsetContext,
                    schema,
                    metrics);
            this.redoLogSplit = redoLogSplit;
            this.errorHandler = errorHandler;
            this.context = context;
            this.dispatcher = dispatcher;
        }

        @Override
        protected void processRow(OraclePartition partition, LogMinerEventRow row)
                throws SQLException, InterruptedException {
            super.processRow(partition, row);
            afterRowProcess(partition, row, redoLogSplit, errorHandler, dispatcher, context);
        }
    }

    /**
     * After each processed row, checks whether a bounded read has reached the split's ending
     * offset (the high watermark); if so, dispatches an END watermark event and marks the change
     * event source context as finished so the fetcher stops.
     */
    public static void afterRowProcess(
            OraclePartition partition,
            LogMinerEventRow row,
            StreamSplit redoLogSplit,
            ErrorHandler errorHandler,
            JdbcSourceEventDispatcher<OraclePartition> dispatcher,
            ChangeEventSource.ChangeEventSourceContext context) {
        // check do we need to stop for fetch redo log for snapshot split.
        if (isBoundedRead(redoLogSplit)) {
            final RedoLogOffset currentRedoLogOffset = new RedoLogOffset(row.getScn().longValue());
            // reach the high watermark, the redo log fetcher should be finished
            if (currentRedoLogOffset.isAtOrAfter(redoLogSplit.getEndingOffset())) {
                // send redo log end event
                try {
                    dispatcher.dispatchWatermarkEvent(
                            partition.getSourcePartition(),
                            redoLogSplit,
                            currentRedoLogOffset,
                            WatermarkKind.END);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so upstream callers can still observe it.
                    Thread.currentThread().interrupt();
                    LOG.error("Send signal event error.", e);
                    errorHandler.setProducerThrowable(
                            new DebeziumException("Error processing redo log signal event", e));
                }
                // tell fetcher the redo log task finished
                ((OracleScanFetchTask.OracleSnapshotRedoLogSplitChangeEventSourceContext) context)
                        .finished();
            }
        }
    }

    /** A read is bounded iff the split carries a real ending offset. */
    private static boolean isBoundedRead(StreamSplit redoLogSplit) {
        return !NO_STOPPING_OFFSET.equals(redoLogSplit.getEndingOffset());
    }
}

@ -16,13 +16,11 @@
package com.ververica.cdc.connectors.oracle.source.reader.fetch;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
@ -31,18 +29,15 @@ import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import static com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset.NO_STOPPING_OFFSET;
/** The task to work for fetching data of Oracle table stream split. */
@Internal
public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
private final StreamSplit split;
@ -100,6 +95,12 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;
private final ErrorHandler errorHandler;
private ChangeEventSourceContext context;
private final OracleConnectorConfig connectorConfig;
private final OracleConnection connection;
private final OracleDatabaseSchema schema;
private final OracleStreamingChangeEventSourceMetrics metrics;
public RedoLogSplitReadTask(
OracleConnectorConfig connectorConfig,
@ -122,6 +123,10 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
this.redoLogSplit = redoLogSplit;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.connectorConfig = connectorConfig;
this.connection = connection;
this.metrics = metrics;
this.schema = schema;
}
@Override
@ -133,48 +138,26 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> {
super.execute(context, partition, offsetContext);
}
/**
* Delegate {@link EventProcessorFactory} to produce a LogMinerEventProcessor with enhanced
* processRow method to distinguish whether is bounded.
*/
@Override
protected void afterHandleScn(
OraclePartition partition, OracleOffsetContext offsetContext) {
super.afterHandleScn(partition, offsetContext);
// check do we need to stop for fetch redo log for snapshot split.
if (isBoundedRead()) {
final RedoLogOffset currentRedoLogOffset =
getCurrentRedoLogOffset(offsetContext.getOffset());
// reach the high watermark, the redo log fetcher should be finished
if (currentRedoLogOffset.isAtOrAfter(redoLogSplit.getEndingOffset())) {
// send redo log end event
try {
dispatcher.dispatchWatermarkEvent(
partition.getSourcePartition(),
redoLogSplit,
currentRedoLogOffset,
WatermarkKind.END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing redo log signal event", e));
}
// tell fetcher the redo log task finished
((OracleScanFetchTask.OracleSnapshotRedoLogSplitChangeEventSourceContext)
context)
.finished();
}
}
}
private boolean isBoundedRead() {
return !NO_STOPPING_OFFSET.equals(redoLogSplit.getEndingOffset());
}
public static RedoLogOffset getCurrentRedoLogOffset(Map<String, ?> offset) {
Map<String, String> offsetStrMap = new HashMap<>();
for (Map.Entry<String, ?> entry : offset.entrySet()) {
offsetStrMap.put(
entry.getKey(),
entry.getValue() == null ? null : entry.getValue().toString());
}
return new RedoLogOffset(offsetStrMap);
/**
 * Delegates to {@link EventProcessorFactory} so the produced LogMinerEventProcessor can detect,
 * per processed row, whether a bounded read has reached the split's ending offset.
 */
protected LogMinerEventProcessor createProcessor(
        ChangeEventSourceContext context,
        OraclePartition partition,
        OracleOffsetContext offsetContext) {
    return EventProcessorFactory.createProcessor(
            context,
            connectorConfig,
            connection,
            dispatcher,
            partition,
            offsetContext,
            schema,
            metrics,
            errorHandler,
            redoLogSplit);
}
}

@ -63,9 +63,11 @@ import static io.debezium.connector.oracle.logminer.LogMinerHelper.logError;
import static io.debezium.connector.oracle.logminer.LogMinerHelper.setLogFilesForMining;
/**
* Copied from Debezium 1.9.7. Diff: added afterHandleScn() method. A {@link
* StreamingChangeEventSource} based on Oracle's LogMiner utility. The event handler loop is
* executed in a separate executor.
* Copied from Debezium 1.9.7. A {@link StreamingChangeEventSource} based on Oracle's LogMiner
* utility. The event handler loop is executed in a separate executor.
*
* <p>Diff: the createProcessor method is made protected so subclasses can supply a
* LogMinerEventProcessor with an enhanced processRow method that detects whether the read is
* bounded.
*/
public class LogMinerStreamingChangeEventSource
implements StreamingChangeEventSource<OraclePartition, OracleOffsetContext> {
@ -251,8 +253,6 @@ public class LogMinerStreamingChangeEventSource
}
pauseBetweenMiningSessions();
}
afterHandleScn(partition, offsetContext);
}
}
}
@ -266,8 +266,6 @@ public class LogMinerStreamingChangeEventSource
}
}
protected void afterHandleScn(OraclePartition partition, OracleOffsetContext offsetContext) {}
/**
* Computes the start SCN for the first mining session.
*
@ -361,7 +359,7 @@ public class LogMinerStreamingChangeEventSource
format.format(sessionProcessGlobalAreaMaxMemory / 1024.f / 1024.f));
}
private LogMinerEventProcessor createProcessor(
protected LogMinerEventProcessor createProcessor(
ChangeEventSourceContext context,
OraclePartition partition,
OracleOffsetContext offsetContext) {

@ -65,6 +65,7 @@ import static org.apache.flink.util.Preconditions.checkState;
public class OracleSourceITCase extends OracleSourceTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
private static final Logger LOG = LoggerFactory.getLogger(OracleSourceITCase.class);
@ -194,6 +195,39 @@ public class OracleSourceITCase extends OracleSourceTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
@Test
public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
    // Run with backfill enabled and inject the DML changes after the high watermark is taken.
    List<String> actual =
            testBackfillWhenWritingEvents(false, 21, USE_POST_HIGHWATERMARK_HOOK);
    List<String> expected =
            Arrays.asList(
                    "+I[101, user_1, Shanghai, 123567891234]",
                    "+I[102, user_2, Shanghai, 123567891234]",
                    "+I[103, user_3, Shanghai, 123567891234]",
                    "+I[109, user_4, Shanghai, 123567891234]",
                    "+I[110, user_5, Shanghai, 123567891234]",
                    "+I[111, user_6, Shanghai, 123567891234]",
                    "+I[118, user_7, Shanghai, 123567891234]",
                    "+I[121, user_8, Shanghai, 123567891234]",
                    "+I[123, user_9, Shanghai, 123567891234]",
                    "+I[1009, user_10, Shanghai, 123567891234]",
                    "+I[1010, user_11, Shanghai, 123567891234]",
                    "+I[1011, user_12, Shanghai, 123567891234]",
                    "+I[1012, user_13, Shanghai, 123567891234]",
                    "+I[1013, user_14, Shanghai, 123567891234]",
                    "+I[1014, user_15, Shanghai, 123567891234]",
                    "+I[1015, user_16, Shanghai, 123567891234]",
                    "+I[1016, user_17, Shanghai, 123567891234]",
                    "+I[1017, user_18, Shanghai, 123567891234]",
                    "+I[1018, user_19, Shanghai, 123567891234]",
                    "+I[1019, user_20, Shanghai, 123567891234]",
                    "+I[2000, user_21, Shanghai, 123567891234]");
    // With backfill enabled, the redo log between [low_watermark, snapshot) is applied into
    // the snapshot image, so changes written after the high watermark are not yet visible.
    assertEqualsInAnyOrder(expected, actual);
}
@Test
public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
List<String> records = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK);
@ -329,10 +363,16 @@ public class OracleSourceITCase extends OracleSourceTestBase {
}
};
if (hookType == USE_POST_LOWWATERMARK_HOOK) {
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
} else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
switch (hookType) {
case USE_POST_LOWWATERMARK_HOOK:
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
break;
case USE_PRE_HIGHWATERMARK_HOOK:
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
break;
case USE_POST_HIGHWATERMARK_HOOK:
hooks.setPostHighWatermarkAction(snapshotPhaseHook);
break;
}
source.setSnapshotHooks(hooks);

Loading…
Cancel
Save