[mysql] Optimize the newly added table process from read binlog blocking to read binlog un-blocking

pull/1935/head
Leonard Xu 2 years ago committed by Hang Ruan
parent 12f700b159
commit 4b5f941442

@ -91,7 +91,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) { public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
this.statefulTaskContext = statefulTaskContext; this.statefulTaskContext = statefulTaskContext;
ThreadFactory threadFactory = ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build(); new ThreadFactoryBuilder().setNameFormat("binlog-reader-" + subTaskId).build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory); this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = true; this.currentTaskRunning = true;
this.pureBinlogPhaseTables = new HashSet<>(); this.pureBinlogPhaseTables = new HashSet<>();

@ -94,7 +94,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) { public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
this.statefulTaskContext = statefulTaskContext; this.statefulTaskContext = statefulTaskContext;
ThreadFactory threadFactory = ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subtaskId).build(); new ThreadFactoryBuilder().setNameFormat("snapshot-reader-" + subtaskId).build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory); this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = false; this.currentTaskRunning = false;
this.hasNextElement = new AtomicBoolean(false); this.hasNextElement = new AtomicBoolean(false);

@ -35,13 +35,13 @@ import static java.lang.String.format;
* INITIAL_ASSIGNING_FINISHED(end) * INITIAL_ASSIGNING_FINISHED(end)
* | * |
* | * |
* suspend() // found newly added tables * startAssignNewlyTables() // found newly added tables, assign newly added tables
* | * |
* *
* SUSPENDED --- wakeup() -- NEWLY_ADDED_ASSIGNING --- onFinish() -- NEWLY_ADDED_ASSIGNING_FINISHED(end) * NEWLY_ADDED_ASSIGNING ---onFinish()-- NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED---onBinlogSplitUpdated()---> NEWLY_ADDED_ASSIGNING_FINISHED(end)
* | * |
* | | * | |
* |----------------- suspend() //found newly added tables -----------| * |--------------- startAssignNewlyTables() //found newly added tables,, assign newly added tables ---------------|
* </pre> * </pre>
*/ */
public enum AssignerStatus { public enum AssignerStatus {
@ -61,49 +61,52 @@ public enum AssignerStatus {
INITIAL_ASSIGNING_FINISHED(1) { INITIAL_ASSIGNING_FINISHED(1) {
@Override @Override
public AssignerStatus getNextStatus() { public AssignerStatus getNextStatus() {
return SUSPENDED; return NEWLY_ADDED_ASSIGNING;
} }
@Override @Override
public AssignerStatus suspend() { public AssignerStatus startAssignNewlyTables() {
LOG.info("Assigner status changes from INITIAL_ASSIGNING_FINISHED to SUSPENDED"); LOG.info(
"Assigner status changes from INITIAL_ASSIGNING_FINISHED to NEW_ADDED_ASSIGNING");
return this.getNextStatus(); return this.getNextStatus();
} }
}, },
SUSPENDED(2) { NEWLY_ADDED_ASSIGNING(2) {
@Override @Override
public AssignerStatus getNextStatus() { public AssignerStatus getNextStatus() {
return NEWLY_ADDED_ASSIGNING; return NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
} }
@Override @Override
public AssignerStatus wakeup() { public AssignerStatus onFinish() {
LOG.info("Assigner status changes from SUSPENDED to NEWLY_ADDED_ASSIGNING"); LOG.info(
"Assigner status changes from NEWLY_ADDED_ASSIGNING to NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED");
return this.getNextStatus(); return this.getNextStatus();
} }
}, },
NEWLY_ADDED_ASSIGNING(3) { NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED(3) {
@Override @Override
public AssignerStatus getNextStatus() { public AssignerStatus getNextStatus() {
return NEWLY_ADDED_ASSIGNING_FINISHED; return NEWLY_ADDED_ASSIGNING_FINISHED;
} }
@Override @Override
public AssignerStatus onFinish() { public AssignerStatus onBinlogSplitUpdated() {
LOG.info( LOG.info(
"Assigner status changes from NEWLY_ADDED_ASSIGNING to NEWLY_ADDED_ASSIGNING_FINISHED"); "Assigner status changes from NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED to NEWLY_ADDED_ASSIGNING_FINISHED");
return this.getNextStatus(); return this.getNextStatus();
} }
}, },
NEWLY_ADDED_ASSIGNING_FINISHED(4) { NEWLY_ADDED_ASSIGNING_FINISHED(4) {
@Override @Override
public AssignerStatus getNextStatus() { public AssignerStatus getNextStatus() {
return SUSPENDED; return NEWLY_ADDED_ASSIGNING;
} }
@Override @Override
public AssignerStatus suspend() { public AssignerStatus startAssignNewlyTables() {
LOG.info("Assigner status changes from NEWLY_ADDED_ASSIGNING_FINISHED to SUSPENDED"); LOG.info(
"Assigner status changes from NEWLY_ADDED_ASSIGNING_FINISHED to NEWLY_ADDED_ASSIGNING");
return this.getNextStatus(); return this.getNextStatus();
} }
}; };
@ -128,17 +131,17 @@ public enum AssignerStatus {
fromStatusCode(this.getStatusCode()))); fromStatusCode(this.getStatusCode())));
} }
public AssignerStatus suspend() { public AssignerStatus startAssignNewlyTables() {
throw new IllegalStateException( throw new IllegalStateException(
format( format(
"Invalid call, assigner under %s state can not call suspend()", "Invalid call, assigner under %s state can not call startAssignNewlyTables()",
fromStatusCode(this.getStatusCode()))); fromStatusCode(this.getStatusCode())));
} }
public AssignerStatus wakeup() { public AssignerStatus onBinlogSplitUpdated() {
throw new IllegalStateException( throw new IllegalStateException(
format( format(
"Invalid call, assigner under %s state can not call wakeup()", "Invalid call, assigner under %s state can not call onBinlogSplitUpdated()",
fromStatusCode(this.getStatusCode()))); fromStatusCode(this.getStatusCode())));
} }
@ -154,9 +157,9 @@ public enum AssignerStatus {
case 1: case 1:
return INITIAL_ASSIGNING_FINISHED; return INITIAL_ASSIGNING_FINISHED;
case 2: case 2:
return SUSPENDED;
case 3:
return NEWLY_ADDED_ASSIGNING; return NEWLY_ADDED_ASSIGNING;
case 3:
return NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
case 4: case 4:
return NEWLY_ADDED_ASSIGNING_FINISHED; return NEWLY_ADDED_ASSIGNING_FINISHED;
default: default:
@ -167,30 +170,21 @@ public enum AssignerStatus {
} }
} }
/** Returns whether the split assigner state is suspended. */
public static boolean isSuspended(AssignerStatus assignerStatus) {
return assignerStatus == SUSPENDED;
}
/** /**
* Returns whether the split assigner has assigned all snapshot splits, which indicates there is * Returns whether the split assigner has assigned all snapshot splits, which indicates there is
* no more splits and all records of splits have been completely processed in the pipeline. * no more snapshot splits and all records of splits have been completely processed in the
* pipeline.
*/ */
public static boolean isAssigningFinished(AssignerStatus assignerStatus) { public static boolean isSnapshotAssigningFinished(AssignerStatus assignerStatus) {
return assignerStatus == INITIAL_ASSIGNING_FINISHED return assignerStatus == INITIAL_ASSIGNING_FINISHED
|| assignerStatus == NEWLY_ADDED_ASSIGNING_FINISHED; || assignerStatus == NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
} }
/** Returns whether the split assigner is assigning snapshot splits. */ /** Returns whether the split assigner is assigning snapshot splits. */
public static boolean isAssigning(AssignerStatus assignerStatus) { public static boolean isAssigningSnapshotSplits(AssignerStatus assignerStatus) {
return assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING; return assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING;
} }
/** Returns whether the split assigner is assigning newly added snapshot splits. */
public static boolean isNewlyAddedAssigning(AssignerStatus assignerStatus) {
return assignerStatus == NEWLY_ADDED_ASSIGNING;
}
/** Returns whether the split assigner has finished its initial tables assignment. */ /** Returns whether the split assigner has finished its initial tables assignment. */
public static boolean isInitialAssigningFinished(AssignerStatus assignerStatus) { public static boolean isInitialAssigningFinished(AssignerStatus assignerStatus) {
return assignerStatus == INITIAL_ASSIGNING_FINISHED; return assignerStatus == INITIAL_ASSIGNING_FINISHED;
@ -200,4 +194,11 @@ public enum AssignerStatus {
public static boolean isNewlyAddedAssigningFinished(AssignerStatus assignerStatus) { public static boolean isNewlyAddedAssigningFinished(AssignerStatus assignerStatus) {
return assignerStatus == NEWLY_ADDED_ASSIGNING_FINISHED; return assignerStatus == NEWLY_ADDED_ASSIGNING_FINISHED;
} }
/**
* Returns whether the split assigner has finished its newly added snapshot splits assignment.
*/
public static boolean isNewlyAddedAssigningSnapshotFinished(AssignerStatus assignerStatus) {
return assignerStatus == NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
}
} }

@ -37,7 +37,7 @@ import java.util.Optional;
/** A {@link MySqlSplitAssigner} which only read binlog from current binlog position. */ /** A {@link MySqlSplitAssigner} which only read binlog from current binlog position. */
public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner { public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner {
private static final String BINLOG_SPLIT_ID = "binlog-split"; public static final String BINLOG_SPLIT_ID = "binlog-split";
private final MySqlSourceConfig sourceConfig; private final MySqlSourceConfig sourceConfig;
@ -110,10 +110,10 @@ public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner {
} }
@Override @Override
public void suspend() {} public void startAssignNewlyAddedTables() {}
@Override @Override
public void wakeup() {} public void onBinlogSplitUpdated() {}
@Override @Override
public void close() {} public void close() {}

@ -39,7 +39,7 @@ import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isInitialAssigningFinished; import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isInitialAssigningFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isNewlyAddedAssigningFinished; import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isNewlyAddedAssigningFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSuspended; import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isNewlyAddedAssigningSnapshotFinished;
/** /**
* A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key * A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key
@ -95,11 +95,11 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
@Override @Override
public Optional<MySqlSplit> getNext() { public Optional<MySqlSplit> getNext() {
if (isSuspended(getAssignerStatus())) { if (isNewlyAddedAssigningSnapshotFinished(getAssignerStatus())) {
// do not assign split until the assigner received SuspendBinlogReaderAckEvent // do not assign split until the adding table process finished
return Optional.empty(); return Optional.empty();
} }
if (snapshotSplitAssigner.noMoreSplits()) { if (snapshotSplitAssigner.noMoreSnapshotSplits()) {
// binlog split assigning // binlog split assigning
if (isBinlogSplitAssigned) { if (isBinlogSplitAssigned) {
// no more splits for the assigner // no more splits for the assigner
@ -170,13 +170,13 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
} }
@Override @Override
public void suspend() { public void startAssignNewlyAddedTables() {
snapshotSplitAssigner.suspend(); snapshotSplitAssigner.startAssignNewlyAddedTables();
} }
@Override @Override
public void wakeup() { public void onBinlogSplitUpdated() {
snapshotSplitAssigner.wakeup(); snapshotSplitAssigner.onBinlogSplitUpdated();
} }
@Override @Override

@ -56,8 +56,9 @@ import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables; import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection; import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isAssigningFinished; import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isAssigningSnapshotSplits;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSuspended; import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isNewlyAddedAssigningSnapshotFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSnapshotAssigningFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState.NO_SPLITTING_TABLE_STATE; import static com.ververica.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
/** /**
@ -181,7 +182,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
} }
// when restore the job from legacy savepoint, the legacy state may haven't snapshot // when restore the job from legacy savepoint, the legacy state may haven't snapshot
// remaining tables, discovery remaining table here // remaining tables, discovery remaining table here
else if (!isRemainingTablesCheckpointed && !isAssigningFinished(assignerStatus)) { else if (!isRemainingTablesCheckpointed && !isSnapshotAssigningFinished(assignerStatus)) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
final List<TableId> discoverTables = discoverCapturedTables(jdbc, sourceConfig); final List<TableId> discoverTables = discoverCapturedTables(jdbc, sourceConfig);
discoverTables.removeAll(alreadyProcessedTables); discoverTables.removeAll(alreadyProcessedTables);
@ -206,11 +207,11 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
// tables // tables
LOG.info("Found newly added tables, start capture newly added tables process"); LOG.info("Found newly added tables, start capture newly added tables process");
remainingTables.addAll(newlyAddedTables); remainingTables.addAll(newlyAddedTables);
if (isAssigningFinished(assignerStatus)) { if (isSnapshotAssigningFinished(assignerStatus)) {
// start the newly added tables process under binlog reading phase // start the newly added tables process under binlog reading phase
LOG.info( LOG.info(
"Found newly added tables, start capture newly added tables process under binlog reading phase"); "Found newly added tables, start capture newly added tables process under binlog reading phase");
this.suspend(); this.startAssignNewlyAddedTables();
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -305,7 +306,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
@Override @Override
public boolean waitingForFinishedSplits() { public boolean waitingForFinishedSplits() {
return !allSplitsFinished(); return !allSnapshotSplitsFinished();
} }
@Override @Override
@ -337,7 +338,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
@Override @Override
public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) { public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
this.splitFinishedOffsets.putAll(splitFinishedOffsets); this.splitFinishedOffsets.putAll(splitFinishedOffsets);
if (allSplitsFinished() && AssignerStatus.isAssigning(assignerStatus)) { if (allSnapshotSplitsFinished() && isAssigningSnapshotSplits(assignerStatus)) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need // Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and binlog split. // to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) { if (currentParallelism == 1) {
@ -378,11 +379,10 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
true, true,
chunkSplitter.snapshotState(checkpointId)); chunkSplitter.snapshotState(checkpointId));
// we need a complete checkpoint before mark this assigner to be finished, to wait for // we need a complete checkpoint before mark this assigner to be finished, to wait for
// all // all records of snapshot splits are completely processed
// records of snapshot splits are completely processed
if (checkpointIdToFinish == null if (checkpointIdToFinish == null
&& !isAssigningFinished(assignerStatus) && !isSnapshotAssigningFinished(assignerStatus)
&& allSplitsFinished()) { && allSnapshotSplitsFinished()) {
checkpointIdToFinish = checkpointId; checkpointIdToFinish = checkpointId;
} }
return state; return state;
@ -393,8 +393,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
// we have waited for at-least one complete checkpoint after all snapshot-splits are // we have waited for at-least one complete checkpoint after all snapshot-splits are
// finished, then we can mark snapshot assigner as finished. // finished, then we can mark snapshot assigner as finished.
if (checkpointIdToFinish != null if (checkpointIdToFinish != null
&& !isAssigningFinished(assignerStatus) && isAssigningSnapshotSplits(assignerStatus)
&& allSplitsFinished()) { && allSnapshotSplitsFinished()) {
if (checkpointId >= checkpointIdToFinish) { if (checkpointId >= checkpointIdToFinish) {
assignerStatus = assignerStatus.onFinish(); assignerStatus = assignerStatus.onFinish();
} }
@ -408,17 +408,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
} }
@Override @Override
public void suspend() { public void startAssignNewlyAddedTables() {
Preconditions.checkState( Preconditions.checkState(
isAssigningFinished(assignerStatus), "Invalid assigner status {}", assignerStatus); isSnapshotAssigningFinished(assignerStatus),
assignerStatus = assignerStatus.suspend(); "Invalid assigner status {}",
assignerStatus);
assignerStatus = assignerStatus.startAssignNewlyTables();
} }
@Override @Override
public void wakeup() { public void onBinlogSplitUpdated() {
Preconditions.checkState( Preconditions.checkState(
isSuspended(assignerStatus), "Invalid assigner status {}", assignerStatus); isNewlyAddedAssigningSnapshotFinished(assignerStatus),
assignerStatus = assignerStatus.wakeup(); "Invalid assigner status {}",
assignerStatus);
assignerStatus = assignerStatus.onBinlogSplitUpdated();
} }
@Override @Override
@ -457,7 +461,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
} }
/** Indicates there is no more splits available in this assigner. */ /** Indicates there is no more splits available in this assigner. */
public boolean noMoreSplits() { public boolean noMoreSnapshotSplits() {
return !needToDiscoveryTables() && remainingTables.isEmpty() && remainingSplits.isEmpty(); return !needToDiscoveryTables() && remainingTables.isEmpty() && remainingSplits.isEmpty();
} }
@ -486,8 +490,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
* Returns whether all splits are finished which means no more splits and all assigned splits * Returns whether all splits are finished which means no more splits and all assigned splits
* are finished. * are finished.
*/ */
private boolean allSplitsFinished() { private boolean allSnapshotSplitsFinished() {
return noMoreSplits() && assignedSplits.size() == splitFinishedOffsets.size(); return noMoreSnapshotSplits() && assignedSplits.size() == splitFinishedOffsets.size();
} }
private void splitChunksForRemainingTables() { private void splitChunksForRemainingTables() {

@ -54,7 +54,7 @@ public interface MySqlSplitAssigner {
boolean waitingForFinishedSplits(); boolean waitingForFinishedSplits();
/** /**
* Gets the finished splits information. This is useful meta data to generate a binlog split * Gets the finished splits' information. This is useful metadata to generate a binlog split
* that considering finished snapshot splits. * that considering finished snapshot splits.
*/ */
List<FinishedSnapshotSplitInfo> getFinishedSplitInfos(); List<FinishedSnapshotSplitInfo> getFinishedSplitInfos();
@ -102,14 +102,14 @@ public interface MySqlSplitAssigner {
/** Gets the split assigner status, see {@code AssignerStatus}. */ /** Gets the split assigner status, see {@code AssignerStatus}. */
AssignerStatus getAssignerStatus(); AssignerStatus getAssignerStatus();
/** Starts assign newly added tables. */
void startAssignNewlyAddedTables();
/** /**
* Suspends the assigner under {@link AssignerStatus#INITIAL_ASSIGNING_FINISHED} or {@link * Callback to handle the binlog split has been updated in the newly added tables process. This
* AssignerStatus#NEWLY_ADDED_ASSIGNING_FINISHED}. * is useful to check the newly added tables has been finished or not.
*/ */
void suspend(); void onBinlogSplitUpdated();
/** Wakes up the assigner under {@link AssignerStatus#SUSPENDED}. */
void wakeup();
/** /**
* Called to close the assigner, in case it holds on to any resources, like threads or network * Called to close the assigner, in case it holds on to any resources, like threads or network

@ -30,14 +30,13 @@ import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsSt
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent; import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent; import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent; import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeEvent; import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeRequestEvent; import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent;
import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
@ -54,9 +53,7 @@ import java.util.Optional;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isAssigning; import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isNewlyAddedAssigningSnapshotFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isAssigningFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSuspended;
/** /**
* A MySQL CDC source enumerator that enumerates receive the split request and assign the split to * A MySQL CDC source enumerator that enumerates receive the split request and assign the split to
@ -74,7 +71,6 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
// using TreeSet to prefer assigning binlog split to task-0 for easier debug // using TreeSet to prefer assigning binlog split to task-0 for easier debug
private final TreeSet<Integer> readersAwaitingSplit; private final TreeSet<Integer> readersAwaitingSplit;
private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta; private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;
private boolean binlogReaderIsSuspended = false;
public MySqlSourceEnumerator( public MySqlSourceEnumerator(
SplitEnumeratorContext<MySqlSplit> context, SplitEnumeratorContext<MySqlSplit> context,
@ -84,21 +80,12 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
this.sourceConfig = sourceConfig; this.sourceConfig = sourceConfig;
this.splitAssigner = splitAssigner; this.splitAssigner = splitAssigner;
this.readersAwaitingSplit = new TreeSet<>(); this.readersAwaitingSplit = new TreeSet<>();
// when restored from state, if the split assigner is assigning snapshot
// splits or has already assigned all splits, send wakeup event to
// SourceReader, SourceReader can omit the event based on its own status.
if (isAssigning(splitAssigner.getAssignerStatus())
|| isAssigningFinished(splitAssigner.getAssignerStatus())) {
binlogReaderIsSuspended = true;
}
} }
@Override @Override
public void start() { public void start() {
splitAssigner.open(); splitAssigner.open();
suspendBinlogReaderIfNeed(); requestBinlogSplitUpdateIfNeed();
wakeupBinlogReaderIfNeed();
this.context.callAsync( this.context.callAsync(
this::getRegisteredReader, this::getRegisteredReader,
this::syncWithReaders, this::syncWithReaders,
@ -125,10 +112,10 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
@Override @Override
public void addReader(int subtaskId) { public void addReader(int subtaskId) {
// send SuspendBinlogReaderEvent to source reader if the assigner's status is // send BinlogSplitUpdateRequestEvent to source reader after newly added table
// suspended // snapshot splits finished.
if (isSuspended(splitAssigner.getAssignerStatus())) { if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
context.sendEventToSourceReader(subtaskId, new SuspendBinlogReaderEvent()); context.sendEventToSourceReader(subtaskId, new BinlogSplitUpdateRequestEvent());
} }
} }
@ -144,8 +131,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
Map<String, BinlogOffset> finishedOffsets = reportEvent.getFinishedOffsets(); Map<String, BinlogOffset> finishedOffsets = reportEvent.getFinishedOffsets();
splitAssigner.onFinishedSplits(finishedOffsets); splitAssigner.onFinishedSplits(finishedOffsets);
requestBinlogSplitUpdateIfNeed();
wakeupBinlogReaderIfNeed();
// send acknowledge event // send acknowledge event
FinishedSnapshotSplitsAckEvent ackEvent = FinishedSnapshotSplitsAckEvent ackEvent =
@ -156,13 +142,16 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
"The enumerator receives request for binlog split meta from subtask {}.", "The enumerator receives request for binlog split meta from subtask {}.",
subtaskId); subtaskId);
sendBinlogMeta(subtaskId, (BinlogSplitMetaRequestEvent) sourceEvent); sendBinlogMeta(subtaskId, (BinlogSplitMetaRequestEvent) sourceEvent);
} else if (sourceEvent instanceof SuspendBinlogReaderAckEvent) { } else if (sourceEvent instanceof BinlogSplitUpdateAckEvent) {
LOG.info(
"The enumerator receives event that the binlog split has been updated from subtask {}. ",
subtaskId);
splitAssigner.onBinlogSplitUpdated();
} else if (sourceEvent instanceof LatestFinishedSplitsNumberRequestEvent) {
LOG.info( LOG.info(
"The enumerator receives event that the binlog split reader has been suspended from subtask {}. ", "The enumerator receives request from subtask {} for the latest finished splits number after added newly tables. ",
subtaskId); subtaskId);
handleSuspendBinlogReaderAckEvent(subtaskId); handleLatestFinishedSplitNumberRequest(subtaskId);
} else if (sourceEvent instanceof LatestFinishedSplitsSizeRequestEvent) {
handleLatestFinishedSplitSizeRequest(subtaskId);
} }
} }
@ -206,7 +195,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting); LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
} else { } else {
// there is no available splits by now, skip assigning // there is no available splits by now, skip assigning
wakeupBinlogReaderIfNeed(); requestBinlogSplitUpdateIfNeed();
break; break;
} }
} }
@ -232,27 +221,14 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
} }
} }
suspendBinlogReaderIfNeed(); requestBinlogSplitUpdateIfNeed();
wakeupBinlogReaderIfNeed();
} }
private void suspendBinlogReaderIfNeed() { private void requestBinlogSplitUpdateIfNeed() {
if (isSuspended(splitAssigner.getAssignerStatus())) { if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
for (int subtaskId : getRegisteredReader()) { for (int subtaskId : getRegisteredReader()) {
context.sendEventToSourceReader(subtaskId, new SuspendBinlogReaderEvent()); context.sendEventToSourceReader(subtaskId, new BinlogSplitUpdateRequestEvent());
} }
binlogReaderIsSuspended = true;
}
}
private void wakeupBinlogReaderIfNeed() {
if (isAssigningFinished(splitAssigner.getAssignerStatus()) && binlogReaderIsSuspended) {
for (int subtaskId : getRegisteredReader()) {
context.sendEventToSourceReader(
subtaskId,
new WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.BINLOG_READER));
}
binlogReaderIsSuspended = false;
} }
} }
@ -291,25 +267,11 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
} }
} }
private void handleSuspendBinlogReaderAckEvent(int subTask) { private void handleLatestFinishedSplitNumberRequest(int subTask) {
LOG.info(
"Received event that the binlog split reader has been suspended from subtask {}. ",
subTask);
splitAssigner.wakeup();
if (splitAssigner instanceof MySqlHybridSplitAssigner) {
for (int subtaskId : this.getRegisteredReader()) {
context.sendEventToSourceReader(
subtaskId,
new WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER));
}
}
}
private void handleLatestFinishedSplitSizeRequest(int subTask) {
if (splitAssigner instanceof MySqlHybridSplitAssigner) { if (splitAssigner instanceof MySqlHybridSplitAssigner) {
context.sendEventToSourceReader( context.sendEventToSourceReader(
subTask, subTask,
new LatestFinishedSplitsSizeEvent( new LatestFinishedSplitsNumberEvent(
splitAssigner.getFinishedSplitInfos().size())); splitAssigner.getFinishedSplitInfos().size()));
} }
} }

@ -26,7 +26,7 @@ import java.util.List;
/** /**
* The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to * The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to
* pass binlog meta data, i.e. {@link FinishedSnapshotSplitInfo}. * pass binlog metadata, i.e. {@link FinishedSnapshotSplitInfo}.
*/ */
public class BinlogSplitMetaEvent implements SourceEvent { public class BinlogSplitMetaEvent implements SourceEvent {
@ -34,10 +34,10 @@ public class BinlogSplitMetaEvent implements SourceEvent {
private final String splitId; private final String splitId;
/** The meta data of binlog split is divided to multiple groups. */ /** The metadata of binlog split is divided to multiple groups. */
private final int metaGroupId; private final int metaGroupId;
/** /**
* The serialized meta data of binlog split, it's serialized/deserialize by {@link * The serialized metadata of binlog split, it's serialized/deserialize by {@link
* FinishedSnapshotSplitInfo#serialize(FinishedSnapshotSplitInfo)} and {@link * FinishedSnapshotSplitInfo#serialize(FinishedSnapshotSplitInfo)} and {@link
* FinishedSnapshotSplitInfo#deserialize(byte[])}. * FinishedSnapshotSplitInfo#deserialize(byte[])}.
*/ */

@ -23,7 +23,7 @@ import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/** /**
* The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to * The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to
* pull binlog meta data, i.e. sending {@link BinlogSplitMetaEvent}. * pull binlog metadata, i.e. sending {@link BinlogSplitMetaEvent}.
*/ */
public class BinlogSplitMetaRequestEvent implements SourceEvent { public class BinlogSplitMetaRequestEvent implements SourceEvent {

@ -23,11 +23,11 @@ import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/** /**
* The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to * The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to
* notify the binlog split reader has been suspended. * notify the binlog split reader has been updated.
*/ */
public class SuspendBinlogReaderAckEvent implements SourceEvent { public class BinlogSplitUpdateAckEvent implements SourceEvent {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public SuspendBinlogReaderAckEvent() {} public BinlogSplitUpdateAckEvent() {}
} }

@ -23,11 +23,12 @@ import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/** /**
* The {@link SourceEvent} that {@link MySqlSourceEnumerator} broadcasts to {@link * The {@link SourceEvent} that {@link MySqlSourceEnumerator} broadcasts to {@link
* MySqlSourceReader} to tell the source reader to suspend the binlog reader. * MySqlSourceReader} to tell the source reader to update the binlog split after newly added table
* snapshot splits finished.
*/ */
public class SuspendBinlogReaderEvent implements SourceEvent { public class BinlogSplitUpdateRequestEvent implements SourceEvent {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public SuspendBinlogReaderEvent() {} public BinlogSplitUpdateRequestEvent() {}
} }

@ -1,46 +0,0 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.events;
import org.apache.flink.api.connector.source.SourceEvent;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
* The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to
* wake up source reader to consume split again.
*/
public class WakeupReaderEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
/** Wake up target. */
public enum WakeUpTarget {
SNAPSHOT_READER,
BINLOG_READER
}
private WakeUpTarget target;
public WakeupReaderEvent(WakeUpTarget target) {
this.target = target;
}
public WakeUpTarget getTarget() {
return target;
}
}

@ -24,19 +24,19 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent; import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent; import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent; import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeEvent; import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeRequestEvent; import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent;
import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
@ -57,15 +57,18 @@ import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER; import static com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner.BINLOG_SPLIT_ID;
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId; import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
import static com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils.discoverSchemaForNewAddedTables;
/** The source reader for MySQL source splits. */ /** The source reader for MySQL source splits. */
public class MySqlSourceReader<T> public class MySqlSourceReader<T>
@ -73,13 +76,12 @@ public class MySqlSourceReader<T>
SourceRecords, T, MySqlSplit, MySqlSplitState> { SourceRecords, T, MySqlSplit, MySqlSplitState> {
private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class); private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);
private final MySqlSourceConfig sourceConfig; private final MySqlSourceConfig sourceConfig;
private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits; private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits; private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
private final int subtaskId; private final int subtaskId;
private final MySqlSourceReaderContext mySqlSourceReaderContext; private final MySqlSourceReaderContext mySqlSourceReaderContext;
private MySqlBinlogSplit suspendedBinlogSplit; private volatile MySqlBinlogSplit suspendedBinlogSplit;
public MySqlSourceReader( public MySqlSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue, FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
@ -104,7 +106,7 @@ public class MySqlSourceReader<T>
@Override @Override
public void start() { public void start() {
if (getNumberOfCurrentlyAssignedSplits() == 0) { if (getNumberOfCurrentlyAssignedSplits() <= 1) {
context.sendSplitRequest(); context.sendSplitRequest();
} }
} }
@ -128,9 +130,6 @@ public class MySqlSourceReader<T>
.filter(split -> !finishedUnackedSplits.containsKey(split.splitId())) .filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
.collect(Collectors.toList()); .collect(Collectors.toList());
// add finished snapshot splits that didn't receive ack yet
unfinishedSplits.addAll(finishedUnackedSplits.values());
// add binlog splits who are uncompleted // add binlog splits who are uncompleted
unfinishedSplits.addAll(uncompletedBinlogSplits.values()); unfinishedSplits.addAll(uncompletedBinlogSplits.values());
@ -147,29 +146,51 @@ public class MySqlSourceReader<T>
@Override @Override
protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) { protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
boolean requestNextSplit = true; boolean requestNextSplit = true;
for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) { if (isNewlyAddedTableSplitAndBinlogSplit(finishedSplitIds)) {
MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit(); MySqlSplitState mySqlBinlogSplitState = finishedSplitIds.remove(BINLOG_SPLIT_ID);
if (mySqlSplit.isBinlogSplit()) { finishedSplitIds
LOG.info( .values()
"binlog split reader suspended due to newly added table, offset {}", .forEach(
mySqlSplitState.asBinlogSplitState().getStartingOffset()); newAddedSplitState ->
finishedUnackedSplits.put(
mySqlSourceReaderContext.resetStopBinlogSplitReader(); newAddedSplitState.toMySqlSplit().splitId(),
suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit()); newAddedSplitState.toMySqlSplit().asSnapshotSplit()));
context.sendSourceEventToCoordinator(new SuspendBinlogReaderAckEvent()); this.addSplits(Collections.singletonList(mySqlBinlogSplitState.toMySqlSplit()));
// do not request next split when the reader is suspended, the suspended reader will } else {
// automatically request the next split after it has been wakeup Preconditions.checkState(finishedSplitIds.size() == 1);
requestNextSplit = false; for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
} else { MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit()); if (mySqlSplit.isBinlogSplit()) {
suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit());
LOG.info(
"binlog split reader suspended success after the newly added table process, current offset {}",
suspendedBinlogSplit.getStartingOffset());
context.sendSourceEventToCoordinator(
new LatestFinishedSplitsNumberRequestEvent());
// do not request next split when the reader is suspended
requestNextSplit = false;
} else {
finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
}
} }
reportFinishedSnapshotSplitsIfNeed();
} }
reportFinishedSnapshotSplitsIfNeed();
if (requestNextSplit) { if (requestNextSplit) {
context.sendSplitRequest(); context.sendSplitRequest();
} }
} }
/**
* During the newly added table process, for the source reader who holds the binlog split, we
* return the latest finished snapshot split and binlog split as well, this design let us have
* opportunity to exchange binlog reading and snapshot reading, we put the binlog split back.
*/
private boolean isNewlyAddedTableSplitAndBinlogSplit(
Map<String, MySqlSplitState> finishedSplitIds) {
return finishedSplitIds.containsKey(BINLOG_SPLIT_ID) && finishedSplitIds.size() == 2;
}
@Override @Override
public void addSplits(List<MySqlSplit> splits) { public void addSplits(List<MySqlSplit> splits) {
// restore for finishedUnackedSplits // restore for finishedUnackedSplits
@ -185,6 +206,13 @@ public class MySqlSourceReader<T>
} }
} else { } else {
MySqlBinlogSplit binlogSplit = split.asBinlogSplit(); MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
// Try to discovery table schema once for newly added tables when source reader
// start or restore
boolean checkNewlyAddedTableSchema =
!mySqlSourceReaderContext.isHasAssignedBinlogSplit()
&& sourceConfig.isScanNewlyAddedTableEnabled();
mySqlSourceReaderContext.setHasAssignedBinlogSplit(true);
// the binlog split is suspended // the binlog split is suspended
if (binlogSplit.isSuspended()) { if (binlogSplit.isSuspended()) {
suspendedBinlogSplit = binlogSplit; suspendedBinlogSplit = binlogSplit;
@ -194,7 +222,10 @@ public class MySqlSourceReader<T>
} else { } else {
uncompletedBinlogSplits.remove(split.splitId()); uncompletedBinlogSplits.remove(split.splitId());
MySqlBinlogSplit mySqlBinlogSplit = MySqlBinlogSplit mySqlBinlogSplit =
discoverTableSchemasForBinlogSplit(split.asBinlogSplit()); discoverTableSchemasForBinlogSplit(
split.asBinlogSplit(),
sourceConfig,
checkNewlyAddedTableSchema);
unfinishedSplits.add(mySqlBinlogSplit); unfinishedSplits.add(mySqlBinlogSplit);
} }
} }
@ -207,26 +238,6 @@ public class MySqlSourceReader<T>
} }
} }
private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
final String splitId = split.splitId();
if (split.getTableSchemas().isEmpty()) {
try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
LOG.info("The table schema discovery for binlog split {} success", splitId);
return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
} catch (SQLException e) {
LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
throw new FlinkRuntimeException(e);
}
} else {
LOG.warn(
"The binlog split {} has table schemas yet, skip the table schema discovery",
split);
return split;
}
}
@Override @Override
public void handleSourceEvents(SourceEvent sourceEvent) { public void handleSourceEvents(SourceEvent sourceEvent) {
if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) { if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
@ -249,33 +260,38 @@ public class MySqlSourceReader<T>
"The subtask {} receives binlog meta with group id {}.", "The subtask {} receives binlog meta with group id {}.",
subtaskId, subtaskId,
((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId()); ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent); fillMetadataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
} else if (sourceEvent instanceof SuspendBinlogReaderEvent) { } else if (sourceEvent instanceof BinlogSplitUpdateRequestEvent) {
mySqlSourceReaderContext.setStopBinlogSplitReader(); handleBinlogSplitUpdateRequest();
} else if (sourceEvent instanceof WakeupReaderEvent) { } else if (sourceEvent instanceof LatestFinishedSplitsNumberEvent) {
WakeupReaderEvent wakeupReaderEvent = (WakeupReaderEvent) sourceEvent; updateBinlogSplit((LatestFinishedSplitsNumberEvent) sourceEvent);
if (wakeupReaderEvent.getTarget() == SNAPSHOT_READER) {
context.sendSplitRequest();
} else {
if (suspendedBinlogSplit != null) {
context.sendSourceEventToCoordinator(
new LatestFinishedSplitsSizeRequestEvent());
}
}
} else if (sourceEvent instanceof LatestFinishedSplitsSizeEvent) {
if (suspendedBinlogSplit != null) {
final int finishedSplitsSize =
((LatestFinishedSplitsSizeEvent) sourceEvent).getLatestFinishedSplitsSize();
final MySqlBinlogSplit binlogSplit =
toNormalBinlogSplit(suspendedBinlogSplit, finishedSplitsSize);
suspendedBinlogSplit = null;
this.addSplits(Collections.singletonList(binlogSplit));
}
} else { } else {
super.handleSourceEvents(sourceEvent); super.handleSourceEvents(sourceEvent);
} }
} }
private void handleBinlogSplitUpdateRequest() {
mySqlSourceReaderContext.suspendBinlogSplitReader();
}
private void updateBinlogSplit(LatestFinishedSplitsNumberEvent sourceEvent) {
if (suspendedBinlogSplit != null) {
final int finishedSplitsSize = sourceEvent.getLatestFinishedSplitsNumber();
final MySqlBinlogSplit binlogSplit =
toNormalBinlogSplit(suspendedBinlogSplit, finishedSplitsSize);
suspendedBinlogSplit = null;
this.addSplits(Collections.singletonList(binlogSplit));
context.sendSourceEventToCoordinator(new BinlogSplitUpdateAckEvent());
LOG.info("Notify enumerator that binlog split has been updated.");
mySqlSourceReaderContext.wakeupSuspendedBinlogSplitReader();
LOG.info("Wake up suspended binlog reader as binlog split has been updated.");
} else {
LOG.warn("Unexpected event {}, this should not happen.", sourceEvent);
}
}
private void reportFinishedSnapshotSplitsIfNeed() { private void reportFinishedSnapshotSplitsIfNeed() {
if (!finishedUnackedSplits.isEmpty()) { if (!finishedUnackedSplits.isEmpty()) {
final Map<String, BinlogOffset> finishedOffsets = new HashMap<>(); final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
@ -308,7 +324,7 @@ public class MySqlSourceReader<T>
} }
} }
private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) { private void fillMetadataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId()); MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
if (binlogSplit != null) { if (binlogSplit != null) {
final int receivedMetaGroupId = metadataEvent.getMetaGroupId(); final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
@ -317,15 +333,22 @@ public class MySqlSourceReader<T>
binlogSplit.getFinishedSnapshotSplitInfos().size(), binlogSplit.getFinishedSnapshotSplitInfos().size(),
sourceConfig.getSplitMetaGroupSize()); sourceConfig.getSplitMetaGroupSize());
if (receivedMetaGroupId == expectedMetaGroupId) { if (receivedMetaGroupId == expectedMetaGroupId) {
List<FinishedSnapshotSplitInfo> metaDataGroup = List<FinishedSnapshotSplitInfo> newAddedMetadataGroup;
Set<String> existedSplitsOfLastGroup =
getExistedSplitsOfLastGroup(
binlogSplit.getFinishedSnapshotSplitInfos(),
sourceConfig.getSplitMetaGroupSize());
newAddedMetadataGroup =
metadataEvent.getMetaGroup().stream() metadataEvent.getMetaGroup().stream()
.map(FinishedSnapshotSplitInfo::deserialize) .map(FinishedSnapshotSplitInfo::deserialize)
.filter(r -> !existedSplitsOfLastGroup.contains(r.getSplitId()))
.collect(Collectors.toList()); .collect(Collectors.toList());
uncompletedBinlogSplits.put( uncompletedBinlogSplits.put(
binlogSplit.splitId(), binlogSplit.splitId(),
MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup)); MySqlBinlogSplit.appendFinishedSplitInfos(
binlogSplit, newAddedMetadataGroup));
LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size()); LOG.info("Fill metadata of group {} to binlog split", newAddedMetadataGroup.size());
} else { } else {
LOG.warn( LOG.warn(
"Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it", "Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",
@ -341,6 +364,58 @@ public class MySqlSourceReader<T>
} }
} }
private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(
MySqlBinlogSplit split,
MySqlSourceConfig sourceConfig,
boolean checkNewlyAddedTableSchema) {
if (split.getTableSchemas().isEmpty() || checkNewlyAddedTableSchema) {
try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
Map<TableId, TableChanges.TableChange> tableSchemas;
if (split.getTableSchemas().isEmpty()) {
tableSchemas =
TableDiscoveryUtils.discoverSchemaForCapturedTables(sourceConfig, jdbc);
LOG.info(
"The table schema discovery for binlog split {} success",
split.splitId());
} else {
List<TableId> existedTables = new ArrayList<>(split.getTableSchemas().keySet());
tableSchemas =
discoverSchemaForNewAddedTables(existedTables, sourceConfig, jdbc);
LOG.info(
"The table schema discovery for new added tables of binlog split {} success",
split.splitId());
}
return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
} catch (SQLException e) {
LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
throw new FlinkRuntimeException(e);
}
} else {
LOG.warn(
"The binlog split {} has table schemas yet, skip the table schema discovery",
split);
return split;
}
}
private Set<String> getExistedSplitsOfLastGroup(
List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int metaGroupSize) {
Set<String> existedSplitsOfLastGroup = new HashSet<>();
int splitsNumOfLastGroup =
finishedSnapshotSplits.size() % sourceConfig.getSplitMetaGroupSize();
if (splitsNumOfLastGroup != 0) {
int lastGroupStart =
((int) (finishedSnapshotSplits.size() / sourceConfig.getSplitMetaGroupSize()))
* metaGroupSize;
existedSplitsOfLastGroup =
finishedSnapshotSplits
.subList(lastGroupStart, lastGroupStart + splitsNumOfLastGroup).stream()
.map(FinishedSnapshotSplitInfo::getSplitId)
.collect(Collectors.toSet());
}
return existedSplitsOfLastGroup;
}
private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) { private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
if (!LOG.isInfoEnabled()) { if (!LOG.isInfoEnabled()) {
return; return;

@ -25,26 +25,36 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
public class MySqlSourceReaderContext { public class MySqlSourceReaderContext {
private final SourceReaderContext sourceReaderContext; private final SourceReaderContext sourceReaderContext;
private volatile boolean stopBinlogSplitReader; private volatile boolean isBinlogSplitReaderSuspended;
private volatile boolean hasAssignedBinlogSplit;
public MySqlSourceReaderContext(final SourceReaderContext sourceReaderContext) { public MySqlSourceReaderContext(final SourceReaderContext sourceReaderContext) {
this.sourceReaderContext = sourceReaderContext; this.sourceReaderContext = sourceReaderContext;
this.stopBinlogSplitReader = false; this.isBinlogSplitReaderSuspended = false;
this.hasAssignedBinlogSplit = false;
} }
public SourceReaderContext getSourceReaderContext() { public SourceReaderContext getSourceReaderContext() {
return sourceReaderContext; return sourceReaderContext;
} }
public boolean needStopBinlogSplitReader() { public boolean isBinlogSplitReaderSuspended() {
return stopBinlogSplitReader; return isBinlogSplitReaderSuspended;
} }
public void setStopBinlogSplitReader() { public void suspendBinlogSplitReader() {
this.stopBinlogSplitReader = true; this.isBinlogSplitReaderSuspended = true;
} }
public void resetStopBinlogSplitReader() { public void wakeupSuspendedBinlogSplitReader() {
this.stopBinlogSplitReader = false; this.isBinlogSplitReaderSuspended = false;
}
public boolean isHasAssignedBinlogSplit() {
return hasAssignedBinlogSplit;
}
public void setHasAssignedBinlogSplit(boolean hasAssignedBinlogSplit) {
this.hasAssignedBinlogSplit = hasAssignedBinlogSplit;
} }
} }

@ -28,7 +28,9 @@ import com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlRecords; import com.ververica.cdc.connectors.mysql.source.split.MySqlRecords;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.SourceRecords; import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlConnection;
@ -39,58 +41,161 @@ import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Queue; import java.util.Set;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient; import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.createBinaryClient;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection;
import static com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner.BINLOG_SPLIT_ID;
/** The {@link SplitReader} implementation for the {@link MySqlSource}. */ /** The {@link SplitReader} implementation for the {@link MySqlSource}. */
public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit> { public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit> {
private static final Logger LOG = LoggerFactory.getLogger(MySqlSplitReader.class); private static final Logger LOG = LoggerFactory.getLogger(MySqlSplitReader.class);
private final Queue<MySqlSplit> splits; private final ArrayDeque<MySqlSnapshotSplit> snapshotSplits;
private final ArrayDeque<MySqlBinlogSplit> binlogSplits;
private final MySqlSourceConfig sourceConfig; private final MySqlSourceConfig sourceConfig;
private final int subtaskId; private final int subtaskId;
private final MySqlSourceReaderContext context; private final MySqlSourceReaderContext context;
@Nullable private DebeziumReader<SourceRecords, MySqlSplit> currentReader;
@Nullable private String currentSplitId; @Nullable private String currentSplitId;
@Nullable private DebeziumReader<SourceRecords, MySqlSplit> currentReader;
@Nullable private SnapshotSplitReader reusedSnapshotReader;
@Nullable private BinlogSplitReader reusedBinlogReader;
public MySqlSplitReader( public MySqlSplitReader(
MySqlSourceConfig sourceConfig, int subtaskId, MySqlSourceReaderContext context) { MySqlSourceConfig sourceConfig, int subtaskId, MySqlSourceReaderContext context) {
this.sourceConfig = sourceConfig; this.sourceConfig = sourceConfig;
this.subtaskId = subtaskId; this.subtaskId = subtaskId;
this.splits = new ArrayDeque<>(); this.snapshotSplits = new ArrayDeque<>();
this.binlogSplits = new ArrayDeque<>(1);
this.context = context; this.context = context;
} }
@Override @Override
public RecordsWithSplitIds<SourceRecords> fetch() throws IOException { public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
checkSplitOrStartNext();
checkNeedStopBinlogReader();
Iterator<SourceRecords> dataIt;
try { try {
dataIt = currentReader.pollSplitRecords(); suspendBinlogReaderIfNeed();
return pollSplitRecords();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("fetch data failed.", e); LOG.warn("fetch data failed.", e);
throw new IOException(e); throw new IOException(e);
} }
return dataIt == null
? finishedSnapshotSplit()
: MySqlRecords.forRecords(currentSplitId, dataIt);
} }
private void checkNeedStopBinlogReader() { /** Suspends binlog reader until updated binlog split join again. */
if (currentReader instanceof BinlogSplitReader private void suspendBinlogReaderIfNeed() {
&& context.needStopBinlogSplitReader() if (currentReader != null
&& currentReader instanceof BinlogSplitReader
&& context.isBinlogSplitReaderSuspended()
&& !currentReader.isFinished()) { && !currentReader.isFinished()) {
((BinlogSplitReader) currentReader).stopBinlogReadTask(); ((BinlogSplitReader) currentReader).stopBinlogReadTask();
LOG.info("Suspend binlog reader to wait the binlog split update.");
}
}
private MySqlRecords pollSplitRecords() throws InterruptedException {
Iterator<SourceRecords> dataIt;
if (currentReader == null) {
// (1) Reads binlog split firstly and then read snapshot split
if (binlogSplits.size() > 0) {
// the binlog split may come from:
// (a) the initial binlog split
// (b) added back binlog-split in newly added table process
MySqlSplit nextSplit = binlogSplits.poll();
currentSplitId = nextSplit.splitId();
currentReader = getBinlogSplitReader();
currentReader.submitSplit(nextSplit);
} else if (snapshotSplits.size() > 0) {
MySqlSplit nextSplit = snapshotSplits.poll();
currentSplitId = nextSplit.splitId();
currentReader = getSnapshotSplitReader();
currentReader.submitSplit(nextSplit);
} else {
LOG.info("No available split to read.");
}
dataIt = currentReader.pollSplitRecords();
return dataIt == null ? finishedSplit() : forRecords(dataIt);
} else if (currentReader instanceof SnapshotSplitReader) {
// (2) try to switch to binlog split reading util current snapshot split finished
dataIt = currentReader.pollSplitRecords();
if (dataIt != null) {
// first fetch data of snapshot split, return and emit the records of snapshot split
MySqlRecords records;
if (context.isHasAssignedBinlogSplit()) {
records = forNewAddedTableFinishedSplit(currentSplitId, dataIt);
closeSnapshotReader();
closeBinlogReader();
} else {
records = forRecords(dataIt);
MySqlSplit nextSplit = snapshotSplits.poll();
if (nextSplit != null) {
currentSplitId = nextSplit.splitId();
currentReader.submitSplit(nextSplit);
} else {
closeSnapshotReader();
}
}
return records;
} else {
return finishedSplit();
}
} else if (currentReader instanceof BinlogSplitReader) {
// (3) switch to snapshot split reading if there are newly added snapshot splits
dataIt = currentReader.pollSplitRecords();
if (dataIt != null) {
// try to switch to read snapshot split if there are new added snapshot
MySqlSplit nextSplit = snapshotSplits.poll();
if (nextSplit != null) {
closeBinlogReader();
LOG.info("It's turn to switch next fetch reader to snapshot split reader");
currentSplitId = nextSplit.splitId();
currentReader = getSnapshotSplitReader();
currentReader.submitSplit(nextSplit);
}
return MySqlRecords.forBinlogRecords(BINLOG_SPLIT_ID, dataIt);
} else {
// null will be returned after receiving suspend binlog event
// finish current binlog split reading
closeBinlogReader();
return finishedSplit();
}
} else {
throw new IllegalStateException("Unsupported reader type.");
}
}
private MySqlRecords finishedSplit() {
final MySqlRecords finishedRecords = MySqlRecords.forFinishedSplit(currentSplitId);
currentSplitId = null;
return finishedRecords;
}
private MySqlRecords forRecords(Iterator<SourceRecords> dataIt) {
if (currentReader instanceof SnapshotSplitReader) {
final MySqlRecords finishedRecords =
MySqlRecords.forSnapshotRecords(currentSplitId, dataIt);
closeSnapshotReader();
return finishedRecords;
} else {
return MySqlRecords.forBinlogRecords(currentSplitId, dataIt);
} }
} }
/**
* Finishes new added snapshot split, mark the binlog split as finished too, we will add the
* binlog split back in {@code MySqlSourceReader}.
*/
private MySqlRecords forNewAddedTableFinishedSplit(
final String splitId, final Iterator<SourceRecords> recordsForSplit) {
final Set<String> finishedSplits = new HashSet<>();
finishedSplits.add(splitId);
finishedSplits.add(BINLOG_SPLIT_ID);
currentSplitId = null;
return new MySqlRecords(splitId, recordsForSplit, finishedSplits);
}
@Override @Override
public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) { public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
if (!(splitsChanges instanceof SplitsAddition)) { if (!(splitsChanges instanceof SplitsAddition)) {
@ -100,8 +205,14 @@ public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit>
splitsChanges.getClass())); splitsChanges.getClass()));
} }
LOG.debug("Handling split change {}", splitsChanges); LOG.info("Handling split change {}", splitsChanges);
splits.addAll(splitsChanges.splits()); for (MySqlSplit mySqlSplit : splitsChanges.splits()) {
if (mySqlSplit.isSnapshotSplit()) {
snapshotSplits.add(mySqlSplit.asSnapshotSplit());
} else {
binlogSplits.add(mySqlSplit.asBinlogSplit());
}
}
} }
@Override @Override
@ -109,64 +220,54 @@ public class MySqlSplitReader implements SplitReader<SourceRecords, MySqlSplit>
@Override @Override
public void close() throws Exception { public void close() throws Exception {
if (currentReader != null) { closeSnapshotReader();
LOG.info( closeBinlogReader();
"Close current debezium reader {}",
currentReader.getClass().getCanonicalName());
currentReader.close();
currentSplitId = null;
}
} }
private void checkSplitOrStartNext() throws IOException { private SnapshotSplitReader getSnapshotSplitReader() {
if (canAssignNextSplit()) { if (reusedSnapshotReader == null) {
MySqlSplit nextSplit = splits.poll(); final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
if (nextSplit == null) { final BinaryLogClient binaryLogClient =
return; createBinaryClient(sourceConfig.getDbzConfiguration());
} final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
currentSplitId = nextSplit.splitId(); reusedSnapshotReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
}
return reusedSnapshotReader;
}
if (nextSplit.isSnapshotSplit()) { private BinlogSplitReader getBinlogSplitReader() {
if (currentReader instanceof BinlogSplitReader) { if (reusedBinlogReader == null) {
LOG.info( final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
"This is the point from binlog split reading change to snapshot split reading"); final BinaryLogClient binaryLogClient =
currentReader.close(); createBinaryClient(sourceConfig.getDbzConfiguration());
currentReader = null; final StatefulTaskContext statefulTaskContext =
} new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
if (currentReader == null) { reusedBinlogReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
final BinaryLogClient binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
}
} else {
// point from snapshot split to binlog split
if (currentReader != null) {
LOG.info("It's turn to read binlog split, close current snapshot reader");
currentReader.close();
}
final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
final BinaryLogClient binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
LOG.info("BinlogSplitReader is created.");
}
currentReader.submitSplit(nextSplit);
} }
return reusedBinlogReader;
} }
private boolean canAssignNextSplit() { private void closeSnapshotReader() {
return currentReader == null || currentReader.isFinished(); if (reusedSnapshotReader != null) {
LOG.debug(
"Close snapshot reader {}", reusedSnapshotReader.getClass().getCanonicalName());
reusedSnapshotReader.close();
if (reusedSnapshotReader == currentReader) {
currentReader = null;
}
reusedSnapshotReader = null;
}
} }
private MySqlRecords finishedSnapshotSplit() { private void closeBinlogReader() {
final MySqlRecords finishedRecords = MySqlRecords.forFinishedSplit(currentSplitId); if (reusedBinlogReader != null) {
currentSplitId = null; LOG.debug("Close binlog reader {}", reusedBinlogReader.getClass().getCanonicalName());
return finishedRecords; reusedBinlogReader.close();
if (reusedBinlogReader == currentReader) {
currentReader = null;
}
reusedBinlogReader = null;
}
} }
} }

