[cdc-connector][base][tests] Introduce snapshot phase hooks for better snapshot tests

pull/2793/head
Hongshun Wang 1 year ago committed by Leonard Xu
parent a771abef90
commit 13be14fe3f

@ -30,6 +30,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.common.annotation.Experimental;
import com.ververica.cdc.common.annotation.VisibleForTesting;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.options.StartupMode;
@ -50,6 +51,7 @@ import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.relational.TableId;
@ -73,6 +75,12 @@ public class IncrementalSource<T, C extends SourceConfig>
protected final DebeziumDeserializationSchema<T> deserializationSchema;
protected final SourceSplitSerializer sourceSplitSerializer;
// Actions to perform during the snapshot phase.
// This field is introduced for testing purposes, for example testing whether changes made in the
// snapshot phase are correctly backfilled into the snapshot by registering a pre-high-watermark
// hook that generates changes.
private SnapshotPhaseHooks snapshotHooks = SnapshotPhaseHooks.empty();
public IncrementalSource(
SourceConfig.Factory<C> configFactory,
DebeziumDeserializationSchema<T> deserializationSchema,
@ -111,7 +119,10 @@ public class IncrementalSource<T, C extends SourceConfig>
Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier =
() ->
new IncrementalSourceSplitReader<>(
readerContext.getIndexOfSubtask(), dataSourceDialect, sourceConfig);
readerContext.getIndexOfSubtask(),
dataSourceDialect,
sourceConfig,
snapshotHooks);
return new IncrementalSourceReader<>(
elementsQueue,
splitReaderSupplier,
@ -205,4 +216,9 @@ public class IncrementalSource<T, C extends SourceConfig>
sourceConfig.isIncludeSchemaChanges(),
offsetFactory);
}
@VisibleForTesting
public void setSnapshotHooks(SnapshotPhaseHooks snapshotHooks) {
this.snapshotHooks = snapshotHooks;
}
}
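For illustration, a minimal sketch (hypothetical helper class; the IncrementalSource import path is assumed from the package layout visible in this diff) of how a test could use the new setSnapshotHooks entry point so that changes generated during the snapshot phase have to show up via backfill:

import com.ververica.cdc.connectors.base.source.IncrementalSource;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;

/** Hypothetical test helper; the source instance and the change-generating callback come from the test. */
public class SnapshotHookTestUtil {

    public static <T> void registerBackfillHook(IncrementalSource<T, ?> source, Runnable makeChanges) {
        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        // Runs after the snapshot data has been read but before the high watermark is taken,
        // so the generated changes must be backfilled into the snapshot split.
        hooks.setPreHighWatermarkAction((sourceConfig, split) -> makeChanges.run());
        source.setSnapshotHooks(hooks);
    }
}

A test would pass a Runnable that issues DML against the database under test before the job is submitted.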

@ -28,10 +28,12 @@ import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.base.source.reader.external.Fetcher;
import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,12 +59,18 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
private final DataSourceDialect<C> dataSourceDialect;
private final C sourceConfig;
private final SnapshotPhaseHooks snapshotHooks;
public IncrementalSourceSplitReader(
int subtaskId, DataSourceDialect<C> dataSourceDialect, C sourceConfig) {
int subtaskId,
DataSourceDialect<C> dataSourceDialect,
C sourceConfig,
SnapshotPhaseHooks snapshotHooks) {
this.subtaskId = subtaskId;
this.splits = new ArrayDeque<>();
this.dataSourceDialect = dataSourceDialect;
this.sourceConfig = sourceConfig;
this.snapshotHooks = snapshotHooks;
}
@Override
@ -113,16 +121,18 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
if (canAssignNextSplit()) {
final SourceSplitBase nextSplit = splits.poll();
if (nextSplit == null) {
throw new IOException("Cannot fetch from another split - no split remaining.");
}
currentSplitId = nextSplit.splitId();
FetchTask fetchTask = dataSourceDialect.createFetchTask(nextSplit);
if (nextSplit.isSnapshotSplit()) {
if (currentFetcher == null) {
final FetchTask.Context taskContext =
dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig);
currentFetcher = new IncrementalSourceScanFetcher(taskContext, subtaskId);
((AbstractScanFetchTask) fetchTask).setSnapshotPhaseHooks(snapshotHooks);
}
} else {
// point from snapshot split to stream split
@ -135,7 +145,7 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
currentFetcher = new IncrementalSourceStreamFetcher(taskContext, subtaskId);
LOG.info("Stream fetcher is created.");
}
currentFetcher.submitTask(dataSourceDialect.createFetchTask(nextSplit));
currentFetcher.submitTask(fetchTask);
}
}

@ -16,6 +16,7 @@
package com.ververica.cdc.connectors.base.source.reader.external;
import com.ververica.cdc.common.annotation.VisibleForTesting;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
@ -23,6 +24,7 @@ import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,6 +37,7 @@ public abstract class AbstractScanFetchTask implements FetchTask {
protected volatile boolean taskRunning = false;
protected final SnapshotSplit snapshotSplit;
private SnapshotPhaseHooks snapshotPhaseHooks = SnapshotPhaseHooks.empty();
public AbstractScanFetchTask(SnapshotSplit snapshotSplit) {
this.snapshotSplit = snapshotSplit;
@ -49,23 +52,34 @@ public abstract class AbstractScanFetchTask implements FetchTask {
taskRunning = true;
if (snapshotPhaseHooks.getPreLowWatermarkAction() != null) {
snapshotPhaseHooks.getPreLowWatermarkAction().accept(sourceConfig, snapshotSplit);
}
final Offset lowWatermark = dialect.displayCurrentOffset(sourceConfig);
LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
lowWatermark,
snapshotSplit);
dispatchLowWaterMarkEvent(context, snapshotSplit, lowWatermark);
if (snapshotPhaseHooks.getPostLowWatermarkAction() != null) {
snapshotPhaseHooks.getPostLowWatermarkAction().accept(sourceConfig, snapshotSplit);
}
LOG.info("Snapshot step 2 - Snapshotting data");
executeDataSnapshot(context);
if (snapshotPhaseHooks.getPreHighWatermarkAction() != null) {
snapshotPhaseHooks.getPreHighWatermarkAction().accept(sourceConfig, snapshotSplit);
}
Offset highWatermark = dialect.displayCurrentOffset(sourceConfig);
LOG.info(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
snapshotSplit);
dispatchHighWaterMarkEvent(context, snapshotSplit, highWatermark);
if (snapshotPhaseHooks.getPostHighWatermarkAction() != null) {
snapshotPhaseHooks.getPostHighWatermarkAction().accept(sourceConfig, snapshotSplit);
}
// optimization that skips the stream read when the low watermark equals the high watermark
final StreamSplit backfillStreamSplit =
@ -182,4 +196,9 @@ public abstract class AbstractScanFetchTask implements FetchTask {
public void close() {
taskRunning = false;
}
@VisibleForTesting
public void setSnapshotPhaseHooks(SnapshotPhaseHooks snapshotPhaseHooks) {
this.snapshotPhaseHooks = snapshotPhaseHooks;
}
}
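The call sites above fix the hook ordering: pre-low watermark, low watermark, post-low watermark, data snapshot, pre-high watermark, high watermark, post-high watermark, then the optional backfill read. A hedged sketch (hypothetical class, logging only) that registers all four hooks to make this ordering visible in a test log:

import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical sketch: observe the hook ordering enforced by AbstractScanFetchTask#execute. */
public class HookOrderingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(HookOrderingSketch.class);

    public static SnapshotPhaseHooks loggingHooks() {
        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        hooks.setPreLowWatermarkAction((config, split) -> LOG.info("before low watermark of {}", split));
        hooks.setPostLowWatermarkAction((config, split) -> LOG.info("after low watermark of {}", split));
        hooks.setPreHighWatermarkAction((config, split) -> LOG.info("before high watermark of {}", split));
        hooks.setPostHighWatermarkAction((config, split) -> LOG.info("after high watermark of {}", split));
        return hooks;
    }
}

The resulting hooks object can be installed on the source via setSnapshotHooks or directly on a fetch task via setSnapshotPhaseHooks, as the split reader above now does.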

@ -0,0 +1,37 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.base.source.utils.hooks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.util.function.BiConsumerWithException;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import java.io.Serializable;
import java.sql.SQLException;
/**
* Hook to be invoked during different stages in the snapshot phase.
*
* <p>Please note that implementations should be serializable in order to be used in integration
* tests, as the hook needs to be serialized together with the source on job submission.
*/
@Internal
@FunctionalInterface
public interface SnapshotPhaseHook
extends BiConsumerWithException<SourceConfig, SourceSplit, SQLException>, Serializable {}
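Since the Javadoc notes that hooks are serialized together with the source, a hook used in an integration test should carry only serializable state. A hedged sketch under that assumption (hypothetical class; it presumes a suitable JDBC driver for the database under test is on the classpath):

import org.apache.flink.api.connector.source.SourceSplit;

import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Hypothetical hook: carries only serializable state (JDBC URL, credentials, DML statements) so it
 * can be shipped with the source on job submission, and opens its own connection when invoked.
 */
public class ChangeGeneratingHook implements SnapshotPhaseHook {

    private final String jdbcUrl;
    private final String user;
    private final String password;
    private final String[] statements;

    public ChangeGeneratingHook(String jdbcUrl, String user, String password, String... statements) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
        this.statements = statements;
    }

    @Override
    public void accept(SourceConfig sourceConfig, SourceSplit split) throws SQLException {
        try (Connection connection = DriverManager.getConnection(jdbcUrl, user, password);
                Statement statement = connection.createStatement()) {
            for (String sql : statements) {
                statement.execute(sql);
            }
        }
    }
}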

@ -0,0 +1,80 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.base.source.utils.hooks;
import javax.annotation.Nullable;
import java.io.Serializable;
/**
* A container class for hooks applied in the snapshot phase, including:
*
* <ul>
* <li>{@link #preHighWatermarkAction}: Hook to run before emitting the high watermark, which is
* for testing whether binlog events created within the snapshot phase are backfilled correctly.
* <li>{@link #postHighWatermarkAction}: Hook to run after emitting the high watermark, which is
* for testing the handling of binlog events that occur between snapshot splits.
* </ul>
*/
public class SnapshotPhaseHooks implements Serializable {
private static final long serialVersionUID = 1L;
private SnapshotPhaseHook preLowWatermarkAction;
private SnapshotPhaseHook postLowWatermarkAction;
private SnapshotPhaseHook preHighWatermarkAction;
private SnapshotPhaseHook postHighWatermarkAction;
public void setPreHighWatermarkAction(SnapshotPhaseHook preHighWatermarkAction) {
this.preHighWatermarkAction = preHighWatermarkAction;
}
public void setPostHighWatermarkAction(SnapshotPhaseHook postHighWatermarkAction) {
this.postHighWatermarkAction = postHighWatermarkAction;
}
public void setPreLowWatermarkAction(SnapshotPhaseHook preLowWatermarkAction) {
this.preLowWatermarkAction = preLowWatermarkAction;
}
public void setPostLowWatermarkAction(SnapshotPhaseHook postLowWatermarkAction) {
this.postLowWatermarkAction = postLowWatermarkAction;
}
@Nullable
public SnapshotPhaseHook getPreHighWatermarkAction() {
return preHighWatermarkAction;
}
@Nullable
public SnapshotPhaseHook getPostHighWatermarkAction() {
return postHighWatermarkAction;
}
@Nullable
public SnapshotPhaseHook getPreLowWatermarkAction() {
return preLowWatermarkAction;
}
@Nullable
public SnapshotPhaseHook getPostLowWatermarkAction() {
return postLowWatermarkAction;
}
public static SnapshotPhaseHooks empty() {
return new SnapshotPhaseHooks();
}
}

@ -23,7 +23,6 @@ import com.ververica.cdc.common.annotation.Experimental;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import com.ververica.cdc.connectors.base.experimental.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.base.experimental.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.base.experimental.fetch.MySqlScanFetchTask;
import com.ververica.cdc.connectors.base.experimental.fetch.MySqlSourceFetchTaskContext;
import com.ververica.cdc.connectors.base.experimental.fetch.MySqlStreamFetchTask;
@ -59,13 +58,11 @@ public class MySqlDialect implements JdbcDataSourceDialect {
private static final String QUOTED_CHARACTER = "`";
private static final long serialVersionUID = 1L;
private final MySqlSourceConfigFactory configFactory;
private final MySqlSourceConfig sourceConfig;
private transient MySqlSchema mySqlSchema;
public MySqlDialect(MySqlSourceConfigFactory configFactory) {
this.configFactory = configFactory;
this.sourceConfig = configFactory.create(0);
public MySqlDialect(MySqlSourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
}
@Override

@ -235,7 +235,7 @@ public class MySqlSourceBuilder<T> {
*/
public MySqlIncrementalSource<T> build() {
this.offsetFactory = new BinlogOffsetFactory();
this.dialect = new MySqlDialect(configFactory);
this.dialect = new MySqlDialect(configFactory.create(0));
return new MySqlIncrementalSource<>(
configFactory, checkNotNull(deserializer), offsetFactory, dialect);
}

@ -22,6 +22,7 @@ import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.SampleBucketSplitStrategy;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.ShardedSplitStrategy;
@ -114,7 +115,8 @@ public class MongoDBSnapshotSplitReaderTest extends MongoDBSourceTestBase {
assertTrue(snapshotSplits.size() > 0);
IncrementalSourceSplitReader<MongoDBSourceConfig> snapshotSplitReader =
new IncrementalSourceSplitReader<>(0, dialect, sourceConfig);
new IncrementalSourceSplitReader<>(
0, dialect, sourceConfig, SnapshotPhaseHooks.empty());
int retry = 0;
long actualCount = 0;

@ -24,6 +24,7 @@ import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
@ -118,7 +119,8 @@ public class MongoDBStreamSplitReaderTest extends MongoDBSourceTestBase {
@Test
public void testStreamSplitReader() throws Exception {
IncrementalSourceSplitReader<MongoDBSourceConfig> streamSplitReader =
new IncrementalSourceSplitReader<>(0, dialect, sourceConfig);
new IncrementalSourceSplitReader<>(
0, dialect, sourceConfig, SnapshotPhaseHooks.empty());
try {
ChangeStreamOffset startOffset = new ChangeStreamOffset(startupResumeToken);

@ -31,6 +31,7 @@ import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlConnectorConfig;
@ -78,6 +79,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReader.class);
private final StatefulTaskContext statefulTaskContext;
private final ExecutorService executorService;
private final SnapshotPhaseHooks hooks;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile boolean currentTaskRunning;
@ -92,7 +94,8 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
private static final long READER_CLOSE_TIMEOUT = 30L;
public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
public SnapshotSplitReader(
StatefulTaskContext statefulTaskContext, int subtaskId, SnapshotPhaseHooks hooks) {
this.statefulTaskContext = statefulTaskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder()
@ -101,11 +104,16 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
(thread, throwable) -> setReadException(throwable))
.build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.hooks = hooks;
this.currentTaskRunning = false;
this.hasNextElement = new AtomicBoolean(false);
this.reachEnd = new AtomicBoolean(false);
}
public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
this(statefulTaskContext, subtaskId, SnapshotPhaseHooks.empty());
}
@Override
public void submitSplit(MySqlSplit mySqlSplit) {
this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
@ -124,7 +132,8 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
statefulTaskContext.getTopicSelector(),
statefulTaskContext.getSnapshotReceiver(),
StatefulTaskContext.getClock(),
currentSnapshotSplit);
currentSnapshotSplit,
hooks);
executorService.execute(
() -> {
try {

@ -22,6 +22,7 @@ import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.utils.StatementUtils;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
@ -78,6 +79,8 @@ public class MySqlSnapshotSplitReadTask
private final EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver;
private final SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics;
private final SnapshotPhaseHooks hooks;
public MySqlSnapshotSplitReadTask(
MySqlConnectorConfig connectorConfig,
SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics,
@ -87,7 +90,8 @@ public class MySqlSnapshotSplitReadTask
TopicSelector<TableId> topicSelector,
EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver,
Clock clock,
MySqlSnapshotSplit snapshotSplit) {
MySqlSnapshotSplit snapshotSplit,
SnapshotPhaseHooks hooks) {
super(connectorConfig, snapshotChangeEventSourceMetrics);
this.connectorConfig = connectorConfig;
this.databaseSchema = databaseSchema;
@ -98,6 +102,7 @@ public class MySqlSnapshotSplitReadTask
this.topicSelector = topicSelector;
this.snapshotReceiver = snapshotReceiver;
this.snapshotChangeEventSourceMetrics = snapshotChangeEventSourceMetrics;
this.hooks = hooks;
}
@Override
@ -139,6 +144,9 @@ public class MySqlSnapshotSplitReadTask
topicSelector.topicNameFor(snapshotSplit.getTableId()),
dispatcher.getQueue());
if (hooks.getPreLowWatermarkAction() != null) {
hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit);
}
final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 1 - Determining low watermark {} for split {}",
@ -149,9 +157,17 @@ public class MySqlSnapshotSplitReadTask
signalEventDispatcher.dispatchWatermarkEvent(
snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);
if (hooks.getPostLowWatermarkAction() != null) {
hooks.getPostLowWatermarkAction().accept(jdbcConnection, snapshotSplit);
}
LOG.info("Snapshot step 2 - Snapshotting data");
createDataEvents(ctx, snapshotSplit.getTableId());
if (hooks.getPreHighWatermarkAction() != null) {
hooks.getPreHighWatermarkAction().accept(jdbcConnection, snapshotSplit);
}
final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);
LOG.info(
"Snapshot step 3 - Determining high watermark {} for split {}",
@ -162,6 +178,9 @@ public class MySqlSnapshotSplitReadTask
((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))
.setHighWatermark(highWatermark);
if (hooks.getPostHighWatermarkAction() != null) {
hooks.getPostHighWatermarkAction().accept(jdbcConnection, snapshotSplit);
}
return SnapshotResult.completed(ctx.offset);
}

@ -32,6 +32,7 @@ import org.apache.flink.util.FlinkRuntimeException;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.annotation.VisibleForTesting;
import com.ververica.cdc.connectors.mysql.MySqlValidator;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
@ -52,6 +53,7 @@ import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.mysql.table.StartupMode;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.jdbc.JdbcConnection;
@ -101,6 +103,12 @@ public class MySqlSource<T>
private final MySqlSourceConfigFactory configFactory;
private final DebeziumDeserializationSchema<T> deserializationSchema;
// Actions to perform during the snapshot phase.
// This field is introduced for testing purposes, for example testing whether changes made in the
// snapshot phase are correctly backfilled into the snapshot by registering a pre-high-watermark
// hook that generates changes.
private SnapshotPhaseHooks snapshotHooks = SnapshotPhaseHooks.empty();
/**
* Get a MySqlParallelSourceBuilder to build a {@link MySqlSource}.
*
@ -150,7 +158,8 @@ public class MySqlSource<T>
new MySqlSplitReader(
sourceConfig,
readerContext.getIndexOfSubtask(),
mySqlSourceReaderContext);
mySqlSourceReaderContext,
snapshotHooks);
return new MySqlSourceReader<>(
elementsQueue,
splitReaderSupplier,
@ -230,4 +239,9 @@ public class MySqlSource<T>
public TypeInformation<T> getProducedType() {
return deserializationSchema.getProducedType();
}
@VisibleForTesting
public void setSnapshotHooks(SnapshotPhaseHooks snapshotHooks) {
this.snapshotHooks = snapshotHooks;
}
}
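Unlike the base variant, the MySQL-flavored hook receives the reader's MySqlConnection, so a test can issue DML without opening its own connection. A minimal sketch (hypothetical helper; the DML statements come from the test):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;

/** Hypothetical test helper: have the reader itself run DML just before the high watermark. */
public class MySqlSnapshotHookTestUtil {

    public static <T> void runDmlBeforeHighWatermark(MySqlSource<T> source, String... statements) {
        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        // The MySQL hook hands over the reader's MySqlConnection, so the test can reuse it directly.
        hooks.setPreHighWatermarkAction(
                (connection, split) -> {
                    connection.execute(statements);
                    connection.commit();
                });
        source.setSnapshotHooks(hooks);
    }
}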

@ -33,6 +33,7 @@ import com.ververica.cdc.connectors.mysql.source.split.MySqlRecords;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import io.debezium.connector.mysql.MySqlConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -59,18 +60,24 @@ public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit>
private final int subtaskId;
private final MySqlSourceReaderContext context;
private final SnapshotPhaseHooks snapshotHooks;
@Nullable private String currentSplitId;
@Nullable private DebeziumReader<SourceRecords, MySqlSplit> currentReader;
@Nullable private SnapshotSplitReader reusedSnapshotReader;
@Nullable private BinlogSplitReader reusedBinlogReader;
public MySqlSplitReader(
MySqlSourceConfig sourceConfig, int subtaskId, MySqlSourceReaderContext context) {
MySqlSourceConfig sourceConfig,
int subtaskId,
MySqlSourceReaderContext context,
SnapshotPhaseHooks snapshotHooks) {
this.sourceConfig = sourceConfig;
this.subtaskId = subtaskId;
this.snapshotSplits = new ArrayDeque<>();
this.binlogSplits = new ArrayDeque<>(1);
this.context = context;
this.snapshotHooks = snapshotHooks;
}
@Override
@ -231,7 +238,8 @@ public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit>
createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
reusedSnapshotReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
reusedSnapshotReader =
new SnapshotSplitReader(statefulTaskContext, subtaskId, snapshotHooks);
}
return reusedSnapshotReader;
}

@ -0,0 +1,37 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.utils.hooks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.util.function.BiConsumerWithException;
import io.debezium.connector.mysql.MySqlConnection;
import java.io.Serializable;
import java.sql.SQLException;
/**
* Hook to be invoked during different stages in the snapshot phase.
*
* <p>Please note that implementations should be serializable in order to be used in integration
* tests, as the hook needs to be serialized together with the source on job submission.
*/
@Internal
@FunctionalInterface
public interface SnapshotPhaseHook
extends BiConsumerWithException<MySqlConnection, SourceSplit, SQLException>, Serializable {}

@ -0,0 +1,80 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.utils.hooks;
import javax.annotation.Nullable;
import java.io.Serializable;
/**
* A container class for hooks applied in the snapshot phase, including:
*
* <ul>
* <li>{@link #preHighWatermarkAction}: Hook to run before emitting the high watermark, which is
* for testing whether binlog events created within the snapshot phase are backfilled correctly.
* <li>{@link #postHighWatermarkAction}: Hook to run after emitting the high watermark, which is
* for testing the handling of binlog events that occur between snapshot splits.
* </ul>
*/
public class SnapshotPhaseHooks implements Serializable {
private static final long serialVersionUID = 1L;
private SnapshotPhaseHook preLowWatermarkAction;
private SnapshotPhaseHook postLowWatermarkAction;
private SnapshotPhaseHook preHighWatermarkAction;
private SnapshotPhaseHook postHighWatermarkAction;
public void setPreHighWatermarkAction(SnapshotPhaseHook preHighWatermarkAction) {
this.preHighWatermarkAction = preHighWatermarkAction;
}
public void setPostHighWatermarkAction(SnapshotPhaseHook postHighWatermarkAction) {
this.postHighWatermarkAction = postHighWatermarkAction;
}
public void setPreLowWatermarkAction(SnapshotPhaseHook preLowWatermarkAction) {
this.preLowWatermarkAction = preLowWatermarkAction;
}
public void setPostLowWatermarkAction(SnapshotPhaseHook postLowWatermarkAction) {
this.postLowWatermarkAction = postLowWatermarkAction;
}
@Nullable
public SnapshotPhaseHook getPreHighWatermarkAction() {
return preHighWatermarkAction;
}
@Nullable
public SnapshotPhaseHook getPostHighWatermarkAction() {
return postHighWatermarkAction;
}
@Nullable
public SnapshotPhaseHook getPreLowWatermarkAction() {
return preLowWatermarkAction;
}
@Nullable
public SnapshotPhaseHook getPostLowWatermarkAction() {
return postLowWatermarkAction;
}
public static SnapshotPhaseHooks empty() {
return new SnapshotPhaseHooks();
}
}

@ -30,18 +30,12 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.data.Envelope;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionSchema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -53,7 +47,6 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotNull;
@ -304,11 +297,14 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
};
StatefulTaskContext statefulTaskContext =
new MakeBinlogEventTaskContext(
sourceConfig,
binaryLogClient,
mySqlConnection,
() -> executeSql(sourceConfig, changingDataSql));
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
snapshotHooks.setPreHighWatermarkAction(
(mySqlConnection, split) -> {
mySqlConnection.execute(changingDataSql);
mySqlConnection.commit();
});
final DataType dataType =
DataTypes.ROW(
@ -334,7 +330,11 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
List<String> actual =
readTableSnapshotSplits(
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
mySqlSplits,
statefulTaskContext,
mySqlSplits.size(),
dataType,
snapshotHooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@ -357,11 +357,13 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
};
StatefulTaskContext statefulTaskContext =
new MakeBinlogEventTaskContext(
sourceConfig,
binaryLogClient,
mySqlConnection,
() -> executeSql(sourceConfig, insertDataSql));
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
snapshotHooks.setPostLowWatermarkAction(
(mySqlConnection, split) -> {
mySqlConnection.execute(insertDataSql);
mySqlConnection.commit();
});
final DataType dataType =
DataTypes.ROW(
@ -389,7 +391,11 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
List<String> actual =
readTableSnapshotSplits(
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
mySqlSplits,
statefulTaskContext,
mySqlSplits.size(),
dataType,
snapshotHooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
executeSql(sourceConfig, recoveryDataSql);
}
@ -413,11 +419,13 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
};
StatefulTaskContext statefulTaskContext =
new MakeBinlogEventTaskContext(
sourceConfig,
binaryLogClient,
mySqlConnection,
() -> executeSql(sourceConfig, deleteDataSql));
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
snapshotHooks.setPreHighWatermarkAction(
(mySqlConnection, split) -> {
mySqlConnection.execute(deleteDataSql);
mySqlConnection.commit();
});
final DataType dataType =
DataTypes.ROW(
@ -441,7 +449,11 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
List<String> actual =
readTableSnapshotSplits(
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType);
mySqlSplits,
statefulTaskContext,
mySqlSplits.size(),
dataType,
snapshotHooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
executeSql(sourceConfig, recoveryDataSql);
}
@ -450,9 +462,11 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
List<MySqlSplit> mySqlSplits,
StatefulTaskContext statefulTaskContext,
int scanSplitsNum,
DataType dataType)
DataType dataType,
SnapshotPhaseHooks snapshotHooks)
throws Exception {
SnapshotSplitReader snapshotSplitReader = new SnapshotSplitReader(statefulTaskContext, 0);
SnapshotSplitReader snapshotSplitReader =
new SnapshotSplitReader(statefulTaskContext, 0, snapshotHooks);
List<SourceRecord> result = new ArrayList<>();
for (int i = 0; i < scanSplitsNum; i++) {
@ -477,6 +491,20 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
return formatResult(result, dataType);
}
private List<String> readTableSnapshotSplits(
List<MySqlSplit> mySqlSplits,
StatefulTaskContext statefulTaskContext,
int scanSplitsNum,
DataType dataType)
throws Exception {
return readTableSnapshotSplits(
mySqlSplits,
statefulTaskContext,
scanSplitsNum,
dataType,
SnapshotPhaseHooks.empty());
}
private List<String> formatResult(List<SourceRecord> records, DataType dataType) {
final RecordsFormatter formatter = new RecordsFormatter(dataType);
return formatter.format(records);
@ -540,47 +568,4 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
}
return true;
}
static class MakeBinlogEventTaskContext extends StatefulTaskContext {
private final Supplier<Boolean> makeBinlogFunction;
public MakeBinlogEventTaskContext(
MySqlSourceConfig sourceConfig,
BinaryLogClient binaryLogClient,
MySqlConnection connection,
Supplier<Boolean> makeBinlogFunction) {
super(sourceConfig, binaryLogClient, connection);
this.makeBinlogFunction = makeBinlogFunction;
}
@Override
public EventDispatcher.SnapshotReceiver<MySqlPartition> getSnapshotReceiver() {
EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver =
super.getSnapshotReceiver();
return new EventDispatcher.SnapshotReceiver<MySqlPartition>() {
@Override
public void changeRecord(
MySqlPartition partition,
DataCollectionSchema schema,
Envelope.Operation operation,
Object key,
Struct value,
OffsetContext offset,
ConnectHeaders headers)
throws InterruptedException {
snapshotReceiver.changeRecord(
partition, schema, operation, key, value, offset, headers);
}
@Override
public void completeSnapshot() throws InterruptedException {
snapshotReceiver.completeSnapshot();
// make binlog events
makeBinlogFunction.get();
}
};
}
}
}

@ -49,6 +49,7 @@ import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
@ -375,7 +376,7 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
final CountDownLatch updatingExecuted = new CountDownLatch(1);
TestingReaderContext testingReaderContext = new TestingReaderContext();
MySqlSourceReader<SourceRecord> reader =
createReader(sourceConfig, testingReaderContext, 0);
createReader(sourceConfig, testingReaderContext, 0, SnapshotPhaseHooks.empty());
reader.start();
Thread updateWorker =
@ -440,11 +441,15 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
private MySqlSourceReader<SourceRecord> createReader(MySqlSourceConfig configuration, int limit)
throws Exception {
return createReader(configuration, new TestingReaderContext(), limit);
return createReader(
configuration, new TestingReaderContext(), limit, SnapshotPhaseHooks.empty());
}
private MySqlSourceReader<SourceRecord> createReader(
MySqlSourceConfig configuration, SourceReaderContext readerContext, int limit)
MySqlSourceConfig configuration,
SourceReaderContext readerContext,
int limit,
SnapshotPhaseHooks snapshotHooks)
throws Exception {
final FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
new FutureCompletingBlockingQueue<>();
@ -467,7 +472,7 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
new MySqlSourceReaderContext(readerContext);
return new MySqlSourceReader<>(
elementsQueue,
() -> createSplitReader(configuration, mySqlSourceReaderContext),
() -> createSplitReader(configuration, mySqlSourceReaderContext, snapshotHooks),
recordEmitter,
readerContext.getConfiguration(),
mySqlSourceReaderContext,
@ -475,8 +480,10 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
}
private MySqlSplitReader createSplitReader(
MySqlSourceConfig configuration, MySqlSourceReaderContext readerContext) {
return new MySqlSplitReader(configuration, 0, readerContext);
MySqlSourceConfig configuration,
MySqlSourceReaderContext readerContext,
SnapshotPhaseHooks snapshotHooks) {
return new MySqlSplitReader(configuration, 0, readerContext, snapshotHooks);
}
private void makeBinlogEventsInOneTransaction(MySqlSourceConfig sourceConfig, String tableId)

@ -28,7 +28,6 @@ import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask;
@ -52,15 +51,8 @@ import static com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionU
public class OracleDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;
private final OracleSourceConfigFactory configFactory;
private final OracleSourceConfig sourceConfig;
private transient OracleSchema oracleSchema;
public OracleDialect(OracleSourceConfigFactory configFactory) {
this.configFactory = configFactory;
this.sourceConfig = configFactory.create(0);
}
@Override
public String getName() {
return "Oracle";

@ -233,7 +233,7 @@ public class OracleSourceBuilder<T> {
*/
public OracleIncrementalSource<T> build() {
this.offsetFactory = new RedoLogOffsetFactory();
this.dialect = new OracleDialect(configFactory);
this.dialect = new OracleDialect();
return new OracleIncrementalSource<T>(
configFactory, checkNotNull(deserializer), offsetFactory, dialect);
}

@ -28,7 +28,6 @@ import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import com.ververica.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask;
import com.ververica.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
import com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask;
@ -70,8 +69,8 @@ public class PostgresDialect implements JdbcDataSourceDialect {
@Nullable private PostgresStreamFetchTask streamFetchTask;
public PostgresDialect(PostgresSourceConfigFactory configFactory) {
this.sourceConfig = configFactory.create(0);
public PostgresDialect(PostgresSourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
}
@Override

@ -254,7 +254,7 @@ public class PostgresSourceBuilder<T> {
*/
public PostgresIncrementalSource<T> build() {
PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
PostgresDialect dialect = new PostgresDialect(configFactory);
PostgresDialect dialect = new PostgresDialect(configFactory.create(0));
return new PostgresIncrementalSource<>(
configFactory, checkNotNull(deserializer), offsetFactory, dialect);
}

@ -20,6 +20,8 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import com.ververica.cdc.connectors.postgres.source.PostgresConnectionPoolFactory;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import com.ververica.cdc.connectors.postgres.testutils.UniqueDatabase;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
@ -48,7 +50,10 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Basic class for testing PostgreSQL source, this contains a PostgreSQL container which enables
@ -187,4 +192,32 @@ public abstract class PostgresTestBase extends AbstractTestBase {
}
}
}
protected PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
UniqueDatabase database, String schemaName, String tableName, int splitSize) {
PostgresSourceConfigFactory postgresSourceConfigFactory = new PostgresSourceConfigFactory();
postgresSourceConfigFactory.hostname(database.getHost());
postgresSourceConfigFactory.port(database.getDatabasePort());
postgresSourceConfigFactory.username(database.getUsername());
postgresSourceConfigFactory.password(database.getPassword());
postgresSourceConfigFactory.database(database.getDatabaseName());
postgresSourceConfigFactory.schemaList(new String[] {schemaName});
postgresSourceConfigFactory.tableList(schemaName + "." + tableName);
postgresSourceConfigFactory.splitSize(splitSize);
return postgresSourceConfigFactory;
}
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
expected.stream().sorted().collect(Collectors.toList()),
actual.stream().sorted().collect(Collectors.toList()));
}
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
}
}

@ -54,16 +54,9 @@ public class PostgresDialectTest extends PostgresTestBase {
// get table named 'customer.customers' from customDatabase, which actually exists in
// inventoryDatabase
PostgresSourceConfigFactory configFactoryOfCustomDatabase =
getMockPostgresSourceConfigFactory(
customDatabase.getHost(),
customDatabase.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
"customer",
"customers");
getMockPostgresSourceConfigFactory(customDatabase, "customer", "customers", 10);
PostgresDialect dialectOfcustomDatabase =
new PostgresDialect(configFactoryOfCustomDatabase);
new PostgresDialect(configFactoryOfCustomDatabase.create(0));
List<TableId> tableIdsOfcustomDatabase =
dialectOfcustomDatabase.discoverDataCollections(
configFactoryOfCustomDatabase.create(0));
@ -73,16 +66,9 @@ public class PostgresDialectTest extends PostgresTestBase {
// inventoryDatabase
// however, nothing is found
PostgresSourceConfigFactory configFactoryOfInventoryDatabase =
getMockPostgresSourceConfigFactory(
inventoryDatabase.getHost(),
inventoryDatabase.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"inventory",
"products");
getMockPostgresSourceConfigFactory(inventoryDatabase, "inventory", "products", 10);
PostgresDialect dialectOfInventoryDatabase =
new PostgresDialect(configFactoryOfInventoryDatabase);
new PostgresDialect(configFactoryOfInventoryDatabase.create(0));
List<TableId> tableIdsOfInventoryDatabase =
dialectOfInventoryDatabase.discoverDataCollections(
configFactoryOfInventoryDatabase.create(0));
@ -92,38 +78,12 @@ public class PostgresDialectTest extends PostgresTestBase {
// customDatabase
// however, something is found
PostgresSourceConfigFactory configFactoryOfInventoryDatabase2 =
getMockPostgresSourceConfigFactory(
inventoryDatabase.getHost(),
inventoryDatabase.getDatabasePort(),
inventoryDatabase.getUsername(),
inventoryDatabase.getPassword(),
inventoryDatabase.getDatabaseName(),
"customer",
"customers");
getMockPostgresSourceConfigFactory(inventoryDatabase, "customer", "customers", 10);
PostgresDialect dialectOfInventoryDatabase2 =
new PostgresDialect(configFactoryOfInventoryDatabase2);
new PostgresDialect(configFactoryOfInventoryDatabase2.create(0));
List<TableId> tableIdsOfInventoryDatabase2 =
dialectOfInventoryDatabase2.discoverDataCollections(
configFactoryOfInventoryDatabase2.create(0));
Assert.assertTrue(tableIdsOfInventoryDatabase2.isEmpty());
}
private static PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
String hostname,
int port,
String username,
String password,
String database,
String schemaName,
String tableName) {
PostgresSourceConfigFactory postgresSourceConfigFactory = new PostgresSourceConfigFactory();
postgresSourceConfigFactory.hostname(hostname);
postgresSourceConfigFactory.port(port);
postgresSourceConfigFactory.username(username);
postgresSourceConfigFactory.password(password);
postgresSourceConfigFactory.database(database);
postgresSourceConfigFactory.schemaList(new String[] {schemaName});
postgresSourceConfigFactory.tableList(schemaName + "." + tableName);
return postgresSourceConfigFactory;
}
}

@ -52,9 +52,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT;
/** Tests for Postgres Source based on the incremental snapshot framework. */
@ -258,19 +255,6 @@ public class PostgresSourceExampleTest extends PostgresTestBase {
return connection;
}
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
expected.stream().sorted().collect(Collectors.toList()),
actual.stream().sorted().collect(Collectors.toList()));
}
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
}
private void makeWalEvents(PostgresConnection connection, String tableId) throws SQLException {
waitForReplicationSlotReady(connection);

@ -54,13 +54,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** IT tests for {@link PostgresSourceBuilder.PostgresIncrementalSource}. */
@ -660,19 +657,6 @@ public class PostgresSourceITCase extends PostgresTestBase {
miniCluster.startTaskManager();
}
public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
expected.stream().sorted().collect(Collectors.toList()),
actual.stream().sorted().collect(Collectors.toList()));
}
public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEquals(expected.size(), actual.size());
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
}
private void waitUntilJobRunning(TableResult tableResult)
throws InterruptedException, ExecutionException {
do {

@ -0,0 +1,248 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres.source.fetch;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.postgres.PostgresTestBase;
import com.ververica.cdc.connectors.postgres.source.PostgresDialect;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import com.ververica.cdc.connectors.postgres.testutils.RecordsFormatter;
import com.ververica.cdc.connectors.postgres.testutils.UniqueDatabase;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.relational.TableId;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** Tests for {@link PostgresScanFetchTask}. */
public class PostgresScanFetchTaskTest extends PostgresTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static String schemaName = "customer";
private static String tableName = "customers";
private final UniqueDatabase customDatabase =
new UniqueDatabase(
POSTGRES_CONTAINER,
"postgres",
"customer",
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());
@Test
public void testChangingDataInSnapshotScan() throws Exception {
customDatabase.createAndInitialize();
String tableId = schemaName + "." + tableName;
String[] changingDataSql =
new String[] {
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
"DELETE FROM " + tableId + " where id = 102",
"INSERT INTO " + tableId + " VALUES(102, 'user_2','hangzhou','123567891234')",
"UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110",
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111",
};
String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, hangzhou, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Hangzhou, 123567891234]",
"+I[111, user_6, Hangzhou, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
};
List<String> actual =
getDataInSnapshotScan(
changingDataSql, schemaName, tableName, USE_PRE_HIGHWATERMARK_HOOK);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@Test
public void testInsertDataInSnapshotScan() throws Exception {
customDatabase.createAndInitialize();
String tableId = schemaName + "." + tableName;
String[] insertDataSql =
new String[] {
"INSERT INTO " + tableId + " VALUES(112, 'user_12','Shanghai','123567891234')",
"INSERT INTO " + tableId + " VALUES(113, 'user_13','Shanghai','123567891234')",
};
String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[112, user_12, Shanghai, 123567891234]",
"+I[113, user_13, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
};
List<String> actual =
getDataInSnapshotScan(
insertDataSql, schemaName, tableName, USE_POST_LOWWATERMARK_HOOK);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@Test
public void testDeleteDataInSnapshotScan() throws Exception {
customDatabase.createAndInitialize();
String tableId = schemaName + "." + tableName;
String[] deleteDataSql =
new String[] {
"DELETE FROM " + tableId + " where id = 101",
"DELETE FROM " + tableId + " where id = 102",
};
String[] expected =
new String[] {
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
};
List<String> actual =
getDataInSnapshotScan(
deleteDataSql, schemaName, tableName, USE_PRE_HIGHWATERMARK_HOOK);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
private List<String> getDataInSnapshotScan(
String[] changingDataSql, String schemaName, String tableName, int hookType)
throws Exception {
PostgresSourceConfigFactory sourceConfigFactory =
getMockPostgresSourceConfigFactory(customDatabase, schemaName, tableName, 10);
PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0));
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
try (PostgresConnection postgresConnection = postgresDialect.openJdbcConnection()) {
SnapshotPhaseHook snapshotPhaseHook =
(postgresSourceConfig, split) -> {
postgresConnection.execute(changingDataSql);
postgresConnection.commit();
};
if (hookType == USE_POST_LOWWATERMARK_HOOK) {
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
} else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
}
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, postgresDialect);
PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect);
List<String> actual =
readTableSnapshotSplits(
snapshotSplits, postgresSourceFetchTaskContext, 1, dataType, hooks);
return actual;
}
}
private List<String> readTableSnapshotSplits(
List<SnapshotSplit> snapshotSplits,
PostgresSourceFetchTaskContext taskContext,
int scanSplitsNum,
DataType dataType,
SnapshotPhaseHooks snapshotPhaseHooks)
throws Exception {
IncrementalSourceScanFetcher sourceScanFetcher =
new IncrementalSourceScanFetcher(taskContext, 0);
List<SourceRecord> result = new ArrayList<>();
for (int i = 0; i < scanSplitsNum; i++) {
SnapshotSplit sqlSplit = snapshotSplits.get(i);
if (sourceScanFetcher.isFinished()) {
FetchTask<SourceSplitBase> fetchTask =
taskContext.getDataSourceDialect().createFetchTask(sqlSplit);
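// Wire the hooks into the scan task so they fire at the configured watermark points of this split.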
((AbstractScanFetchTask) fetchTask).setSnapshotPhaseHooks(snapshotPhaseHooks);
sourceScanFetcher.submitTask(fetchTask);
}
Iterator<SourceRecords> res;
while ((res = sourceScanFetcher.pollSplitRecords()) != null) {
while (res.hasNext()) {
SourceRecords sourceRecords = res.next();
result.addAll(sourceRecords.getSourceRecordList());
}
}
}
sourceScanFetcher.close();
assertNotNull(sourceScanFetcher.getExecutorService());
assertTrue(sourceScanFetcher.getExecutorService().isTerminated());
return formatResult(result, dataType);
}
private List<String> formatResult(List<SourceRecord> records, DataType dataType) {
final RecordsFormatter formatter = new RecordsFormatter(dataType);
return formatter.format(records);
}
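// Discovers the captured tables and splits each one into snapshot chunks.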
private List<SnapshotSplit> getSnapshotSplits(
PostgresSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect) {
List<TableId> discoverTables = sourceDialect.discoverDataCollections(sourceConfig);
final ChunkSplitter chunkSplitter = sourceDialect.createChunkSplitter(sourceConfig);
List<SnapshotSplit> snapshotSplitList = new ArrayList<>();
for (TableId table : discoverTables) {
Collection<SnapshotSplit> snapshotSplits = chunkSplitter.generateSplits(table);
snapshotSplitList.addAll(snapshotSplits);
}
return snapshotSplitList;
}
}

@ -0,0 +1,100 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.postgres.testutils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import com.ververica.cdc.connectors.base.utils.SourceRecordUtils;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.kafka.connect.source.SourceRecord;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/** Formatter that formats the {@link org.apache.kafka.connect.source.SourceRecord} to String. */
public class RecordsFormatter {
private final DataType dataType;
private final ZoneId zoneId;
private TypeInformation<RowData> typeInfo;
private DebeziumDeserializationSchema<RowData> deserializationSchema;
private SimpleCollector collector;
private RowRowConverter rowRowConverter;
public RecordsFormatter(DataType dataType) {
this(dataType, ZoneId.of("UTC"));
}
public RecordsFormatter(DataType dataType, ZoneId zoneId) {
this.dataType = dataType;
this.zoneId = zoneId;
this.typeInfo =
(TypeInformation<RowData>) TypeConversions.fromDataTypeToLegacyInfo(dataType);
this.deserializationSchema =
RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType((RowType) dataType.getLogicalType())
.setResultTypeInfo(typeInfo)
.build();
this.collector = new SimpleCollector();
this.rowRowConverter = RowRowConverter.create(dataType);
rowRowConverter.open(Thread.currentThread().getContextClassLoader());
}
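// Deserializes each data-change SourceRecord into RowData, converts it to an external Row, and renders it as a String for easy comparison in assertions.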
public List<String> format(List<SourceRecord> records) {
records.stream()
// Keep DataChangeEvent only
.filter(SourceRecordUtils::isDataChangeRecord)
.forEach(
r -> {
try {
deserializationSchema.deserialize(r, collector);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
return collector.list.stream()
.map(rowRowConverter::toExternal)
.map(Row::toString)
.collect(Collectors.toList());
}
private static class SimpleCollector implements Collector<RowData> {
private List<RowData> list = new ArrayList<>();
@Override
public void collect(RowData record) {
list.add(record);
}
@Override
public void close() {
// do nothing
}
}
}

@ -219,7 +219,7 @@ public class SqlServerSourceBuilder<T> {
*/
public SqlServerIncrementalSource<T> build() {
this.offsetFactory = new LsnFactory();
this.dialect = new SqlServerDialect(configFactory);
this.dialect = new SqlServerDialect(configFactory.create(0));
return new SqlServerIncrementalSource<T>(
configFactory, checkNotNull(deserializer), offsetFactory, dialect);
}

@ -27,7 +27,6 @@ import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.sqlserver.source.config.SqlServerSourceConfig;
import com.ververica.cdc.connectors.sqlserver.source.config.SqlServerSourceConfigFactory;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask;
@ -53,8 +52,8 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
private final SqlServerSourceConfig sourceConfig;
private transient SqlServerSchema sqlserverSchema;
public SqlServerDialect(SqlServerSourceConfigFactory configFactory) {
this.sourceConfig = configFactory.create(0);
public SqlServerDialect(SqlServerSourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
}
@Override

@ -19,12 +19,15 @@ package com.ververica.cdc.connectors.sqlserver.source.read.fetch;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
import com.ververica.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceTestBase;
import com.ververica.cdc.connectors.sqlserver.source.config.SqlServerSourceConfig;
import com.ververica.cdc.connectors.sqlserver.source.config.SqlServerSourceConfigFactory;
@ -32,16 +35,8 @@ import com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerDialect;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext;
import com.ververica.cdc.connectors.sqlserver.testutils.RecordsFormatter;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerPartition;
import io.debezium.data.Envelope;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionSchema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
@ -51,7 +46,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
@ -72,7 +66,7 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
SqlServerSourceConfigFactory sourceConfigFactory =
getConfigFactory(databaseName, new String[] {tableName}, 10);
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory.create(0));
String tableId = databaseName + "." + tableName;
String[] changingDataSql =
@ -85,13 +79,15 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
"UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111",
};
MakeChangeEventTaskContext makeChangeEventTaskContext =
new MakeChangeEventTaskContext(
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
hooks.setPostHighWatermarkAction(
(dialect, split) -> executeSql(sourceConfig, changingDataSql));
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
new SqlServerSourceFetchTaskContext(
sourceConfig,
sqlServerDialect,
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
() -> executeSql(sourceConfig, changingDataSql));
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
final DataType dataType =
DataTypes.ROW(
@ -115,7 +111,8 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
};
List<String> actual =
readTableSnapshotSplits(snapshotSplits, makeChangeEventTaskContext, 1, dataType);
readTableSnapshotSplits(
snapshotSplits, sqlServerSourceFetchTaskContext, 1, dataType, hooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@ -129,7 +126,7 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
SqlServerSourceConfigFactory sourceConfigFactory =
getConfigFactory(databaseName, new String[] {tableName}, 10);
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory.create(0));
String tableId = databaseName + "." + tableName;
String[] insertDataSql =
@ -138,13 +135,15 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
"INSERT INTO " + tableId + " VALUES(113, 'user_13','Shanghai','123567891234')",
};
MakeChangeEventTaskContext makeChangeEventTaskContext =
new MakeChangeEventTaskContext(
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
hooks.setPostHighWatermarkAction(
(dialect, split) -> executeSql(sourceConfig, insertDataSql));
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
new SqlServerSourceFetchTaskContext(
sourceConfig,
sqlServerDialect,
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
() -> executeSql(sourceConfig, insertDataSql));
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
final DataType dataType =
DataTypes.ROW(
@ -170,7 +169,8 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
};
List<String> actual =
readTableSnapshotSplits(snapshotSplits, makeChangeEventTaskContext, 1, dataType);
readTableSnapshotSplits(
snapshotSplits, sqlServerSourceFetchTaskContext, 1, dataType, hooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@ -184,7 +184,7 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
SqlServerSourceConfigFactory sourceConfigFactory =
getConfigFactory(databaseName, new String[] {tableName}, 10);
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfigFactory.create(0));
String tableId = databaseName + "." + tableName;
String[] deleteDataSql =
@ -193,13 +193,15 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
"DELETE FROM " + tableId + " where id = 102",
};
MakeChangeEventTaskContext makeChangeEventTaskContext =
new MakeChangeEventTaskContext(
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
hooks.setPostLowWatermarkAction(
(dialect, split) -> executeSql(sourceConfig, deleteDataSql));
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
new SqlServerSourceFetchTaskContext(
sourceConfig,
sqlServerDialect,
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
() -> executeSql(sourceConfig, deleteDataSql));
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
final DataType dataType =
DataTypes.ROW(
@ -221,7 +223,8 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
};
List<String> actual =
readTableSnapshotSplits(snapshotSplits, makeChangeEventTaskContext, 1, dataType);
readTableSnapshotSplits(
snapshotSplits, sqlServerSourceFetchTaskContext, 1, dataType, hooks);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
@ -231,6 +234,17 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
int scanSplitsNum,
DataType dataType)
throws Exception {
return readTableSnapshotSplits(
snapshotSplits, taskContext, scanSplitsNum, dataType, SnapshotPhaseHooks.empty());
}
private List<String> readTableSnapshotSplits(
List<SnapshotSplit> snapshotSplits,
SqlServerSourceFetchTaskContext taskContext,
int scanSplitsNum,
DataType dataType,
SnapshotPhaseHooks snapshotPhaseHooks)
throws Exception {
IncrementalSourceScanFetcher sourceScanFetcher =
new IncrementalSourceScanFetcher(taskContext, 0);
@ -238,8 +252,10 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
for (int i = 0; i < scanSplitsNum; i++) {
SnapshotSplit sqlSplit = snapshotSplits.get(i);
if (sourceScanFetcher.isFinished()) {
sourceScanFetcher.submitTask(
taskContext.getDataSourceDialect().createFetchTask(sqlSplit));
FetchTask<SourceSplitBase> fetchTask =
taskContext.getDataSourceDialect().createFetchTask(sqlSplit);
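// Attach the hooks to the scan task before submitting it to the fetcher.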
((AbstractScanFetchTask) fetchTask).setSnapshotPhaseHooks(snapshotPhaseHooks);
sourceScanFetcher.submitTask(fetchTask);
}
Iterator<SourceRecords> res;
while ((res = sourceScanFetcher.pollSplitRecords()) != null) {
@ -305,49 +321,4 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
}
return true;
}
static class MakeChangeEventTaskContext extends SqlServerSourceFetchTaskContext {
private final Supplier<Boolean> makeChangeEventFunction;
public MakeChangeEventTaskContext(
JdbcSourceConfig jdbcSourceConfig,
SqlServerDialect sqlServerDialect,
SqlServerConnection connection,
SqlServerConnection metaDataConnection,
Supplier<Boolean> makeChangeEventFunction) {
super(jdbcSourceConfig, sqlServerDialect, connection, metaDataConnection);
this.makeChangeEventFunction = makeChangeEventFunction;
}
@Override
public EventDispatcher.SnapshotReceiver<SqlServerPartition> getSnapshotReceiver() {
EventDispatcher.SnapshotReceiver<SqlServerPartition> snapshotReceiver =
super.getSnapshotReceiver();
return new EventDispatcher.SnapshotReceiver<SqlServerPartition>() {
@Override
public void changeRecord(
SqlServerPartition partition,
DataCollectionSchema schema,
Envelope.Operation operation,
Object key,
Struct value,
OffsetContext offset,
ConnectHeaders headers)
throws InterruptedException {
snapshotReceiver.changeRecord(
partition, schema, operation, key, value, offset, headers);
}
@Override
public void completeSnapshot() throws InterruptedException {
snapshotReceiver.completeSnapshot();
// make change events
makeChangeEventFunction.get();
Thread.sleep(10 * 1000);
}
};
}
}
}
