[mysql] Improve newly added tables process

pull/842/head
Leonard Xu 3 years ago
parent c21aa91e5d
commit c94791fd14

@ -274,7 +274,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
}
public void finishBinlogSplit() {
public void stopBinlogReadTask() {
this.currentTaskRunning = false;
}
}

@ -190,9 +190,9 @@ public class MySqlSourceBuilder<T> {
return this;
}
/** Whether the {@link MySqlSource} should capture the newly added tables or not. */
public MySqlSourceBuilder<T> captureNewTables(boolean captureNewTables) {
this.configFactory.captureNewTables(captureNewTables);
/** Whether the {@link MySqlSource} should scan the newly added tables or not. */
public MySqlSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);
return this;
}

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.assigners;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.String.format;
/**
* The state of split assigner finite state machine, tips: we use word status instead of word state
* to avoid conflict with Flink state keyword. The assigner finite state machine goes this way.
*
* <pre>
* INITIAL_ASSIGNING(start)
* |
* |
* onFinish()
* |
*
* INITIAL_ASSIGNING_FINISHED(end)
* |
* |
* suspend() // found newly added tables
* |
*
* SUSPENDED --- wakeup() -- NEWLY_ADDED_ASSIGNING --- onFinish() -- NEWLY_ADDED_ASSIGNING_FINISHED(end)
* |
* | |
* |----------------- suspend() //found newly added tables -----------|
* </pre>
*/
public enum AssignerStatus {
INITIAL_ASSIGNING(0) {
@Override
public AssignerStatus getNextStatus() {
return INITIAL_ASSIGNING_FINISHED;
}
@Override
public AssignerStatus onFinish() {
LOG.info(
"Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED");
return this.getNextStatus();
}
},
INITIAL_ASSIGNING_FINISHED(1) {
@Override
public AssignerStatus getNextStatus() {
return SUSPENDED;
}
@Override
public AssignerStatus suspend() {
LOG.info("Assigner status changes from INITIAL_ASSIGNING_FINISHED to SUSPENDED");
return this.getNextStatus();
}
},
SUSPENDED(2) {
@Override
public AssignerStatus getNextStatus() {
return NEWLY_ADDED_ASSIGNING;
}
@Override
public AssignerStatus wakeup() {
LOG.info("Assigner status changes from SUSPENDED to NEWLY_ADDED_ASSIGNING");
return this.getNextStatus();
}
},
NEWLY_ADDED_ASSIGNING(3) {
@Override
public AssignerStatus getNextStatus() {
return NEWLY_ADDED_ASSIGNING_FINISHED;
}
@Override
public AssignerStatus onFinish() {
LOG.info(
"Assigner status changes from NEWLY_ADDED_ASSIGNING to NEWLY_ADDED_ASSIGNING_FINISHED");
return this.getNextStatus();
}
},
NEWLY_ADDED_ASSIGNING_FINISHED(4) {
@Override
public AssignerStatus getNextStatus() {
return SUSPENDED;
}
@Override
public AssignerStatus suspend() {
LOG.info("Assigner status changes from NEWLY_ADDED_ASSIGNING_FINISHED to SUSPENDED");
return this.getNextStatus();
}
};
private static final Logger LOG = LoggerFactory.getLogger(AssignerStatus.class);
private final int statusCode;
AssignerStatus(int statusCode) {
this.statusCode = statusCode;
}
public int getStatusCode() {
return statusCode;
}
public abstract AssignerStatus getNextStatus();
public AssignerStatus onFinish() {
throw new IllegalStateException(
format(
"Invalid call, assigner under %s state can not call onFinish()",
fromStatusCode(this.getStatusCode())));
}
public AssignerStatus suspend() {
throw new IllegalStateException(
format(
"Invalid call, assigner under %s state can not call suspend()",
fromStatusCode(this.getStatusCode())));
}
public AssignerStatus wakeup() {
throw new IllegalStateException(
format(
"Invalid call, assigner under %s state can not call wakeup()",
fromStatusCode(this.getStatusCode())));
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
/** Gets the {@link AssignerStatus} from status code. */
public static AssignerStatus fromStatusCode(int statusCode) {
switch (statusCode) {
case 0:
return INITIAL_ASSIGNING;
case 1:
return INITIAL_ASSIGNING_FINISHED;
case 2:
return SUSPENDED;
case 3:
return NEWLY_ADDED_ASSIGNING;
case 4:
return NEWLY_ADDED_ASSIGNING_FINISHED;
default:
throw new IllegalStateException(
format(
"Invalid status code %s,the valid code range is [0, 4]",
statusCode));
}
}
/** Returns whether the split assigner state is suspended. */
public static boolean isSuspended(AssignerStatus assignerStatus) {
return assignerStatus == SUSPENDED;
}
/**
* Returns whether the split assigner has assigned all snapshot splits, which indicates there is
* no more splits and all records of splits have been completely processed in the pipeline.
*/
public static boolean isAssigningFinished(AssignerStatus assignerStatus) {
return assignerStatus == INITIAL_ASSIGNING_FINISHED
|| assignerStatus == NEWLY_ADDED_ASSIGNING_FINISHED;
}
/** Returns whether the split assigner is assigning snapshot splits. */
public static boolean isAssigning(AssignerStatus assignerStatus) {
return assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING;
}
/** Returns whether the split assigner has finished its initial tables assignment. */
public static boolean isInitialAssigningFinished(AssignerStatus assignerStatus) {
return assignerStatus == INITIAL_ASSIGNING_FINISHED;
}
/** Returns whether the split assigner has finished its newly added tables assignment. */
public static boolean isNewlyAddedAssigningFinished(AssignerStatus assignerStatus) {
return assignerStatus == NEWLY_ADDED_ASSIGNING_FINISHED;
}
}

@ -112,21 +112,19 @@ public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner {
}
@Override
public void close() {}
@Override
public boolean isAssignerSuspended() {
return false;
public AssignerStatus getAssignerStatus() {
return AssignerStatus.INITIAL_ASSIGNING_FINISHED;
}
@Override
public int getTotalFinishedSplitSize() {
return 0;
}
public void suspend() {}
@Override
public void wakeup() {}
@Override
public void close() {}
// ------------------------------------------------------------------------------------------
private MySqlBinlogSplit createBinlogSplit() {

@ -39,6 +39,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isInitialAssigningFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isNewlyAddedAssigningFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSuspended;
/**
* A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key
* range and chunk size and also continue with a binlog split.
@ -51,7 +55,6 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
private final int splitMetaGroupSize;
private boolean isBinlogSplitAssigned;
private boolean shouldWakeupBinlogSplitReader = false;
private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;
@ -94,8 +97,8 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
@Override
public Optional<MySqlSplit> getNext() {
if (snapshotSplitAssigner.isAssignerSuspended()) {
// do not assign split until Assigner receive SuspendBinlogSplitReaderResponseEvent
if (isSuspended(getAssignerStatus())) {
// do not assign split until the assigner received SuspendBinlogReaderAckEvent
return Optional.empty();
}
if (snapshotSplitAssigner.noMoreSplits()) {
@ -103,18 +106,15 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
if (isBinlogSplitAssigned) {
// no more splits for the assigner
return Optional.empty();
} else if (snapshotSplitAssigner.getAssignerState()
== SnapshotAssignerStatus.INIT_FINISH) {
} else if (isInitialAssigningFinished(snapshotSplitAssigner.getAssignerStatus())) {
// we need to wait snapshot-assigner to be finished before
// assigning the binlog split. Otherwise, records emitted from binlog split
// might be out-of-order in terms of same primary key with snapshot splits.
isBinlogSplitAssigned = true;
return Optional.of(createBinlogSplit());
} else if (snapshotSplitAssigner.getAssignerState()
== SnapshotAssignerStatus.RESUMED_FINISH) {
// do not need to create binlog, but send event to wake up the binlog
} else if (isNewlyAddedAssigningFinished(snapshotSplitAssigner.getAssignerStatus())) {
// do not need to create binlog, but send event to wake up the binlog reader
isBinlogSplitAssigned = true;
shouldWakeupBinlogSplitReader = true;
return Optional.empty();
} else {
// binlog split is not ready by now
@ -167,32 +167,23 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
}
@Override
public void close() {
snapshotSplitAssigner.close();
public AssignerStatus getAssignerStatus() {
return snapshotSplitAssigner.getAssignerStatus();
}
@Override
public boolean isAssignerSuspended() {
return snapshotSplitAssigner.isAssignerSuspended();
public void suspend() {
snapshotSplitAssigner.suspend();
}
@Override
public void wakeup() {
isBinlogSplitAssigned = false;
snapshotSplitAssigner.wakeup();
}
public boolean isShouldWakeupBinlogSplitReader() {
return shouldWakeupBinlogSplitReader;
}
public void setShouldWakeupBinlogSplitReader(final boolean shouldWakeupBinlogSplitReader) {
this.shouldWakeupBinlogSplitReader = shouldWakeupBinlogSplitReader;
}
@Override
public int getTotalFinishedSplitSize() {
return snapshotSplitAssigner.getTotalFinishedSplitSize();
public void close() {
snapshotSplitAssigner.close();
}
// --------------------------------------------------------------------------------------------

@ -50,6 +50,8 @@ import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables;
import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isAssigningFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSuspended;
/**
* A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key
@ -64,12 +66,12 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private final List<MySqlSnapshotSplit> remainingSplits;
private final Map<String, MySqlSnapshotSplit> assignedSplits;
private final Map<String, BinlogOffset> splitFinishedOffsets;
private SnapshotAssignerStatus assignerState;
private final MySqlSourceConfig sourceConfig;
private final int currentParallelism;
private final LinkedList<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;
private AssignerStatus assignerStatus;
private ChunkSplitter chunkSplitter;
private boolean isTableIdCaseSensitive;
@ -87,7 +89,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
new ArrayList<>(),
new HashMap<>(),
new HashMap<>(),
SnapshotAssignerStatus.INIT,
AssignerStatus.INITIAL_ASSIGNING,
remainingTables,
isTableIdCaseSensitive,
true);
@ -104,7 +106,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
checkpoint.getRemainingSplits(),
checkpoint.getAssignedSplits(),
checkpoint.getSplitFinishedOffsets(),
checkpoint.getAssignerState(),
checkpoint.getSnapshotAssignerStatus(),
checkpoint.getRemainingTables(),
checkpoint.isTableIdCaseSensitive(),
checkpoint.isRemainingTablesCheckpointed());
@ -117,7 +119,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
List<MySqlSnapshotSplit> remainingSplits,
Map<String, MySqlSnapshotSplit> assignedSplits,
Map<String, BinlogOffset> splitFinishedOffsets,
SnapshotAssignerStatus assignerState,
AssignerStatus assignerStatus,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed) {
@ -127,7 +129,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerState = assignerState;
this.assignerStatus = assignerStatus;
this.remainingTables = new LinkedList<>(remainingTables);
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
@ -138,7 +140,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive);
// the legacy state didn't snapshot remaining tables, discovery remaining table here
if (!isRemainingTablesCheckpointed && !isFinished()) {
if (!isRemainingTablesCheckpointed && !isAssigningFinished(assignerStatus)) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
final List<TableId> discoverTables = discoverCapturedTables(jdbc, sourceConfig);
discoverTables.removeAll(alreadyProcessedTables);
@ -149,29 +151,32 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
"Failed to discover remaining tables to capture", e);
}
}
captureNewlyAddedTables();
}
// If the config do not need to capture newly added tables
if (!sourceConfig.isCaptureNewTables()) {
return;
}
// check whether we got newly added tables
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
final List<TableId> discoverTables = discoverCapturedTables(jdbc, sourceConfig);
discoverTables.removeAll(alreadyProcessedTables);
// got some newly added tables
if (!discoverTables.isEmpty()) {
if (isFinished()) {
Preconditions.checkState(remainingTables.isEmpty());
assignerState = assignerState.nextState();
} else {
// if job is in snapshot phase, directly add the new table to remaining table
this.remainingTables.removeAll(discoverTables);
private void captureNewlyAddedTables() {
if (sourceConfig.isScanNewlyAddedTableEnabled()) {
// check whether we got newly added tables
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
final List<TableId> newlyAddedTables = discoverCapturedTables(jdbc, sourceConfig);
newlyAddedTables.removeAll(alreadyProcessedTables);
newlyAddedTables.removeAll(remainingTables);
if (!newlyAddedTables.isEmpty()) {
// if job is still in snapshot reading phase, directly add all newly added
// tables
LOG.info("Found newly added tables, start capture newly added tables process");
remainingTables.addAll(newlyAddedTables);
if (isAssigningFinished(assignerStatus)) {
// start the newly added tables process under binlog reading phase
LOG.info(
"Found newly added tables, start capture newly added tables process under binlog reading phase");
this.suspend();
}
}
remainingTables.addAll(discoverTables);
} catch (Exception e) {
throw new FlinkRuntimeException(
"Failed to discover remaining tables to capture", e);
}
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to discover remaining tables to capture", e);
}
}
@ -185,7 +190,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
assignedSplits.put(split.splitId(), split);
return Optional.of(split);
} else {
// it's turn for new table
// it's turn for next table
TableId nextTable = remainingTables.pollFirst();
if (nextTable != null) {
// split the given table into chunks (snapshot splits)
@ -233,14 +238,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
@Override
public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
this.splitFinishedOffsets.putAll(splitFinishedOffsets);
if (allSplitsFinished()) {
if (allSplitsFinished() && AssignerStatus.isAssigning(assignerStatus)) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) {
assignerState = assignerState.nextState();
assignerStatus = assignerStatus.onFinish();
LOG.info(
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
} else {
LOG.info(
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
@ -267,13 +271,15 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
remainingSplits,
assignedSplits,
splitFinishedOffsets,
assignerState,
assignerStatus,
remainingTables,
isTableIdCaseSensitive,
true);
// we need a complete checkpoint before mark this assigner to be finished, to wait for all
// records of snapshot splits are completely processed
if (checkpointIdToFinish == null && !isFinished() && allSplitsFinished()) {
if (checkpointIdToFinish == null
&& !isAssigningFinished(assignerStatus)
&& allSplitsFinished()) {
checkpointIdToFinish = checkpointId;
}
return state;
@ -283,51 +289,43 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
public void notifyCheckpointComplete(long checkpointId) {
// we have waited for at-least one complete checkpoint after all snapshot-splits are
// finished, then we can mark snapshot assigner as finished.
if (checkpointIdToFinish != null && !isFinished() && allSplitsFinished()) {
if (checkpointIdToFinish != null
&& !isAssigningFinished(assignerStatus)
&& allSplitsFinished()) {
if (checkpointId >= checkpointIdToFinish) {
assignerState = assignerState.nextState();
assignerStatus = assignerStatus.onFinish();
}
LOG.info("Snapshot split assigner is turn into finished status.");
}
}
@Override
public void close() {}
public AssignerStatus getAssignerStatus() {
return assignerStatus;
}
@Override
public boolean isAssignerSuspended() {
return assignerState == SnapshotAssignerStatus.SUSPENDED;
public void suspend() {
Preconditions.checkState(
isAssigningFinished(assignerStatus), "Invalid assigner status {}", assignerStatus);
assignerStatus = assignerStatus.suspend();
}
@Override
public void wakeup() {
Preconditions.checkState(assignerState == SnapshotAssignerStatus.SUSPENDED);
assignerState = assignerState.nextState();
}
public SnapshotAssignerStatus getAssignerState() {
return assignerState;
Preconditions.checkState(
isSuspended(assignerStatus), "Invalid assigner status {}", assignerStatus);
assignerStatus = assignerStatus.wakeup();
}
@Override
public int getTotalFinishedSplitSize() {
return splitFinishedOffsets.size();
}
public void close() {}
/** Indicates there is no more splits available in this assigner. */
public boolean noMoreSplits() {
return remainingTables.isEmpty() && remainingSplits.isEmpty();
}
/**
* Returns whether the snapshot split assigner is finished, which indicates there is no more
* splits and all records of splits have been completely processed in the pipeline.
*/
public boolean isFinished() {
return assignerState == SnapshotAssignerStatus.INIT_FINISH
|| assignerState == SnapshotAssignerStatus.RESUMED_FINISH;
}
public Map<String, MySqlSnapshotSplit> getAssignedSplits() {
return assignedSplits;
}

@ -101,17 +101,21 @@ public interface MySqlSplitAssigner {
*/
void notifyCheckpointComplete(long checkpointId);
/** Gets the split assigner status, see {@code AssignerStatus}. */
AssignerStatus getAssignerStatus();
/**
* Suspends the assigner under {@link AssignerStatus#INITIAL_ASSIGNING_FINISHED} or {@link
* AssignerStatus#NEWLY_ADDED_ASSIGNING_FINISHED}.
*/
void suspend();
/** Wakes up the assigner under {@link AssignerStatus#SUSPENDED}. */
void wakeup();
/**
* Called to close the assigner, in case it holds on to any resources, like threads or network
* connections.
*/
void close();
/** Whether the assigner is suspended. */
boolean isAssignerSuspended();
/** Get the total finished split count. */
int getTotalFinishedSplitSize();
void wakeup();
}

@ -1,110 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.assigners;
import org.apache.flink.util.Preconditions;
/** The snapshot assigner state machine. */
public enum SnapshotAssignerStatus {
/**
* The assigner state machine goes this way.
*
* <pre>
* INIT -> INIT_FINISH -> SUSPENDED -> RESUMED -> RESUMED_FINISH
* ^ |
* |_________________________|
* </pre>
*/
INIT {
@Override
public int getValue() {
return 0;
}
@Override
public SnapshotAssignerStatus nextState() {
return INIT_FINISH;
}
},
INIT_FINISH {
@Override
public int getValue() {
return 1;
}
@Override
public SnapshotAssignerStatus nextState() {
return SUSPENDED;
}
},
SUSPENDED {
@Override
public int getValue() {
return 2;
}
@Override
public SnapshotAssignerStatus nextState() {
return RESUMED;
}
},
RESUMED {
@Override
public int getValue() {
return 3;
}
@Override
public SnapshotAssignerStatus nextState() {
return RESUMED_FINISH;
}
},
RESUMED_FINISH {
@Override
public int getValue() {
return 4;
}
@Override
public SnapshotAssignerStatus nextState() {
return SUSPENDED;
}
};
public abstract int getValue();
public abstract SnapshotAssignerStatus nextState();
public static SnapshotAssignerStatus fromInteger(int x) {
Preconditions.checkState(x >= 0 && x < 5);
switch (x) {
case 0:
return INIT;
case 1:
return INIT_FINISH;
case 2:
return SUSPENDED;
case 3:
return RESUMED;
case 4:
return RESUMED_FINISH;
}
return null;
}
}

@ -22,7 +22,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import com.ververica.cdc.connectors.mysql.source.assigners.SnapshotAssignerStatus;
import com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
@ -152,7 +152,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
writeMySqlSplits(state.getRemainingSplits(), out);
writeAssignedSnapshotSplits(state.getAssignedSplits(), out);
writeFinishedOffsets(state.getSplitFinishedOffsets(), out);
out.writeInt(state.getAssignerState().getValue());
out.writeInt(state.getSnapshotAssignerStatus().getStatusCode());
writeTableIds(state.getRemainingTables(), out);
out.writeBoolean(state.isTableIdCaseSensitive());
}
@ -179,12 +179,12 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
readAssignedSnapshotSplits(splitVersion, in);
Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(splitVersion, in);
SnapshotAssignerStatus assignerState;
AssignerStatus assignerStatus;
boolean isAssignerFinished = in.readBoolean();
if (isAssignerFinished) {
assignerState = SnapshotAssignerStatus.INIT_FINISH;
assignerStatus = AssignerStatus.INITIAL_ASSIGNING_FINISHED;
} else {
assignerState = SnapshotAssignerStatus.INIT;
assignerStatus = AssignerStatus.INITIAL_ASSIGNING;
}
return new SnapshotPendingSplitsState(
@ -192,7 +192,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
remainingSplits,
assignedSnapshotSplits,
finishedOffsets,
assignerState,
assignerStatus,
new ArrayList<>(),
false,
false);
@ -213,16 +213,16 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
readAssignedSnapshotSplits(splitVersion, in);
Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(splitVersion, in);
SnapshotAssignerStatus assignerState;
AssignerStatus assignerStatus;
if (splitVersion < 4) {
boolean isAssignerFinished = in.readBoolean();
if (isAssignerFinished) {
assignerState = SnapshotAssignerStatus.INIT_FINISH;
assignerStatus = AssignerStatus.INITIAL_ASSIGNING_FINISHED;
} else {
assignerState = SnapshotAssignerStatus.INIT;
assignerStatus = AssignerStatus.INITIAL_ASSIGNING;
}
} else {
assignerState = SnapshotAssignerStatus.fromInteger(in.readInt());
assignerStatus = AssignerStatus.fromStatusCode(in.readInt());
}
List<TableId> remainingTableIds = readTableIds(in);
boolean isTableIdCaseSensitive = in.readBoolean();
@ -231,7 +231,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
remainingSplits,
assignedSnapshotSplits,
finishedOffsets,
assignerState,
assignerStatus,
remainingTableIds,
isTableIdCaseSensitive,
true);

@ -18,7 +18,7 @@
package com.ververica.cdc.connectors.mysql.source.assigners.state;
import com.ververica.cdc.connectors.mysql.source.assigners.SnapshotAssignerStatus;
import com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
@ -56,13 +56,10 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
*/
private final Map<String, BinlogOffset> splitFinishedOffsets;
/**
* Whether the snapshot split assigner is finished, which indicates there is no more splits and
* all records of splits have been completely processed in the pipeline.
*/
private final SnapshotAssignerStatus assignerState;
/** The {@link AssignerStatus} that indicates the snapshot assigner status. */
private final AssignerStatus assignerStatus;
/** Whether the table identifier is case sensitive. */
/** Whether the table identifier is case-sensitive. */
private final boolean isTableIdCaseSensitive;
/** Whether the remaining tables are keep when snapshot state. */
@ -73,7 +70,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
List<MySqlSnapshotSplit> remainingSplits,
Map<String, MySqlSnapshotSplit> assignedSplits,
Map<String, BinlogOffset> splitFinishedOffsets,
SnapshotAssignerStatus assignerState,
AssignerStatus assignerStatus,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed) {
@ -81,7 +78,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerState = assignerState;
this.assignerStatus = assignerStatus;
this.remainingTables = remainingTables;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
@ -103,8 +100,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
return splitFinishedOffsets;
}
public SnapshotAssignerStatus getAssignerState() {
return assignerState;
public AssignerStatus getSnapshotAssignerStatus() {
return assignerStatus;
}
public List<TableId> getRemainingTables() {
@ -128,7 +125,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
return false;
}
SnapshotPendingSplitsState that = (SnapshotPendingSplitsState) o;
return assignerState == that.assignerState
return assignerStatus == that.assignerStatus
&& isTableIdCaseSensitive == that.isTableIdCaseSensitive
&& isRemainingTablesCheckpointed == that.isRemainingTablesCheckpointed
&& Objects.equals(remainingTables, that.remainingTables)
@ -146,7 +143,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
remainingSplits,
assignedSplits,
splitFinishedOffsets,
assignerState,
assignerStatus,
isTableIdCaseSensitive,
isRemainingTablesCheckpointed);
}
@ -164,8 +161,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
+ assignedSplits
+ ", splitFinishedOffsets="
+ splitFinishedOffsets
+ ", assignerState="
+ assignerState
+ ", assignerStatus="
+ assignerStatus
+ ", isTableIdCaseSensitive="
+ isTableIdCaseSensitive
+ ", isRemainingTablesCheckpointed="