@ -23,7 +23,6 @@ import io.debezium.relational.history.TableChanges.TableChange;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -153,10 +152,17 @@ public class MySqlBinlogSplit extends MySqlSplit {
// ------------------------------------------------------------------- // -------------------------------------------------------------------
public static MySqlBinlogSplit appendFinishedSplitInfos( public static MySqlBinlogSplit appendFinishedSplitInfos(
MySqlBinlogSplit binlogSplit, List<FinishedSnapshotSplitInfo> splitInfos) { MySqlBinlogSplit binlogSplit, List<FinishedSnapshotSplitInfo> splitInfos) {
// re-calculate the starting binlog offset after the new table added
BinlogOffset startingOffset = binlogSplit.getStartingOffset();
for (FinishedSnapshotSplitInfo splitInfo : splitInfos) {
if (splitInfo.getHighWatermark().isBefore(startingOffset)) {
startingOffset = splitInfo.getHighWatermark();
}
}
splitInfos.addAll(binlogSplit.getFinishedSnapshotSplitInfos()); splitInfos.addAll(binlogSplit.getFinishedSnapshotSplitInfos());
return new MySqlBinlogSplit( return new MySqlBinlogSplit(
binlogSplit.splitId, binlogSplit.splitId,
binlogSplit.getStartingOffset(), startingOffset,
binlogSplit.getEndingOffset(), binlogSplit.getEndingOffset(),
splitInfos, splitInfos,
binlogSplit.getTableSchemas(), binlogSplit.getTableSchemas(),
@ -194,9 +200,43 @@ public class MySqlBinlogSplit extends MySqlSplit {
normalBinlogSplit.splitId, normalBinlogSplit.splitId,
normalBinlogSplit.getStartingOffset(), normalBinlogSplit.getStartingOffset(),
normalBinlogSplit.getEndingOffset(), normalBinlogSplit.getEndingOffset(),
new ArrayList<>(), forwardHighWatermarkToStartingOffset(
new HashMap<>(), normalBinlogSplit.getFinishedSnapshotSplitInfos(),
normalBinlogSplit.getStartingOffset()),
normalBinlogSplit.getTableSchemas(),
normalBinlogSplit.getTotalFinishedSplitSize(), normalBinlogSplit.getTotalFinishedSplitSize(),
true); true);
} }
/**
* Forwards {@link FinishedSnapshotSplitInfo#getHighWatermark()} to current binlog reading
* offset for these snapshot-splits have started the binlog reading, this is pretty useful for
* newly added table process that we can continue to consume binlog for these splits from the
* updated high watermark.
*
* @param existedSplitInfos
* @param currentBinlogReadingOffset
*/
private static List<FinishedSnapshotSplitInfo> forwardHighWatermarkToStartingOffset(
List<FinishedSnapshotSplitInfo> existedSplitInfos,
BinlogOffset currentBinlogReadingOffset) {
List<FinishedSnapshotSplitInfo> updatedSnapshotSplitInfos = new ArrayList<>();
for (FinishedSnapshotSplitInfo existedSplitInfo : existedSplitInfos) {
// for split has started read binlog, forward its high watermark to current binlog
// reading offset
if (existedSplitInfo.getHighWatermark().isBefore(currentBinlogReadingOffset)) {
FinishedSnapshotSplitInfo forwardHighWatermarkSnapshotSplitInfo =
new FinishedSnapshotSplitInfo(
existedSplitInfo.getTableId(),
existedSplitInfo.getSplitId(),
existedSplitInfo.getSplitStart(),
existedSplitInfo.getSplitEnd(),
currentBinlogReadingOffset);
updatedSnapshotSplitInfos.add(forwardHighWatermarkSnapshotSplitInfo);
} else {
updatedSnapshotSplitInfos.add(existedSplitInfo);
}
}
return updatedSnapshotSplitInfos;
}
} }

