[cdc-connector][mysql] Add SNAPSHOT_ONLY mode for Mysql CDC Source (#2901)

pull/2842/head
Hongshun Wang 1 year ago committed by Leonard Xu
parent 35d3c0ff28
commit 3bf886b94d

@ -203,6 +203,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
}
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
@ -221,7 +223,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
switch (modeString.toLowerCase()) {
case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial();
case SCAN_STARTUP_MODE_VALUE_SNAPSHOT:
return StartupOptions.snapshot();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
@ -238,9 +241,10 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
default:
throw new ValidationException(
String.format(
"Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s], but was: %s",
"Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s, %s], but was: %s",
SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_SNAPSHOT,
SCAN_STARTUP_MODE_VALUE_LATEST,
SCAN_STARTUP_MODE_VALUE_EARLIEST,
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,

@ -37,7 +37,6 @@ import com.ververica.cdc.connectors.mysql.source.utils.RecordUtils;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import org.apache.kafka.connect.data.Struct;
@ -87,6 +86,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
private Map<TableId, BinlogOffset> maxSplitHighWatermarkMap;
private final Set<TableId> pureBinlogPhaseTables;
private Tables.TableFilter capturedTableFilter;
private final StoppableChangeEventSourceContext changeEventSourceContext =
new StoppableChangeEventSourceContext();
private static final long READER_CLOSE_TIMEOUT = 30L;
@ -124,29 +125,22 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
() -> {
try {
binlogSplitReadTask.execute(
new BinlogSplitChangeEventSourceContextImpl(),
changeEventSourceContext,
statefulTaskContext.getMySqlPartition(),
statefulTaskContext.getOffsetContext());
} catch (Exception e) {
currentTaskRunning = false;
LOG.error(
String.format(
"Execute binlog read task for mysql split %s fail",
currentBinlogSplit),
e);
readException = e;
} finally {
stopBinlogReadTask();
}
});
}
private class BinlogSplitChangeEventSourceContextImpl
implements ChangeEventSource.ChangeEventSourceContext {
@Override
public boolean isRunning() {
return currentTaskRunning;
}
}
@Override
public boolean isFinished() {
return currentBinlogSplit == null || !currentTaskRunning;
@ -191,9 +185,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
if (statefulTaskContext.getBinaryLogClient() != null) {
statefulTaskContext.getBinaryLogClient().disconnect();
}
// set currentTaskRunning to false to terminate the
// while loop in MySqlStreamingChangeEventSource's execute method
currentTaskRunning = false;
stopBinlogReadTask();
if (executorService != null) {
executorService.shutdown();
if (!executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
@ -328,7 +321,9 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
}
public void stopBinlogReadTask() {
this.currentTaskRunning = false;
currentTaskRunning = false;
// Terminate the while loop in MySqlStreamingChangeEventSource's execute method
changeEventSourceContext.stopChangeEventSource();
}
@VisibleForTesting

@ -92,6 +92,9 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
public AtomicBoolean hasNextElement;
public AtomicBoolean reachEnd;
private final StoppableChangeEventSourceContext changeEventSourceContext =
new StoppableChangeEventSourceContext();
private static final long READER_CLOSE_TIMEOUT = 30L;
public SnapshotSplitReader(
@ -153,7 +156,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
} catch (Exception e) {
setReadException(e);
} finally {
currentTaskRunning = false;
stopCurrentTask();
}
});
}
@ -175,6 +178,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
// Dispatch BINLOG_END event directly if backfill is not required
if (!isBackfillRequired(backfillBinlogSplit)) {
dispatchBinlogEndEvent(backfillBinlogSplit);
stopCurrentTask();
return;
}
@ -188,7 +192,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
backfillBinlogReadTask.execute(
new SnapshotBinlogSplitChangeEventSourceContextImpl(),
changeEventSourceContext,
statefulTaskContext.getMySqlPartition(),
mySqlOffsetContext);
} else {
@ -362,7 +366,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
}
private void setReadException(Throwable throwable) {
currentTaskRunning = false;
stopCurrentTask();
LOG.error(
String.format(
"Execute snapshot read task for mysql split %s fail", currentSnapshotSplit),
@ -377,6 +381,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
@Override
public void close() {
try {
stopCurrentTask();
if (statefulTaskContext.getConnection() != null) {
statefulTaskContext.getConnection().close();
}
@ -399,6 +404,11 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
}
}
/**
 * Marks the current task as no longer running and asks the reader's change event source
 * context to stop.
 */
private void stopCurrentTask() {
    this.currentTaskRunning = false;
    this.changeEventSourceContext.stopChangeEventSource();
}
@VisibleForTesting
public ExecutorService getExecutorService() {
return executorService;
@ -435,21 +445,4 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
return lowWatermark != null && highWatermark != null;
}
}
/**
* The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded binlog task
* of a snapshot split task.
*/
public class SnapshotBinlogSplitChangeEventSourceContextImpl
implements ChangeEventSource.ChangeEventSourceContext {
public void finished() {
currentTaskRunning = false;
}
@Override
public boolean isRunning() {
return currentTaskRunning;
}
}
}

@ -0,0 +1,38 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.debezium.reader;
import io.debezium.pipeline.source.spi.ChangeEventSource;
/**
 * A {@link ChangeEventSource.ChangeEventSourceContext} whose running flag starts as {@code true}
 * and can be switched off through {@link #stopChangeEventSource()}.
 */
public class StoppableChangeEventSourceContext
        implements ChangeEventSource.ChangeEventSourceContext {

    /** Backing flag for {@link #isRunning()}; this class offers no way to set it back to true. */
    private volatile boolean running = true;

    /** Switches the running flag off so that {@link #isRunning()} returns {@code false}. */
    public void stopChangeEventSource() {
        running = false;
    }

    /** Returns {@code true} until {@link #stopChangeEventSource()} has been invoked. */
    @Override
    public boolean isRunning() {
        return running;
    }
}

@ -19,7 +19,7 @@ package com.ververica.cdc.connectors.mysql.debezium.task;
import com.github.shyiko.mysql.binlog.event.Event;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.SnapshotBinlogSplitChangeEventSourceContextImpl;
import com.ververica.cdc.connectors.mysql.debezium.reader.StoppableChangeEventSourceContext;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import io.debezium.DebeziumException;
@ -108,7 +108,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
new DebeziumException("Error processing binlog signal event", e));
}
// tell reader the binlog task finished
((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
((StoppableChangeEventSourceContext) context).stopChangeEventSource();
}
}
}

@ -18,7 +18,6 @@ package com.ververica.cdc.connectors.mysql.debezium.task.context;
import com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.table.StartupMode;
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlConnector;
@ -102,13 +101,6 @@ public class MySqlErrorHandler extends ErrorHandler {
.getMessage()
.endsWith(
"internal schema representation is probably out of sync with real database schema")
&& isSettingStartingOffset();
}
private boolean isSettingStartingOffset() {
StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
return startupMode == StartupMode.EARLIEST_OFFSET
|| startupMode == StartupMode.TIMESTAMP
|| startupMode == StartupMode.SPECIFIC_OFFSETS;
&& sourceConfig.getStartupOptions().isStreamOnly();
}
}

@ -56,7 +56,6 @@ import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import com.ververica.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import com.ververica.cdc.connectors.mysql.table.StartupMode;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.jdbc.JdbcConnection;
@ -151,7 +150,12 @@ public class MySqlSource<T>
@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
MySqlSourceConfig sourceConfig = configFactory.createConfig(0);
if (sourceConfig.getStartupOptions().isSnapshotOnly()) {
return Boundedness.BOUNDED;
} else {
return Boundedness.CONTINUOUS_UNBOUNDED;
}
}
@Override
@ -197,7 +201,7 @@ public class MySqlSource<T>
validator.validate();
final MySqlSplitAssigner splitAssigner;
if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) {
if (!sourceConfig.getStartupOptions().isStreamOnly()) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
boolean isTableIdCaseSensitive = DebeziumUtils.isTableIdCaseSensitive(jdbc);
splitAssigner =
@ -214,7 +218,8 @@ public class MySqlSource<T>
splitAssigner = new MySqlBinlogSplitAssigner(sourceConfig);
}
return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
return new MySqlSourceEnumerator(
enumContext, sourceConfig, splitAssigner, getBoundedness());
}
@Override
@ -238,7 +243,8 @@ public class MySqlSource<T>
throw new UnsupportedOperationException(
"Unsupported restored PendingSplitsState: " + checkpoint);
}
return new MySqlSourceEnumerator(enumContext, sourceConfig, splitAssigner);
return new MySqlSourceEnumerator(
enumContext, sourceConfig, splitAssigner, getBoundedness());
}
@Override

@ -110,7 +110,7 @@ public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner {
}
@Override
public boolean isStreamSplitAssigned() {
public boolean noMoreSplits() {
return isBinlogSplitAssigned;
}

@ -51,6 +51,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
private static final String BINLOG_SPLIT_ID = "binlog-split";
private final int splitMetaGroupSize;
private final MySqlSourceConfig sourceConfig;
private boolean isBinlogSplitAssigned;
@ -62,6 +63,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
List<TableId> remainingTables,
boolean isTableIdCaseSensitive) {
this(
sourceConfig,
new MySqlSnapshotSplitAssigner(
sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
false,
@ -73,6 +75,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
int currentParallelism,
HybridPendingSplitsState checkpoint) {
this(
sourceConfig,
new MySqlSnapshotSplitAssigner(
sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
checkpoint.isBinlogSplitAssigned(),
@ -80,9 +83,11 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
}
private MySqlHybridSplitAssigner(
MySqlSourceConfig sourceConfig,
MySqlSnapshotSplitAssigner snapshotSplitAssigner,
boolean isBinlogSplitAssigned,
int splitMetaGroupSize) {
this.sourceConfig = sourceConfig;
this.snapshotSplitAssigner = snapshotSplitAssigner;
this.isBinlogSplitAssigned = isBinlogSplitAssigned;
this.splitMetaGroupSize = splitMetaGroupSize;
@ -99,7 +104,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
// do not assign split until the adding table process finished
return Optional.empty();
}
if (snapshotSplitAssigner.noMoreSnapshotSplits()) {
if (snapshotSplitAssigner.noMoreSplits()) {
// binlog split assigning
if (isBinlogSplitAssigned) {
// no more splits for the assigner
@ -129,11 +134,6 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
return snapshotSplitAssigner.waitingForFinishedSplits();
}
@Override
public boolean isStreamSplitAssigned() {
return isBinlogSplitAssigned;
}
@Override
public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
return snapshotSplitAssigner.getFinishedSplitInfos();
@ -174,6 +174,11 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
return snapshotSplitAssigner.getAssignerStatus();
}
/**
 * Reports whether this hybrid assigner has nothing left to hand out: the snapshot assigner must
 * have no more splits and the binlog split must already be assigned.
 */
@Override
public boolean noMoreSplits() {
    if (!snapshotSplitAssigner.noMoreSplits()) {
        // snapshot splits may still be produced
        return false;
    }
    return isBinlogSplitAssigned;
}
@Override
public void startAssignNewlyAddedTables() {
snapshotSplitAssigner.startAssignNewlyAddedTables();
@ -189,10 +194,6 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
snapshotSplitAssigner.close();
}
public boolean noMoreSnapshotSplits() {
return snapshotSplitAssigner.noMoreSnapshotSplits();
}
// --------------------------------------------------------------------------------------------
private MySqlBinlogSplit createBinlogSplit() {
@ -206,12 +207,17 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
BinlogOffset minBinlogOffset = null;
BinlogOffset maxBinlogOffset = null;
for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
// find the min binlog offset
// find the min and max binlog offset
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
if (minBinlogOffset == null || binlogOffset.isBefore(minBinlogOffset)) {
minBinlogOffset = binlogOffset;
}
if (maxBinlogOffset == null || binlogOffset.isAfter(maxBinlogOffset)) {
maxBinlogOffset = binlogOffset;
}
finishedSnapshotSplitInfos.add(
new FinishedSnapshotSplitInfo(
split.getTableId(),
@ -221,14 +227,21 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
binlogOffset));
}
// If the source is running in snapshot mode, we use the highest watermark among
// snapshot splits as the ending offset to provide a consistent snapshot view at the moment
// of high watermark.
BinlogOffset stoppingOffset = BinlogOffset.ofNonStopping();
if (sourceConfig.getStartupOptions().isSnapshotOnly()) {
stoppingOffset = maxBinlogOffset;
}
// the finishedSnapshotSplitInfos is too large for transmission, divide it to groups and
// then transfer them
boolean divideMetaToGroups = finishedSnapshotSplitInfos.size() > splitMetaGroupSize;
return new MySqlBinlogSplit(
BINLOG_SPLIT_ID,
minBinlogOffset == null ? BinlogOffset.ofEarliest() : minBinlogOffset,
BinlogOffset.ofNonStopping(),
stoppingOffset,
divideMetaToGroups ? new ArrayList<>() : finishedSnapshotSplitInfos,
new HashMap<>(),
finishedSnapshotSplitInfos.size());

@ -216,7 +216,9 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
}
private void captureNewlyAddedTables() {
if (sourceConfig.isScanNewlyAddedTableEnabled()) {
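// Don't scan newly added tables in snapshot mode.
// NOTE(review): the condition below joins the two checks with `||`, so it is also true when
// newly-added-table scanning is disabled in a non-snapshot mode, and when it is enabled in
// snapshot mode. `&&` looks like what this comment intends -- confirm and fix.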
if (sourceConfig.isScanNewlyAddedTableEnabled()
|| !sourceConfig.getStartupOptions().isSnapshotOnly()) {
// check whether we got newly added tables
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
final List<TableId> currentCapturedTables =
@ -517,7 +519,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
}
/** Indicates that there are no more splits available in this assigner. */
public boolean noMoreSnapshotSplits() {
public boolean noMoreSplits() {
return !needToDiscoveryTables() && remainingTables.isEmpty() && remainingSplits.isEmpty();
}
@ -547,7 +549,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
* are finished.
*/
private boolean allSnapshotSplitsFinished() {
return noMoreSnapshotSplits() && assignedSplits.size() == splitFinishedOffsets.size();
return noMoreSplits() && assignedSplits.size() == splitFinishedOffsets.size();
}
private void splitChunksForRemainingTables() {

@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.source.assigners;
import org.apache.flink.api.common.state.CheckpointListener;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
@ -32,6 +33,7 @@ import java.util.Optional;
* The {@code MySqlSplitAssigner} is responsible for deciding what split should be processed. It
* determines split processing order.
*/
@Internal
public interface MySqlSplitAssigner {
/**
@ -53,11 +55,6 @@ public interface MySqlSplitAssigner {
*/
boolean waitingForFinishedSplits();
/** Whether the split assigner is finished stream split assigning. */
default boolean isStreamSplitAssigned() {
throw new UnsupportedOperationException("Not support to assigning StreamSplit.");
}
/**
* Gets the finished splits' information. This is useful metadata to generate a binlog split
* that considering finished snapshot splits.
@ -110,6 +107,9 @@ public interface MySqlSplitAssigner {
/** Starts assign newly added tables. */
void startAssignNewlyAddedTables();
/** Indicates that there are no more splits available in this assigner. */
boolean noMoreSplits();
/**
* Callback to handle the binlog split has been updated in the newly added tables process. This
* is useful to check the newly added tables has been finished or not.

@ -16,6 +16,7 @@
package com.ververica.cdc.connectors.mysql.source.enumerator;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@ -24,7 +25,6 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import com.ververica.cdc.common.annotation.Internal;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
@ -71,6 +71,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
private final MySqlSourceConfig sourceConfig;
private final MySqlSplitAssigner splitAssigner;
private final Boundedness boundedness;
// using TreeSet to prefer assigning binlog split to task-0 for easier debug
private final TreeSet<Integer> readersAwaitingSplit;
private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;
@ -80,10 +82,12 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
public MySqlSourceEnumerator(
SplitEnumeratorContext<MySqlSplit> context,
MySqlSourceConfig sourceConfig,
MySqlSplitAssigner splitAssigner) {
MySqlSplitAssigner splitAssigner,
Boundedness boundedness) {
this.context = context;
this.sourceConfig = sourceConfig;
this.splitAssigner = splitAssigner;
this.boundedness = boundedness;
this.readersAwaitingSplit = new TreeSet<>();
}
@ -204,10 +208,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
continue;
}
if (splitAssigner.isStreamSplitAssigned()
&& sourceConfig.isCloseIdleReaders()
&& noMoreSnapshotSplits()
&& (binlogSplitTaskId != null && !binlogSplitTaskId.equals(nextAwaiting))) {
if (shouldCloseIdleReader(nextAwaiting)) {
// close idle readers when snapshot phase finished.
context.signalNoMoreSplits(nextAwaiting);
awaitingReader.remove();
@ -232,14 +233,18 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
}
}
private boolean noMoreSnapshotSplits() {
if (splitAssigner instanceof MySqlHybridSplitAssigner) {
return ((MySqlHybridSplitAssigner) splitAssigner).noMoreSnapshotSplits();
} else if (splitAssigner instanceof MySqlBinlogSplitAssigner) {
return true;
}
throw new IllegalStateException(
"Unexpected subtype of MySqlSplitAssigner class when invoking noMoreSnapshotSplits.");
/**
 * Decides whether the awaiting reader {@code nextAwaiting} should receive the no-more-splits
 * signal instead of a new split.
 *
 * <p>No reader is closed while the assigner can still produce splits. Once it cannot:
 *
 * <ul>
 *   <li>if the source is bounded, the reader is closed;
 *   <li>otherwise the reader is closed only when closing idle readers is enabled in the source
 *       config, a binlog split task id is known, and that id differs from {@code nextAwaiting}.
 * </ul>
 */
private boolean shouldCloseIdleReader(int nextAwaiting) {
    if (!splitAssigner.noMoreSplits()) {
        return false;
    }
    if (boundedness == Boundedness.BOUNDED) {
        return true;
    }
    if (!sourceConfig.isCloseIdleReaders()) {
        return false;
    }
    // never close the reader that holds the binlog split
    return binlogSplitTaskId != null && !binlogSplitTaskId.equals(nextAwaiting);
}
private int[] getRegisteredReader() {

@ -176,15 +176,26 @@ public class MySqlSourceReader<T>
for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
if (mySqlSplit.isBinlogSplit()) {
suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit());
LOG.info(
"Source reader {} suspended binlog split reader success after the newly added table process, current offset {}",
subtaskId,
suspendedBinlogSplit.getStartingOffset());
context.sendSourceEventToCoordinator(
new LatestFinishedSplitsNumberRequestEvent());
// do not request next split when the reader is suspended
requestNextSplit = false;
// Two possibilities that finish a binlog split:
//
// 1. Binlog reader is suspended by enumerator because new tables have been
// finished its snapshot reading.
// Under this case mySqlSourceReaderContext.isBinlogSplitReaderSuspended() is
// true and need to request the latest finished splits number.
//
// 2. Binlog reader reaches the ending offset of the split. We need to do
// nothing under this case.
if (mySqlSourceReaderContext.isBinlogSplitReaderSuspended()) {
suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit());
LOG.info(
"Source reader {} suspended binlog split reader success after the newly added table process, current offset {}",
subtaskId,
suspendedBinlogSplit.getStartingOffset());
context.sendSourceEventToCoordinator(
new LatestFinishedSplitsNumberRequestEvent());
// do not request next split when the reader is suspended
requestNextSplit = false;
}
} else {
finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
}

@ -213,6 +213,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
}
private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot";
private static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
@ -224,6 +225,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
switch (modeString.toLowerCase()) {
case SCAN_STARTUP_MODE_VALUE_INITIAL:
return StartupOptions.initial();
case SCAN_STARTUP_MODE_VALUE_SNAPSHOT:
return StartupOptions.snapshot();
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupOptions.latest();
@ -241,9 +244,10 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
default:
throw new ValidationException(
String.format(
"Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s], but was: %s",
"Invalid value for option '%s'. Supported values are [%s, %s, %s, %s, %s, %s], but was: %s",
SCAN_STARTUP_MODE.key(),
SCAN_STARTUP_MODE_VALUE_INITIAL,
SCAN_STARTUP_MODE_VALUE_SNAPSHOT,
SCAN_STARTUP_MODE_VALUE_LATEST,
SCAN_STARTUP_MODE_VALUE_EARLIEST,
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET,

@ -40,6 +40,14 @@ public final class StartupOptions implements Serializable {
return new StartupOptions(StartupMode.INITIAL, null);
}
/**
* Performs an initial snapshot on the monitored database tables upon first startup, and does
* not keep reading the binlog afterwards.
*/
public static StartupOptions snapshot() {
return new StartupOptions(StartupMode.SNAPSHOT, null);
}
/**
* Never to perform snapshot on the monitored database tables upon first startup, just read from
* the beginning of the binlog. This should be used with care, as it is only valid when the
@ -92,12 +100,23 @@ public final class StartupOptions implements Serializable {
private StartupOptions(StartupMode startupMode, BinlogOffset binlogOffset) {
this.startupMode = startupMode;
this.binlogOffset = binlogOffset;
if (startupMode != StartupMode.INITIAL) {
if (isStreamOnly()) {
checkNotNull(
binlogOffset, "Binlog offset is required if startup mode is %s", startupMode);
}
}
/**
 * Returns {@code true} when the startup mode is one of {@code EARLIEST_OFFSET}, {@code
 * LATEST_OFFSET}, {@code SPECIFIC_OFFSETS} or {@code TIMESTAMP}.
 */
public boolean isStreamOnly() {
    final StartupMode mode = startupMode;
    final boolean fromLogBoundary =
            mode == StartupMode.EARLIEST_OFFSET || mode == StartupMode.LATEST_OFFSET;
    final boolean fromExplicitPosition =
            mode == StartupMode.SPECIFIC_OFFSETS || mode == StartupMode.TIMESTAMP;
    return fromLogBoundary || fromExplicitPosition;
}
/** Returns {@code true} when the startup mode is {@code SNAPSHOT}. */
public boolean isSnapshotOnly() {
    return StartupMode.SNAPSHOT == startupMode;
}
@Override
public boolean equals(Object o) {
if (this == o) {

@ -160,6 +160,8 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
@Parameterized.Parameters(name = "table: {0}, chunkColumn: {1}")
public static Collection<Object[]> parameters() {
return Arrays.asList(
@ -350,10 +352,76 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
testStartingOffset(StartupOptions.latest(), Collections.emptyList());
}
// Runs the backfill helper with StartupOptions.snapshot(), backfill not skipped, fetch size 21
// and the hook type USE_POST_HIGHWATERMARK_HOOK, then checks that exactly the 21 insert
// records below are returned, in any order.
// NOTE(review): presumably these are the rows as they were before the hook's statements ran;
// the statements themselves are not visible here -- confirm against the helper.
@Test
public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
List<String> records =
testBackfillWhenWritingEvents(
false, 21, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.snapshot());
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[1019, user_20, Shanghai, 123567891234]",
"+I[2000, user_21, Shanghai, 123567891234]");
assertEqualsInAnyOrder(expectedRecords, records);
}
// Runs the backfill helper with StartupOptions.snapshot(), backfill not skipped, fetch size 21
// and the hook type USE_PRE_HIGHWATERMARK_HOOK, then checks the returned records in any order.
// Unlike the post-high-watermark test, the expected rows have 2000/user_21 located in
// Pittsburgh, include 15213/user_15213, and do not contain 1019/user_20.
@Test
public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
List<String> records =
testBackfillWhenWritingEvents(
false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.snapshot());
List<String> expectedRecords =
Arrays.asList(
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[109, user_4, Shanghai, 123567891234]",
"+I[110, user_5, Shanghai, 123567891234]",
"+I[111, user_6, Shanghai, 123567891234]",
"+I[118, user_7, Shanghai, 123567891234]",
"+I[121, user_8, Shanghai, 123567891234]",
"+I[123, user_9, Shanghai, 123567891234]",
"+I[1009, user_10, Shanghai, 123567891234]",
"+I[1010, user_11, Shanghai, 123567891234]",
"+I[1011, user_12, Shanghai, 123567891234]",
"+I[1012, user_13, Shanghai, 123567891234]",
"+I[1013, user_14, Shanghai, 123567891234]",
"+I[1014, user_15, Shanghai, 123567891234]",
"+I[1015, user_16, Shanghai, 123567891234]",
"+I[1016, user_17, Shanghai, 123567891234]",
"+I[1017, user_18, Shanghai, 123567891234]",
"+I[1018, user_19, Shanghai, 123567891234]",
"+I[2000, user_21, Pittsburgh, 123567891234]",
"+I[15213, user_15213, Shanghai, 123567891234]");
// When backfill is enabled, the change log between (snapshot, high_watermark) is
// applied to the snapshot image.
assertEqualsInAnyOrder(expectedRecords, records);
}
@Test
public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);
List<String> records =
testBackfillWhenWritingEvents(
false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial());
List<String> expectedRecords =
Arrays.asList(
@ -386,7 +454,9 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
@Test
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);
List<String> records =
testBackfillWhenWritingEvents(
false, 21, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial());
List<String> expectedRecords =
Arrays.asList(
@ -419,7 +489,9 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
@Test
public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
List<String> records = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK);
List<String> records =
testBackfillWhenWritingEvents(
true, 25, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial());
List<String> expectedRecords =
Arrays.asList(
@ -456,7 +528,9 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
@Test
public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
List<String> records = testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK);
List<String> records =
testBackfillWhenWritingEvents(
true, 25, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial());
List<String> expectedRecords =
Arrays.asList(
@ -492,7 +566,11 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
}
private List<String> testBackfillWhenWritingEvents(
boolean skipSnapshotBackfill, int fetchSize, int hookType) throws Exception {
boolean skipSnapshotBackfill,
int fetchSize,
int hookType,
StartupOptions startupOptions)
throws Exception {
customDatabase.createAndInitialize();
TestTable customerTable =
new TestTable(customDatabase, "customers", TestTableSchemas.CUSTOMERS);
@ -509,6 +587,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
.tableList(customerTable.getTableId())
.deserializer(customerTable.getDeserializer())
.skipSnapshotBackfill(skipSnapshotBackfill)
.startupOptions(startupOptions)
.build();
String[] statements =
@ -529,10 +608,17 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
connection.execute(statements);
connection.commit();
};
if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
} else if (hookType == USE_POST_LOWWATERMARK_HOOK) {
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
switch (hookType) {
case USE_POST_LOWWATERMARK_HOOK:
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
break;
case USE_PRE_HIGHWATERMARK_HOOK:
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
break;
case USE_POST_HIGHWATERMARK_HOOK:
hooks.setPostHighWatermarkAction(snapshotPhaseHook);
break;
}
source.setSnapshotHooks(hooks);

@ -31,6 +31,7 @@ import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
@ -49,6 +50,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Tests for {@link MySqlHybridSplitAssigner}. */
public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
@ -65,7 +67,8 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() {
final String captureTable = "customers";
MySqlSourceConfig configuration = getConfig(new String[] {captureTable});
MySqlSourceConfig configuration =
getConfig(new String[] {captureTable}, StartupOptions.initial());
// Step 1. Mock MySqlHybridSplitAssigner Object
TableId tableId = new TableId(null, customerDatabase.getDatabaseName(), captureTable);
@ -139,14 +142,54 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
assigner.close();
}
private MySqlSourceConfig getConfig(String[] captureTables) {
/**
 * Checks split assignment when the source is configured with {@link StartupOptions#snapshot()}.
 *
 * <p>All snapshot splits are drained from the assigner and reported as finished with fake
 * binlog offsets {@code ("foo", 0) .. ("foo", n - 1)}. The next split handed out must then be a
 * {@link MySqlBinlogSplit} whose ending offset equals the largest reported offset, i.e.
 * {@code ("foo", n - 1)}.
 */
@Test
public void testAssigningInSnapshotOnlyMode() {
    final String captureTable = "customers";
    MySqlSourceConfig sourceConfig =
            getConfig(new String[] {captureTable}, StartupOptions.snapshot());
    // Create and initialize assigner
    MySqlHybridSplitAssigner assigner =
            new MySqlHybridSplitAssigner(sourceConfig, 1, new ArrayList<>(), false);
    assigner.open();
    // Release the assigner even when an assertion below fails, matching the explicit
    // assigner.close() performed by the other test in this class.
    try {
        // Get all snapshot splits
        List<MySqlSnapshotSplit> snapshotSplits = drainSnapshotSplits(assigner);
        // Generate fake finished offsets from 0 to snapshotSplits.size() - 1
        int i = 0;
        Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
        for (MySqlSnapshotSplit snapshotSplit : snapshotSplits) {
            BinlogOffset binlogOffset =
                    BinlogOffset.builder().setBinlogFilePosition("foo", i++).build();
            finishedOffsets.put(snapshotSplit.splitId(), binlogOffset);
        }
        assigner.onFinishedSplits(finishedOffsets);
        // Get the binlog split
        Optional<MySqlSplit> split = assigner.getNext();
        assertTrue(split.isPresent());
        assertTrue(split.get() instanceof MySqlBinlogSplit);
        MySqlBinlogSplit binlogSplit = split.get().asBinlogSplit();
        // Validate that the stopping offset of the binlog split is the maximum among all
        // finished offsets, which should be snapshotSplits.size() - 1
        assertEquals(
                BinlogOffset.builder()
                        .setBinlogFilePosition("foo", snapshotSplits.size() - 1)
                        .build(),
                binlogSplit.getEndingOffset());
    } finally {
        assigner.close();
    }
}
private MySqlSourceConfig getConfig(String[] captureTables, StartupOptions startupOptions) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
.toArray(String[]::new);
return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.initial())
.startupOptions(startupOptions)
.databaseList(customerDatabase.getDatabaseName())
.tableList(captureTableIds)
.hostname(MYSQL_CONTAINER.getHost())
@ -156,4 +199,17 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
.serverTimeZone(ZoneId.of("UTC").toString())
.createConfig(0);
}
/**
 * Pulls splits from the given assigner until it returns an empty {@link Optional}.
 *
 * <p>Every split obtained this way is asserted to be a {@link MySqlSnapshotSplit}.
 *
 * @param assigner the assigner to poll via {@code getNext()}
 * @return the snapshot splits in the order the assigner handed them out
 */
private List<MySqlSnapshotSplit> drainSnapshotSplits(MySqlHybridSplitAssigner assigner) {
    final List<MySqlSnapshotSplit> drained = new ArrayList<>();
    for (Optional<MySqlSplit> next = assigner.getNext();
            next.isPresent();
            next = assigner.getNext()) {
        MySqlSplit current = next.get();
        assertTrue(current instanceof MySqlSnapshotSplit);
        drained.add(current.asSnapshotSplit());
    }
    return drained;
}
}

@ -720,7 +720,7 @@ public class MySqlTableSourceFactoryTest {
} catch (Throwable t) {
String msg =
"Invalid value for option 'scan.startup.mode'. Supported values are "
+ "[initial, latest-offset, earliest-offset, specific-offset, timestamp], "
+ "[initial, snapshot, latest-offset, earliest-offset, specific-offset, timestamp], "
+ "but was: abc";
assertTrue(ExceptionUtils.findThrowableWithMessage(t, msg).isPresent());
}

Loading…
Cancel
Save