@ -55,7 +55,7 @@ public class MySqlSourceConfig implements Serializable {
private final double distributionFactorUpper;
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
private final boolean captureNewTables;
private final boolean scanNewlyAddedTableEnabled;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@ -83,7 +83,7 @@ public class MySqlSourceConfig implements Serializable {
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
boolean captureNewTable,
boolean scanNewlyAddedTableEnabled,
Properties dbzProperties) {
this.hostname = checkNotNull(hostname);
this.port = port;
@ -103,7 +103,7 @@ public class MySqlSourceConfig implements Serializable {
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.captureNewTables = captureNewTable;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.dbzProperties = checkNotNull(dbzProperties);
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
@ -182,8 +182,8 @@ public class MySqlSourceConfig implements Serializable {
return includeSchemaChanges;
}
public boolean isCaptureNewTables() {
return captureNewTables;
public boolean isScanNewlyAddedTableEnabled() {
return scanNewlyAddedTableEnabled;
}
public Properties getDbzProperties() {

@ -68,7 +68,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private double distributionFactorLower =
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean captureNewTables = false;
private boolean scanNewlyAddedTableEnabled = false;
private Properties dbzProperties;
public MySqlSourceConfigFactory hostname(String hostname) {
@ -208,9 +208,9 @@ public class MySqlSourceConfigFactory implements Serializable {
return this;
}
/** Whether the {@link MySqlSource} should capture the newly added tables or not. */
public MySqlSourceConfigFactory captureNewTables(boolean captureNewTables) {
this.captureNewTables = captureNewTables;
/** Whether the {@link MySqlSource} should scan the newly added tables or not. */
public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) {
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
return this;
}
@ -310,7 +310,7 @@ public class MySqlSourceConfigFactory implements Serializable {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
captureNewTables,
scanNewlyAddedTableEnabled,
props);
}
}

@ -200,9 +200,10 @@ public class MySqlSourceOptions {
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
@Experimental
public static final ConfigOption<Boolean> CAPTURE_NEW_TABLES =
ConfigOptions.key("capture-new-tables")
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.newly-added-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether capture the snapshot of newly add tables.");
.withDescription(
"Whether capture the scan the newly added tables or not, by default is false.");
}

@ -23,7 +23,6 @@ import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@ -33,13 +32,13 @@ import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsSt
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitReaderSuspendedReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent;
import com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeResponseEvent;
import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
@ -57,6 +56,9 @@ import java.util.Optional;
import java.util.TreeSet;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isAssigningFinished;
import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSuspended;
/**
* A MySQL CDC source enumerator that enumerates receive the split request and assign the split to
* source readers.
@ -73,6 +75,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
// using TreeSet to prefer assigning binlog split to task-0 for easier debug
private final TreeSet<Integer> readersAwaitingSplit;
private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;
private boolean binlogReaderIsSuspended = false;
public MySqlSourceEnumerator(
SplitEnumeratorContext<MySqlSplit> context,
@ -87,6 +90,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
@Override
public void start() {
splitAssigner.open();
suspendBinlogReaderIfNeed();
this.context.callAsync(
this::getRegisteredReader,
this::syncWithReaders,
@ -126,7 +130,11 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
FinishedSnapshotSplitsReportEvent reportEvent =
(FinishedSnapshotSplitsReportEvent) sourceEvent;
Map<String, BinlogOffset> finishedOffsets = reportEvent.getFinishedOffsets();
splitAssigner.onFinishedSplits(finishedOffsets);
wakeupBinlogReaderIfNeed();
// send acknowledge event
FinishedSnapshotSplitsAckEvent ackEvent =
new FinishedSnapshotSplitsAckEvent(new ArrayList<>(finishedOffsets.keySet()));
@ -136,15 +144,13 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
"The enumerator receives request for binlog split meta from subtask {}.",
subtaskId);
sendBinlogMeta(subtaskId, (BinlogSplitMetaRequestEvent) sourceEvent);
} else if (sourceEvent instanceof BinlogSplitReaderSuspendedReportEvent) {
LOG.debug(
"The enumerator receives the binlog split reader suspended from subtask {}. ",
} else if (sourceEvent instanceof SuspendBinlogReaderAckEvent) {
LOG.info(
"The enumerator receives event that the binlog split reader has been suspended from subtask {}. ",
subtaskId);
handleBinlogSplitReaderSuspended(
subtaskId, (BinlogSplitReaderSuspendedReportEvent) sourceEvent);
} else if (sourceEvent instanceof TotalFinishedSplitSizeRequestEvent) {
handleTotalFinishedSplitSizeRequest(
subtaskId, (TotalFinishedSplitSizeRequestEvent) sourceEvent);
handleSuspendBinlogReaderAckEvent(subtaskId);
} else if (sourceEvent instanceof LatestFinishedSplitsSizeRequestEvent) {
handleLatestFinishedSplitSizeRequest(subtaskId);
}
}
@ -212,19 +218,28 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
subtaskId, new FinishedSnapshotSplitsRequestEvent());
}
}
if (splitAssigner.isAssignerSuspended()) {
for (int subtaskId : subtaskIds) {
suspendBinlogReaderIfNeed();
wakeupBinlogReaderIfNeed();
}
private void suspendBinlogReaderIfNeed() {
if (isSuspended(splitAssigner.getAssignerStatus())) {
for (int subtaskId : getRegisteredReader()) {
context.sendEventToSourceReader(subtaskId, new SuspendBinlogReaderEvent());
}
binlogReaderIsSuspended = true;
}
}
if (splitAssigner instanceof MySqlHybridSplitAssigner) {
if (((MySqlHybridSplitAssigner) splitAssigner).isShouldWakeupBinlogSplitReader()) {
for (int subtaskId : subtaskIds) {
context.sendEventToSourceReader(
subtaskId, new WakeupReaderEvent(WakeupReaderEvent.WakeUpType.BINLOG));
}
private void wakeupBinlogReaderIfNeed() {
if (isAssigningFinished(splitAssigner.getAssignerStatus()) && binlogReaderIsSuspended) {
for (int subtaskId : getRegisteredReader()) {
context.sendEventToSourceReader(
subtaskId,
new WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.BINLOG_READER));
}
binlogReaderIsSuspended = false;
}
}
@ -263,28 +278,26 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
}
}
private void handleBinlogSplitReaderSuspended(
int subTask, BinlogSplitReaderSuspendedReportEvent responseEvent) {
private void handleSuspendBinlogReaderAckEvent(int subTask) {
LOG.info(
"The enumerator receives response for suspend binlog split from subtask {}. ",
"Received event that the binlog split reader has been suspended from subtask {}. ",
subTask);
splitAssigner.wakeup();
for (int subtaskId : getRegisteredReader()) {
context.sendEventToSourceReader(
subtaskId, new WakeupReaderEvent(WakeupReaderEvent.WakeUpType.SNAPSHOT));
if (splitAssigner instanceof MySqlHybridSplitAssigner) {
for (int subtaskId : this.getRegisteredReader()) {
context.sendEventToSourceReader(
subtaskId,
new WakeupReaderEvent(WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER));
}
}
}
private void handleTotalFinishedSplitSizeRequest(
int subTask, TotalFinishedSplitSizeRequestEvent event) {
Preconditions.checkState(!splitAssigner.isAssignerSuspended());
Preconditions.checkState(!splitAssigner.waitingForFinishedSplits());
private void handleLatestFinishedSplitSizeRequest(int subTask) {
if (splitAssigner instanceof MySqlHybridSplitAssigner) {
context.sendEventToSourceReader(
subTask,
new TotalFinishedSplitSizeResponseEvent(
splitAssigner.getTotalFinishedSplitSize()));
((MySqlHybridSplitAssigner) splitAssigner).setShouldWakeupBinlogSplitReader(false);
new LatestFinishedSplitsSizeEvent(
splitAssigner.getFinishedSplitInfos().size()));
}
}
}

@ -24,26 +24,27 @@ import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerato
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
* The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to
* ask if the offset has been propagated to Enumerator.
* The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to
* pass the latest finished snapshot splits size.
*/
public class TotalFinishedSplitSizeResponseEvent implements SourceEvent {
public class LatestFinishedSplitsSizeEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
private final int totalFinishedSplitSize;
private final int latestFinishedSplitsSize;
public TotalFinishedSplitSizeResponseEvent(final int totalFinishedSplitSize) {
this.totalFinishedSplitSize = totalFinishedSplitSize;
public LatestFinishedSplitsSizeEvent(int latestFinishedSplitsSize) {
this.latestFinishedSplitsSize = latestFinishedSplitsSize;
}
public int getTotalFinishedSplitSize() {
return totalFinishedSplitSize;
public int getLatestFinishedSplitsSize() {
return latestFinishedSplitsSize;
}
@Override
public String toString() {
return "TotalFinishedSplitSizeResponseEvent{"
+ "totalFinishedSplitSize="
+ totalFinishedSplitSize
return "LatestFinishedSplitsSizeEvent{"
+ "latestFinishedSplitsSize="
+ latestFinishedSplitsSize
+ '}';
}
}

@ -24,11 +24,12 @@ import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerato
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
* The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to
* notify the source reader to the offset has been propagated to Enumerator.
* The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to
* ask the latest finished snapshot splits size.
*/
public class TotalFinishedSplitSizeRequestEvent implements SourceEvent {
public class LatestFinishedSplitsSizeRequestEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
public TotalFinishedSplitSizeRequestEvent() {}
public LatestFinishedSplitsSizeRequestEvent() {}
}

@ -27,8 +27,9 @@ import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
* The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to
* notify the binlog split reader has been suspended.
*/
public class BinlogSplitReaderSuspendedReportEvent implements SourceEvent {
public class SuspendBinlogReaderAckEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
public BinlogSplitReaderSuspendedReportEvent() {}
public SuspendBinlogReaderAckEvent() {}
}

@ -24,10 +24,11 @@ import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerato
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
* The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to
* tell the source reader to suspend.
* The {@link SourceEvent} that {@link MySqlSourceEnumerator} broadcasts to {@link
* MySqlSourceReader} to tell the source reader to suspend the binlog reader.
*/
public class SuspendBinlogReaderEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
public SuspendBinlogReaderEvent() {}

@ -25,24 +25,24 @@ import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
* The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to
* wake up source reader to request split again.
* wake up source reader to consume split again.
*/
public class WakeupReaderEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
/** Wake up type. */
public enum WakeUpType {
SNAPSHOT,
BINLOG
/** Wake up target. */
public enum WakeUpTarget {
SNAPSHOT_READER,
BINLOG_READER
}
private WakeUpType type;
private WakeUpTarget target;
public WakeupReaderEvent(WakeUpType type) {
this.type = type;
public WakeupReaderEvent(WakeUpTarget target) {
this.target = target;
}
public WakeUpType getType() {
return type;
public WakeUpTarget getTarget() {
return target;
}
}

@ -26,19 +26,18 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitReaderSuspendedReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeEvent;
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent;
import com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeResponseEvent;
import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
@ -66,6 +65,9 @@ import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER;
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
/** The source reader for MySQL source splits. */
@ -80,7 +82,7 @@ public class MySqlSourceReader<T>
private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
private final int subtaskId;
private final MySqlSourceReaderContext mySqlSourceReaderContext;
private MySqlBinlogSplit suspendedSplit;
private MySqlBinlogSplit suspendedBinlogSplit;
public MySqlSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
@ -100,7 +102,7 @@ public class MySqlSourceReader<T>
this.uncompletedBinlogSplits = new HashMap<>();
this.subtaskId = context.getSourceReaderContext().getIndexOfSubtask();
this.mySqlSourceReaderContext = context;
this.suspendedSplit = null;
this.suspendedBinlogSplit = null;
}
@Override
@ -121,19 +123,24 @@ public class MySqlSourceReader<T>
@Override
public List<MySqlSplit> snapshotState(long checkpointId) {
// unfinished splits
List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);
// unfinished splits
List<MySqlSplit> unfinishedSplits =
stateSplits.stream()
.filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
.collect(Collectors.toList());
// add finished snapshot splits that didn't receive ack yet
stateSplits.addAll(finishedUnackedSplits.values());
unfinishedSplits.addAll(finishedUnackedSplits.values());
// add binlog splits who are uncompleted
stateSplits.addAll(uncompletedBinlogSplits.values());
unfinishedSplits.addAll(uncompletedBinlogSplits.values());
if (suspendedSplit != null) {
stateSplits.add(suspendedSplit);
// add suspended BinlogSplit
if (suspendedBinlogSplit != null) {
unfinishedSplits.add(suspendedBinlogSplit);
}
return stateSplits;
}
@ -143,16 +150,12 @@ public class MySqlSourceReader<T>
MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
if (mySqlSplit.isBinlogSplit()) {
LOG.info(
"binlog split finished due to new table added, offset {}",
"binlog split reader suspended due to newly added table, offset {}",
mySqlSplitState.asBinlogSplitState().getStartingOffset());
context.sendSourceEventToCoordinator(new BinlogSplitReaderSuspendedReportEvent());
suspendedSplit =
MySqlBinlogSplit.updateIsSuspended(mySqlSplit.asBinlogSplit(), true);
suspendedSplit = MySqlBinlogSplit.emptyFinishedSplitInfos(suspendedSplit);
suspendedSplit = MySqlBinlogSplit.emptyTableSchemas(suspendedSplit);
mySqlSourceReaderContext.setShouldBinlogSplitReaderStopped(false);
mySqlSourceReaderContext.resetStopBinlogSplitReader();
suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit());
context.sendSourceEventToCoordinator(new SuspendBinlogReaderAckEvent());
} else {
finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
}
@ -166,7 +169,7 @@ public class MySqlSourceReader<T>
// restore for finishedUnackedSplits
List<MySqlSplit> unfinishedSplits = new ArrayList<>();
for (MySqlSplit split : splits) {
LOG.info("Received Split: " + split);
LOG.info("Add Split: " + split);
if (split.isSnapshotSplit()) {
MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
if (snapshotSplit.isSnapshotReadFinished()) {
@ -176,9 +179,9 @@ public class MySqlSourceReader<T>
}
} else {
MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
// the binlog split is uncompleted
// the binlog split is suspended
if (binlogSplit.isSuspended()) {
suspendedSplit = binlogSplit;
suspendedBinlogSplit = binlogSplit;
} else if (!binlogSplit.isCompletedSplit()) {
uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
@ -241,34 +244,26 @@ public class MySqlSourceReader<T>
((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
} else if (sourceEvent instanceof SuspendBinlogReaderEvent) {
if (!mySqlSourceReaderContext.isShouldBinlogSplitReaderStopped()) {
mySqlSourceReaderContext.setShouldBinlogSplitReaderStopped(true);
}
mySqlSourceReaderContext.setStopBinlogSplitReader();
} else if (sourceEvent instanceof WakeupReaderEvent) {
WakeupReaderEvent wakeupReaderEvent = (WakeupReaderEvent) sourceEvent;
if (wakeupReaderEvent.getType() == WakeupReaderEvent.WakeUpType.SNAPSHOT) {
if (wakeupReaderEvent.getTarget() == SNAPSHOT_READER) {
context.sendSplitRequest();
} else {
if (suspendedSplit == null) {
return;
if (suspendedBinlogSplit != null) {
context.sendSourceEventToCoordinator(
new LatestFinishedSplitsSizeRequestEvent());
}
context.sendSourceEventToCoordinator(new TotalFinishedSplitSizeRequestEvent());
}
} else if (sourceEvent instanceof TotalFinishedSplitSizeResponseEvent) {
if (suspendedSplit == null) {
return;
} else if (sourceEvent instanceof LatestFinishedSplitsSizeEvent) {
if (suspendedBinlogSplit != null) {
final int finishedSplitsSize =
((LatestFinishedSplitsSizeEvent) sourceEvent).getLatestFinishedSplitsSize();
final MySqlBinlogSplit binlogSplit =
toNormalBinlogSplit(suspendedBinlogSplit, finishedSplitsSize);
suspendedBinlogSplit = null;
this.addSplits(Collections.singletonList(binlogSplit));
}
Preconditions.checkState(suspendedSplit.isSuspended());
MySqlBinlogSplit split =
MySqlBinlogSplit.updateTotalFinishedSplitSize(
suspendedSplit,
((TotalFinishedSplitSizeResponseEvent) sourceEvent)
.getTotalFinishedSplitSize());
split = MySqlBinlogSplit.updateIsSuspended(split, false);
// Can flink make sure the following two sentense happend in the same checkpoint?
suspendedSplit = null;
this.addSplits(Collections.singletonList(split));
} else {
super.handleSourceEvents(sourceEvent);
}

@ -21,31 +21,32 @@ package com.ververica.cdc.connectors.mysql.source.reader;
import org.apache.flink.api.connector.source.SourceReaderContext;
/**
* A wrapper class that wraps SourceReaderContext and some data shared by {@link MySqlSourceReader}
* and {@link MySqlSplitReader}.
* A wrapper class that wraps {@link SourceReaderContext} for sharing message between {@link
* MySqlSourceReader} and {@link MySqlSplitReader}.
*/
public class MySqlSourceReaderContext {
SourceReaderContext sourceReaderContext;
boolean shouldBinlogSplitReaderStopped;
private final SourceReaderContext sourceReaderContext;
private volatile boolean stopBinlogSplitReader;
public MySqlSourceReaderContext(final SourceReaderContext sourceReaderContext) {
this.sourceReaderContext = sourceReaderContext;
this.shouldBinlogSplitReaderStopped = false;
this.stopBinlogSplitReader = false;
}
public SourceReaderContext getSourceReaderContext() {
return sourceReaderContext;
}
public void setSourceReaderContext(final SourceReaderContext sourceReaderContext) {
this.sourceReaderContext = sourceReaderContext;
public boolean needStopBinlogSplitReader() {
return stopBinlogSplitReader;
}
public boolean isShouldBinlogSplitReaderStopped() {
return shouldBinlogSplitReaderStopped;
public void setStopBinlogSplitReader() {
this.stopBinlogSplitReader = true;
}
public void setShouldBinlogSplitReaderStopped(final boolean shouldBinlogSplitReaderStopped) {
this.shouldBinlogSplitReaderStopped = shouldBinlogSplitReaderStopped;
public void resetStopBinlogSplitReader() {
this.stopBinlogSplitReader = false;
}
}

@ -69,15 +69,13 @@ public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
@Override
public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
checkSplitOrStartNext();
checkNeedStopBinlogReader();
Iterator<SourceRecord> dataIt = null;
Iterator<SourceRecord> dataIt;
try {
dataIt = currentReader.pollSplitRecords();
if (context.isShouldBinlogSplitReaderStopped()
&& currentReader instanceof BinlogSplitReader) {
((BinlogSplitReader) currentReader).finishBinlogSplit();
}
} catch (InterruptedException e) {
LOG.warn("fetch data failed.", e);
throw new IOException(e);
@ -87,6 +85,14 @@ public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
: MySqlRecords.forRecords(currentSplitId, dataIt);
}
private void checkNeedStopBinlogReader() {
if (currentReader instanceof BinlogSplitReader
&& context.needStopBinlogSplitReader()
&& !currentReader.isFinished()) {
((BinlogSplitReader) currentReader).stopBinlogReadTask();
}
}
@Override
public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
if (!(splitsChanges instanceof SplitsAddition)) {
@ -125,7 +131,8 @@ public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
if (nextSplit.isSnapshotSplit()) {
if (currentReader instanceof BinlogSplitReader) {
LOG.info("This is the point from binlog split back to snapshot split");
LOG.info(
"This is the point from binlog split reading change to snapshot split reading");
currentReader.close();
currentReader = null;
}

@ -166,28 +166,6 @@ public class MySqlBinlogSplit extends MySqlSplit {
binlogSplit.isSuspended());
}
public static MySqlBinlogSplit emptyFinishedSplitInfos(MySqlBinlogSplit binlogSplit) {
return new MySqlBinlogSplit(
binlogSplit.splitId,
binlogSplit.getStartingOffset(),
binlogSplit.getEndingOffset(),
new ArrayList<>(),
binlogSplit.getTableSchemas(),
binlogSplit.getTotalFinishedSplitSize(),
binlogSplit.isSuspended());
}
public static MySqlBinlogSplit emptyTableSchemas(MySqlBinlogSplit binlogSplit) {
return new MySqlBinlogSplit(
binlogSplit.splitId,
binlogSplit.getStartingOffset(),
binlogSplit.getEndingOffset(),
binlogSplit.getFinishedSnapshotSplitInfos(),
new HashMap<>(),
binlogSplit.getTotalFinishedSplitSize(),
binlogSplit.isSuspended());
}
public static MySqlBinlogSplit fillTableSchemas(
MySqlBinlogSplit binlogSplit, Map<TableId, TableChange> tableSchemas) {
tableSchemas.putAll(binlogSplit.getTableSchemas());
@ -201,27 +179,26 @@ public class MySqlBinlogSplit extends MySqlSplit {
binlogSplit.isSuspended());
}
public static MySqlBinlogSplit updateTotalFinishedSplitSize(
MySqlBinlogSplit binlogSplit, int totalFinishedSplitSize) {
public static MySqlBinlogSplit toNormalBinlogSplit(
MySqlBinlogSplit suspendedBinlogSplit, int totalFinishedSplitSize) {
return new MySqlBinlogSplit(
binlogSplit.splitId,
binlogSplit.getStartingOffset(),
binlogSplit.getEndingOffset(),
binlogSplit.getFinishedSnapshotSplitInfos(),
binlogSplit.getTableSchemas(),
suspendedBinlogSplit.splitId,
suspendedBinlogSplit.getStartingOffset(),
suspendedBinlogSplit.getEndingOffset(),
suspendedBinlogSplit.getFinishedSnapshotSplitInfos(),
suspendedBinlogSplit.getTableSchemas(),
totalFinishedSplitSize,
binlogSplit.isSuspended());
false);
}
public static MySqlBinlogSplit updateIsSuspended(
MySqlBinlogSplit binlogSplit, boolean isSuspended) {
public static MySqlBinlogSplit toSuspendedBinlogSplit(MySqlBinlogSplit normalBinlogSplit) {
return new MySqlBinlogSplit(
binlogSplit.splitId,
binlogSplit.getStartingOffset(),
binlogSplit.getEndingOffset(),
binlogSplit.getFinishedSnapshotSplitInfos(),
binlogSplit.getTableSchemas(),
binlogSplit.getTotalFinishedSplitSize(),
isSuspended);
normalBinlogSplit.splitId,
normalBinlogSplit.getStartingOffset(),
normalBinlogSplit.getEndingOffset(),
new ArrayList<>(),
new HashMap<>(),
normalBinlogSplit.getTotalFinishedSplitSize(),
true);
}
}

@ -78,7 +78,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final double distributionFactorUpper;
private final double distributionFactorLower;
private final StartupOptions startupOptions;
private final boolean captureNewTables;
private final boolean scanNewlyAddedTableEnabled;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@ -156,7 +156,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
double distributionFactorUpper,
double distributionFactorLower,
StartupOptions startupOptions,
boolean captureNewTable) {
boolean scanNewlyAddedTableEnabled) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@ -177,7 +177,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.startupOptions = startupOptions;
this.captureNewTables = captureNewTable;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
@ -232,7 +232,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer)
.captureNewTables(captureNewTables)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.build();
return SourceProvider.of(parallelSource);
} else {
@ -308,7 +308,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
connectionPoolSize,
distributionFactorUpper,
distributionFactorLower,
startupOptions);
startupOptions,
scanNewlyAddedTableEnabled);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@ -330,6 +331,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
&& fetchSize == that.fetchSize
&& distributionFactorUpper == that.distributionFactorUpper
&& distributionFactorLower == that.distributionFactorLower
&& scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled
&& Objects.equals(physicalSchema, that.physicalSchema)
&& Objects.equals(hostname, that.hostname)
&& Objects.equals(database, that.database)
@ -371,7 +373,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
distributionFactorLower,
startupOptions,
producedDataType,
metadataKeys);
metadataKeys,
scanNewlyAddedTableEnabled);
}
@Override

@ -38,7 +38,6 @@ import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CAPTURE_NEW_TABLES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
@ -49,6 +48,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_MODE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
@ -98,7 +98,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean captureNewTables = config.get(CAPTURE_NEW_TABLES);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
if (enableParallelRead) {
@ -134,7 +134,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
distributionFactorUpper,
distributionFactorLower,
startupOptions,
captureNewTables);
scanNewlyAddedTableEnabled);
}
@Override
@ -172,7 +172,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(CONNECT_MAX_RETRIES);
options.add(CAPTURE_NEW_TABLES);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
return options;
}

@ -105,7 +105,7 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
remainingSplits,
assignedSplits,
splitFinishedOffsets,
SnapshotAssignerStatus.INIT_FINISH,
AssignerStatus.INITIAL_ASSIGNING_FINISHED,
new ArrayList<>(),
false,
true);

@ -21,7 +21,7 @@ package com.ververica.cdc.connectors.mysql.source.assigners.state;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.connectors.mysql.source.assigners.SnapshotAssignerStatus;
import com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
@ -125,7 +125,7 @@ public class PendingSplitsStateSerializerTest {
remainingSplits,
assignedSnapshotSplits,
finishedOffsets,
SnapshotAssignerStatus.INIT,
AssignerStatus.INITIAL_ASSIGNING,
remainingTables,
false,
true);

Loading…
Cancel
Save