@ -75,11 +75,16 @@ public final class MySqlRecords implements RecordsWithSplitIds<SourceRecords> {
return finishedSnapshotSplits; return finishedSnapshotSplits;
} }
public static MySqlRecords forRecords( public static MySqlRecords forBinlogRecords(
final String splitId, final Iterator<SourceRecords> recordsForSplit) { final String splitId, final Iterator<SourceRecords> recordsForSplit) {
return new MySqlRecords(splitId, recordsForSplit, Collections.emptySet()); return new MySqlRecords(splitId, recordsForSplit, Collections.emptySet());
} }
public static MySqlRecords forSnapshotRecords(
final String splitId, final Iterator<SourceRecords> recordsForSplit) {
return new MySqlRecords(splitId, recordsForSplit, Collections.singleton(splitId));
}
public static MySqlRecords forFinishedSplit(final String splitId) { public static MySqlRecords forFinishedSplit(final String splitId) {
return new MySqlRecords(null, null, Collections.singleton(splitId)); return new MySqlRecords(null, null, Collections.singleton(splitId));
} }

@ -24,7 +24,7 @@ import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalTableFilters; import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges; import io.debezium.relational.history.TableChanges.TableChange;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -33,11 +33,13 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote; import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote;
/** Utilities to discovery matched tables. */ /** Utilities to discovery matched tables. */
public class TableDiscoveryUtils { public class TableDiscoveryUtils {
private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class); private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class);
public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters) public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters)
@ -93,7 +95,7 @@ public class TableDiscoveryUtils {
return capturedTableIds; return capturedTableIds;
} }
public static Map<TableId, TableChanges.TableChange> discoverCapturedTableSchemas( public static Map<TableId, TableChange> discoverSchemaForCapturedTables(
MySqlSourceConfig sourceConfig, MySqlConnection jdbc) { MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
final List<TableId> capturedTableIds; final List<TableId> capturedTableIds;
try { try {
@ -101,18 +103,38 @@ public class TableDiscoveryUtils {
} catch (SQLException e) { } catch (SQLException e) {
throw new FlinkRuntimeException("Failed to discover captured tables", e); throw new FlinkRuntimeException("Failed to discover captured tables", e);
} }
return discoverSchemaForCapturedTables(capturedTableIds, sourceConfig, jdbc);
}
public static Map<TableId, TableChange> discoverSchemaForNewAddedTables(
List<TableId> existedTables, MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
final List<TableId> capturedTableIds;
try {
capturedTableIds =
listTables(jdbc, sourceConfig.getTableFilters()).stream()
.filter(tableId -> !existedTables.contains(tableId))
.collect(Collectors.toList());
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to discover captured tables", e);
}
return capturedTableIds.isEmpty()
? new HashMap<>()
: discoverSchemaForCapturedTables(capturedTableIds, sourceConfig, jdbc);
}
public static Map<TableId, TableChange> discoverSchemaForCapturedTables(
List<TableId> capturedTableIds, MySqlSourceConfig sourceConfig, MySqlConnection jdbc) {
if (capturedTableIds.isEmpty()) { if (capturedTableIds.isEmpty()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
String.format( String.format(
"Can't find any matched tables, please check your configured database-name: %s and table-name: %s", "Can't find any matched tables, please check your configured database-name: %s and table-name: %s",
sourceConfig.getDatabaseList(), sourceConfig.getTableList())); sourceConfig.getDatabaseList(), sourceConfig.getTableList()));
} }
// fetch table schemas // fetch table schemas
MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive()); MySqlSchema mySqlSchema = new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive());
Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>(); Map<TableId, TableChange> tableSchemas = new HashMap<>();
for (TableId tableId : capturedTableIds) { for (TableId tableId : capturedTableIds) {
TableChanges.TableChange tableSchema = mySqlSchema.getTableSchema(jdbc, tableId); TableChange tableSchema = mySqlSchema.getTableSchema(jdbc, tableId);
tableSchemas.put(tableId, tableSchema); tableSchemas.put(tableId, tableSchema);
} }
return tableSchemas; return tableSchemas;

@ -731,7 +731,7 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
binlogSplitAssigner.open(); binlogSplitAssigner.open();
try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) { try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
Map<TableId, TableChanges.TableChange> tableSchemas = Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc); TableDiscoveryUtils.discoverSchemaForCapturedTables(sourceConfig, jdbc);
return MySqlBinlogSplit.fillTableSchemas( return MySqlBinlogSplit.fillTableSchemas(
binlogSplitAssigner.getNext().get().asBinlogSplit(), tableSchemas); binlogSplitAssigner.getNext().get().asBinlogSplit(), tableSchemas);
} }

@ -115,7 +115,7 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
MySqlSplit binlogSplit; MySqlSplit binlogSplit;
try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) { try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
Map<TableId, TableChanges.TableChange> tableSchemas = Map<TableId, TableChanges.TableChange> tableSchemas =
TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc); TableDiscoveryUtils.discoverSchemaForCapturedTables(sourceConfig, jdbc);
binlogSplit = binlogSplit =
MySqlBinlogSplit.fillTableSchemas( MySqlBinlogSplit.fillTableSchemas(
createBinlogSplit(sourceConfig).asBinlogSplit(), tableSchemas); createBinlogSplit(sourceConfig).asBinlogSplit(), tableSchemas);

Loading…
Cancel
Save