[mysql] Support capture newly added tables for existed pipeline

pull/846/head
元组 3 years ago committed by Leonard Xu
parent df295d80b5
commit c21aa91e5d

@ -84,7 +84,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = false;
this.currentTaskRunning = true;
}
public void submitSplit(MySqlSplit mySqlSplit) {
@ -112,7 +112,6 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
executor.submit(
() -> {
try {
currentTaskRunning = true;
binlogSplitReadTask.execute(new BinlogSplitChangeEventSourceContextImpl());
} catch (Exception e) {
currentTaskRunning = false;
@ -151,8 +150,10 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
sourceRecords.add(event.getRecord());
}
}
return sourceRecords.iterator();
} else {
return null;
}
return sourceRecords.iterator();
}
private void checkReadException() {
@ -272,4 +273,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
this.finishedSplitsInfo = splitsInfoMap;
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
}
/** Marks the binlog split as finished by clearing the {@code currentTaskRunning} flag. */
public void finishBinlogSplit() {
this.currentTaskRunning = false;
}
}

@ -48,6 +48,7 @@ import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerato
import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
@ -137,8 +138,14 @@ public class MySqlSource<T>
final MySqlSourceReaderMetrics sourceReaderMetrics =
new MySqlSourceReaderMetrics(readerContext.metricGroup());
sourceReaderMetrics.registerMetrics();
MySqlSourceReaderContext mySqlSourceReaderContext =
new MySqlSourceReaderContext(readerContext);
Supplier<MySqlSplitReader> splitReaderSupplier =
() -> new MySqlSplitReader(sourceConfig, readerContext.getIndexOfSubtask());
() ->
new MySqlSplitReader(
sourceConfig,
readerContext.getIndexOfSubtask(),
mySqlSourceReaderContext);
return new MySqlSourceReader<>(
elementsQueue,
splitReaderSupplier,
@ -147,7 +154,7 @@ public class MySqlSource<T>
sourceReaderMetrics,
sourceConfig.isIncludeSchemaChanges()),
readerContext.getConfiguration(),
readerContext,
mySqlSourceReaderContext,
sourceConfig);
}

@ -190,6 +190,12 @@ public class MySqlSourceBuilder<T> {
return this;
}
/**
 * Whether the {@link MySqlSource} should capture the newly added tables or not.
 *
 * <p>The flag is forwarded unchanged to the underlying config factory.
 *
 * @param captureNewTables {@code true} to capture newly added tables
 * @return this builder, to allow call chaining
 */
public MySqlSourceBuilder<T> captureNewTables(boolean captureNewTables) {
this.configFactory.captureNewTables(captureNewTables);
return this;
}
/** Specifies the startup options. */
public MySqlSourceBuilder<T> startupOptions(StartupOptions startupOptions) {
this.configFactory.startupOptions(startupOptions);

@ -114,6 +114,19 @@ public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner {
@Override
public void close() {}
/** This assigner never reports itself as suspended; always returns {@code false}. */
@Override
public boolean isAssignerSuspended() {
return false;
}
/** This assigner does not count finished splits; always returns {@code 0}. */
@Override
public int getTotalFinishedSplitSize() {
return 0;
}
/** No-op for this assigner. */
@Override
public void wakeup() {}
// ------------------------------------------------------------------------------------------
private MySqlBinlogSplit createBinlogSplit() {

@ -51,6 +51,7 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
private final int splitMetaGroupSize;
private boolean isBinlogSplitAssigned;
private boolean shouldWakeupBinlogSplitReader = false;
private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;
@ -93,17 +94,28 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
@Override
public Optional<MySqlSplit> getNext() {
if (snapshotSplitAssigner.isAssignerSuspended()) {
// do not assign any split while the snapshot assigner is suspended; it is woken up
// once the enumerator receives a BinlogSplitReaderSuspendedReportEvent
return Optional.empty();
}
if (snapshotSplitAssigner.noMoreSplits()) {
// binlog split assigning
if (isBinlogSplitAssigned) {
// no more splits for the assigner
return Optional.empty();
} else if (snapshotSplitAssigner.isFinished()) {
} else if (snapshotSplitAssigner.getAssignerState()
== SnapshotAssignerStatus.INIT_FINISH) {
// we need to wait snapshot-assigner to be finished before
// assigning the binlog split. Otherwise, records emitted from binlog split
// might be out-of-order in terms of same primary key with snapshot splits.
isBinlogSplitAssigned = true;
return Optional.of(createBinlogSplit());
} else if (snapshotSplitAssigner.getAssignerState()
== SnapshotAssignerStatus.RESUMED_FINISH) {
// do not need to create binlog, but send event to wake up the binlog
isBinlogSplitAssigned = true;
shouldWakeupBinlogSplitReader = true;
return Optional.empty();
} else {
// binlog split is not ready by now
return Optional.empty();
@ -159,6 +171,30 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
snapshotSplitAssigner.close();
}
/** Returns whether the wrapped snapshot split assigner is currently suspended. */
@Override
public boolean isAssignerSuspended() {
return snapshotSplitAssigner.isAssignerSuspended();
}
/**
 * Clears the "binlog split assigned" flag and then wakes up the wrapped snapshot split
 * assigner.
 */
@Override
public void wakeup() {
isBinlogSplitAssigned = false;
snapshotSplitAssigner.wakeup();
}
/** Returns the current value of the {@code shouldWakeupBinlogSplitReader} flag. */
public boolean isShouldWakeupBinlogSplitReader() {
return shouldWakeupBinlogSplitReader;
}
/**
 * Sets the {@code shouldWakeupBinlogSplitReader} flag.
 *
 * @param shouldWakeupBinlogSplitReader the new value of the flag
 */
public void setShouldWakeupBinlogSplitReader(final boolean shouldWakeupBinlogSplitReader) {
this.shouldWakeupBinlogSplitReader = shouldWakeupBinlogSplitReader;
}
/** Returns the total finished split size reported by the wrapped snapshot split assigner. */
@Override
public int getTotalFinishedSplitSize() {
return snapshotSplitAssigner.getTotalFinishedSplitSize();
}
// --------------------------------------------------------------------------------------------
private MySqlBinlogSplit createBinlogSplit() {

@ -19,6 +19,7 @@
package com.ververica.cdc.connectors.mysql.source.assigners;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
@ -63,8 +64,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private final List<MySqlSnapshotSplit> remainingSplits;
private final Map<String, MySqlSnapshotSplit> assignedSplits;
private final Map<String, BinlogOffset> splitFinishedOffsets;
private boolean assignerFinished;
private SnapshotAssignerStatus assignerState;
private final MySqlSourceConfig sourceConfig;
private final int currentParallelism;
private final LinkedList<TableId> remainingTables;
@ -87,7 +87,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
new ArrayList<>(),
new HashMap<>(),
new HashMap<>(),
false,
SnapshotAssignerStatus.INIT,
remainingTables,
isTableIdCaseSensitive,
true);
@ -104,7 +104,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
checkpoint.getRemainingSplits(),
checkpoint.getAssignedSplits(),
checkpoint.getSplitFinishedOffsets(),
checkpoint.isAssignerFinished(),
checkpoint.getAssignerState(),
checkpoint.getRemainingTables(),
checkpoint.isTableIdCaseSensitive(),
checkpoint.isRemainingTablesCheckpointed());
@ -117,7 +117,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
List<MySqlSnapshotSplit> remainingSplits,
Map<String, MySqlSnapshotSplit> assignedSplits,
Map<String, BinlogOffset> splitFinishedOffsets,
boolean assignerFinished,
SnapshotAssignerStatus assignerState,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed) {
@ -127,7 +127,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerFinished = assignerFinished;
this.assignerState = assignerState;
this.remainingTables = new LinkedList<>(remainingTables);
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
@ -138,7 +138,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive);
// the legacy state didn't snapshot remaining tables, discovery remaining table here
if (!isRemainingTablesCheckpointed && !assignerFinished) {
if (!isRemainingTablesCheckpointed && !isFinished()) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
final List<TableId> discoverTables = discoverCapturedTables(jdbc, sourceConfig);
discoverTables.removeAll(alreadyProcessedTables);
@ -149,6 +149,30 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
"Failed to discover remaining tables to capture", e);
}
}
// If the config do not need to capture newly added tables
if (!sourceConfig.isCaptureNewTables()) {
return;
}
// check whether we got newly added tables
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
final List<TableId> discoverTables = discoverCapturedTables(jdbc, sourceConfig);
discoverTables.removeAll(alreadyProcessedTables);
// got some newly added tables
if (!discoverTables.isEmpty()) {
if (isFinished()) {
Preconditions.checkState(remainingTables.isEmpty());
assignerState = assignerState.nextState();
} else {
// if job is in snapshot phase, directly add the new table to remaining table
this.remainingTables.removeAll(discoverTables);
}
remainingTables.addAll(discoverTables);
}
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to discover remaining tables to capture", e);
}
}
@Override
@ -213,7 +237,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) {
assignerFinished = true;
assignerState = assignerState.nextState();
LOG.info(
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
@ -243,13 +267,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
remainingSplits,
assignedSplits,
splitFinishedOffsets,
assignerFinished,
assignerState,
remainingTables,
isTableIdCaseSensitive,
true);
// we need a complete checkpoint before mark this assigner to be finished, to wait for all
// records of snapshot splits are completely processed
if (checkpointIdToFinish == null && !assignerFinished && allSplitsFinished()) {
if (checkpointIdToFinish == null && !isFinished() && allSplitsFinished()) {
checkpointIdToFinish = checkpointId;
}
return state;
@ -259,8 +283,10 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
public void notifyCheckpointComplete(long checkpointId) {
// we have waited for at-least one complete checkpoint after all snapshot-splits are
// finished, then we can mark snapshot assigner as finished.
if (checkpointIdToFinish != null && !assignerFinished && allSplitsFinished()) {
assignerFinished = checkpointId >= checkpointIdToFinish;
if (checkpointIdToFinish != null && !isFinished() && allSplitsFinished()) {
if (checkpointId >= checkpointIdToFinish) {
assignerState = assignerState.nextState();
}
LOG.info("Snapshot split assigner is turn into finished status.");
}
}
@ -268,6 +294,26 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
@Override
public void close() {}
/** Returns {@code true} only while the assigner state is {@link SnapshotAssignerStatus#SUSPENDED}. */
@Override
public boolean isAssignerSuspended() {
return assignerState == SnapshotAssignerStatus.SUSPENDED;
}
/**
 * Moves the assigner out of {@link SnapshotAssignerStatus#SUSPENDED} into that status's next
 * state. The precondition check fails if the assigner is not currently suspended.
 */
@Override
public void wakeup() {
Preconditions.checkState(assignerState == SnapshotAssignerStatus.SUSPENDED);
assignerState = assignerState.nextState();
}
/** Returns the current status of this snapshot assigner. */
public SnapshotAssignerStatus getAssignerState() {
return assignerState;
}
/** Returns the number of entries currently held in {@code splitFinishedOffsets}. */
@Override
public int getTotalFinishedSplitSize() {
return splitFinishedOffsets.size();
}
/** Indicates there is no more splits available in this assigner. */
public boolean noMoreSplits() {
return remainingTables.isEmpty() && remainingSplits.isEmpty();
@ -278,7 +324,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
* splits and all records of splits have been completely processed in the pipeline.
*/
public boolean isFinished() {
return assignerFinished;
return assignerState == SnapshotAssignerStatus.INIT_FINISH
|| assignerState == SnapshotAssignerStatus.RESUMED_FINISH;
}
public Map<String, MySqlSnapshotSplit> getAssignedSplits() {

@ -106,4 +106,12 @@ public interface MySqlSplitAssigner {
* connections.
*/
void close();
/** Whether the assigner is suspended. */
boolean isAssignerSuspended();
/** Get the total finished split count. */
int getTotalFinishedSplitSize();
/** Wakes up a suspended assigner. */
void wakeup();
}

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.assigners;
import org.apache.flink.util.Preconditions;
/**
 * The snapshot assigner state machine.
 *
 * <p>The assigner state machine goes this way.
 *
 * <pre>
 * INIT -> INIT_FINISH -> SUSPENDED -> RESUMED -> RESUMED_FINISH
 *                            ^                        |
 *                            |________________________|
 * </pre>
 *
 * <p>Each status carries a stable integer value (see {@link #getValue()}) that can be turned
 * back into a status with {@link #fromInteger(int)}.
 */
public enum SnapshotAssignerStatus {
    INIT(0) {
        @Override
        public SnapshotAssignerStatus nextState() {
            return INIT_FINISH;
        }
    },
    INIT_FINISH(1) {
        @Override
        public SnapshotAssignerStatus nextState() {
            return SUSPENDED;
        }
    },
    SUSPENDED(2) {
        @Override
        public SnapshotAssignerStatus nextState() {
            return RESUMED;
        }
    },
    RESUMED(3) {
        @Override
        public SnapshotAssignerStatus nextState() {
            return RESUMED_FINISH;
        }
    },
    RESUMED_FINISH(4) {
        @Override
        public SnapshotAssignerStatus nextState() {
            // a finished resume cycle loops back to SUSPENDED, see the diagram above
            return SUSPENDED;
        }
    };

    /** Integer value of this status; kept explicit instead of relying on the ordinal. */
    private final int value;

    SnapshotAssignerStatus(int value) {
        this.value = value;
    }

    /** Returns the integer value that identifies this status. */
    public int getValue() {
        return value;
    }

    /** Returns the status that follows this one in the state machine. */
    public abstract SnapshotAssignerStatus nextState();

    /**
     * Resolves the status whose {@link #getValue()} equals the given integer.
     *
     * @param x the integer value of a status
     * @return the matching status, never {@code null}
     * @throws IllegalStateException if no status has the given value
     */
    public static SnapshotAssignerStatus fromInteger(int x) {
        SnapshotAssignerStatus matched = null;
        // look the value up through getValue() so this mapping cannot drift from the constants
        for (SnapshotAssignerStatus status : values()) {
            if (status.getValue() == x) {
                matched = status;
                break;
            }
        }
        Preconditions.checkState(
                matched != null, "Unknown snapshot assigner status value: " + x);
        return matched;
    }
}

@ -22,6 +22,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import com.ververica.cdc.connectors.mysql.source.assigners.SnapshotAssignerStatus;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
@ -101,6 +102,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
case 2:
return deserializeLegacyPendingSplitsState(serialized);
case 3:
case 4:
return deserializePendingSplitsState(serialized);
default:
throw new IOException("Unknown version: " + version);
@ -150,7 +152,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
writeMySqlSplits(state.getRemainingSplits(), out);
writeAssignedSnapshotSplits(state.getAssignedSplits(), out);
writeFinishedOffsets(state.getSplitFinishedOffsets(), out);
out.writeBoolean(state.isAssignerFinished());
out.writeInt(state.getAssignerState().getValue());
writeTableIds(state.getRemainingTables(), out);
out.writeBoolean(state.isTableIdCaseSensitive());
}
@ -177,13 +179,20 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
readAssignedSnapshotSplits(splitVersion, in);
Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(splitVersion, in);
SnapshotAssignerStatus assignerState;
boolean isAssignerFinished = in.readBoolean();
if (isAssignerFinished) {
assignerState = SnapshotAssignerStatus.INIT_FINISH;
} else {
assignerState = SnapshotAssignerStatus.INIT;
}
return new SnapshotPendingSplitsState(
alreadyProcessedTables,
remainingSplits,
assignedSnapshotSplits,
finishedOffsets,
isAssignerFinished,
assignerState,
new ArrayList<>(),
false,
false);
@ -204,7 +213,17 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
Map<String, MySqlSnapshotSplit> assignedSnapshotSplits =
readAssignedSnapshotSplits(splitVersion, in);
Map<String, BinlogOffset> finishedOffsets = readFinishedOffsets(splitVersion, in);
boolean isAssignerFinished = in.readBoolean();
SnapshotAssignerStatus assignerState;
if (splitVersion < 4) {
boolean isAssignerFinished = in.readBoolean();
if (isAssignerFinished) {
assignerState = SnapshotAssignerStatus.INIT_FINISH;
} else {
assignerState = SnapshotAssignerStatus.INIT;
}
} else {
assignerState = SnapshotAssignerStatus.fromInteger(in.readInt());
}
List<TableId> remainingTableIds = readTableIds(in);
boolean isTableIdCaseSensitive = in.readBoolean();
return new SnapshotPendingSplitsState(
@ -212,7 +231,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<P
remainingSplits,
assignedSnapshotSplits,
finishedOffsets,
isAssignerFinished,
assignerState,
remainingTableIds,
isTableIdCaseSensitive,
true);

@ -18,6 +18,7 @@
package com.ververica.cdc.connectors.mysql.source.assigners.state;
import com.ververica.cdc.connectors.mysql.source.assigners.SnapshotAssignerStatus;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
@ -59,7 +60,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
* Whether the snapshot split assigner is finished, which indicates there is no more splits and
* all records of splits have been completely processed in the pipeline.
*/
private final boolean isAssignerFinished;
private final SnapshotAssignerStatus assignerState;
/** Whether the table identifier is case sensitive. */
private final boolean isTableIdCaseSensitive;
@ -72,7 +73,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
List<MySqlSnapshotSplit> remainingSplits,
Map<String, MySqlSnapshotSplit> assignedSplits,
Map<String, BinlogOffset> splitFinishedOffsets,
boolean isAssignerFinished,
SnapshotAssignerStatus assignerState,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed) {
@ -80,7 +81,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
this.remainingSplits = remainingSplits;
this.assignedSplits = assignedSplits;
this.splitFinishedOffsets = splitFinishedOffsets;
this.isAssignerFinished = isAssignerFinished;
this.assignerState = assignerState;
this.remainingTables = remainingTables;
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
@ -102,8 +103,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
return splitFinishedOffsets;
}
public boolean isAssignerFinished() {
return isAssignerFinished;
public SnapshotAssignerStatus getAssignerState() {
return assignerState;
}
public List<TableId> getRemainingTables() {
@ -127,7 +128,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
return false;
}
SnapshotPendingSplitsState that = (SnapshotPendingSplitsState) o;
return isAssignerFinished == that.isAssignerFinished
return assignerState == that.assignerState
&& isTableIdCaseSensitive == that.isTableIdCaseSensitive
&& isRemainingTablesCheckpointed == that.isRemainingTablesCheckpointed
&& Objects.equals(remainingTables, that.remainingTables)
@ -145,7 +146,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
remainingSplits,
assignedSplits,
splitFinishedOffsets,
isAssignerFinished,
assignerState,
isTableIdCaseSensitive,
isRemainingTablesCheckpointed);
}
@ -163,8 +164,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
+ assignedSplits
+ ", splitFinishedOffsets="
+ splitFinishedOffsets
+ ", isAssignerFinished="
+ isAssignerFinished
+ ", assignerState="
+ assignerState
+ ", isTableIdCaseSensitive="
+ isTableIdCaseSensitive
+ ", isRemainingTablesCheckpointed="

@ -55,6 +55,7 @@ public class MySqlSourceConfig implements Serializable {
private final double distributionFactorUpper;
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
private final boolean captureNewTables;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@ -82,6 +83,7 @@ public class MySqlSourceConfig implements Serializable {
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
boolean captureNewTable,
Properties dbzProperties) {
this.hostname = checkNotNull(hostname);
this.port = port;
@ -101,6 +103,7 @@ public class MySqlSourceConfig implements Serializable {
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.captureNewTables = captureNewTable;
this.dbzProperties = checkNotNull(dbzProperties);
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
@ -179,6 +182,10 @@ public class MySqlSourceConfig implements Serializable {
return includeSchemaChanges;
}
/** Returns the {@code captureNewTables} flag this config was created with. */
public boolean isCaptureNewTables() {
return captureNewTables;
}
public Properties getDbzProperties() {
return dbzProperties;
}

@ -68,6 +68,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private double distributionFactorLower =
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean captureNewTables = false;
private Properties dbzProperties;
public MySqlSourceConfigFactory hostname(String hostname) {
@ -207,6 +208,12 @@ public class MySqlSourceConfigFactory implements Serializable {
return this;
}
/**
 * Whether the {@link MySqlSource} should capture the newly added tables or not.
 *
 * @param captureNewTables {@code true} to capture newly added tables
 * @return this factory, to allow call chaining
 */
public MySqlSourceConfigFactory captureNewTables(boolean captureNewTables) {
this.captureNewTables = captureNewTables;
return this;
}
/** Specifies the startup options. */
public MySqlSourceConfigFactory startupOptions(StartupOptions startupOptions) {
switch (startupOptions.startupMode) {
@ -303,6 +310,7 @@ public class MySqlSourceConfigFactory implements Serializable {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
captureNewTables,
props);
}
}

@ -198,4 +198,11 @@ public class MySqlSourceOptions {
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
/** Experimental boolean option {@code capture-new-tables}; defaults to {@code false}. */
@Experimental
public static final ConfigOption<Boolean> CAPTURE_NEW_TABLES =
ConfigOptions.key("capture-new-tables")
.booleanType()
.defaultValue(false)
.withDescription("Whether capture the snapshot of newly add tables.");
}

@ -23,17 +23,24 @@ import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitReaderSuspendedReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent;
import com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeResponseEvent;
import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
@ -129,6 +136,15 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
"The enumerator receives request for binlog split meta from subtask {}.",
subtaskId);
sendBinlogMeta(subtaskId, (BinlogSplitMetaRequestEvent) sourceEvent);
} else if (sourceEvent instanceof BinlogSplitReaderSuspendedReportEvent) {
LOG.debug(
"The enumerator receives the binlog split reader suspended from subtask {}. ",
subtaskId);
handleBinlogSplitReaderSuspended(
subtaskId, (BinlogSplitReaderSuspendedReportEvent) sourceEvent);
} else if (sourceEvent instanceof TotalFinishedSplitSizeRequestEvent) {
handleTotalFinishedSplitSizeRequest(
subtaskId, (TotalFinishedSplitSizeRequestEvent) sourceEvent);
}
}
@ -196,6 +212,20 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
subtaskId, new FinishedSnapshotSplitsRequestEvent());
}
}
if (splitAssigner.isAssignerSuspended()) {
for (int subtaskId : subtaskIds) {
context.sendEventToSourceReader(subtaskId, new SuspendBinlogReaderEvent());
}
}
if (splitAssigner instanceof MySqlHybridSplitAssigner) {
if (((MySqlHybridSplitAssigner) splitAssigner).isShouldWakeupBinlogSplitReader()) {
for (int subtaskId : subtaskIds) {
context.sendEventToSourceReader(
subtaskId, new WakeupReaderEvent(WakeupReaderEvent.WakeUpType.BINLOG));
}
}
}
}
private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
@ -232,4 +262,29 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
binlogSplitMeta.size() - 1);
}
}
/**
 * Reacts to a reader reporting that its binlog split reader has been suspended: wakes up the
 * split assigner, then sends a snapshot wake-up event to every registered reader.
 *
 * @param subTask index of the subtask that sent the report; only used for logging
 * @param responseEvent the received report; its content is not inspected
 */
private void handleBinlogSplitReaderSuspended(
        int subTask, BinlogSplitReaderSuspendedReportEvent responseEvent) {
    LOG.info(
            "The enumerator receives response for suspend binlog split from subtask {}. ",
            subTask);

    splitAssigner.wakeup();

    for (int readerId : getRegisteredReader()) {
        // a fresh event instance is created for each reader
        final WakeupReaderEvent snapshotWakeup =
                new WakeupReaderEvent(WakeupReaderEvent.WakeUpType.SNAPSHOT);
        context.sendEventToSourceReader(readerId, snapshotWakeup);
    }
}
/**
 * Answers a reader's request for the total finished split size.
 *
 * <p>The assigner must be neither suspended nor waiting for finished splits. A response is
 * only sent when the assigner is a {@link MySqlHybridSplitAssigner}; in that case its
 * "should wake up binlog split reader" flag is cleared afterwards.
 *
 * @param subTask index of the requesting subtask, which also receives the response
 * @param event the received request; its content is not inspected
 */
private void handleTotalFinishedSplitSizeRequest(
        int subTask, TotalFinishedSplitSizeRequestEvent event) {
    Preconditions.checkState(!splitAssigner.isAssignerSuspended());
    Preconditions.checkState(!splitAssigner.waitingForFinishedSplits());

    if (!(splitAssigner instanceof MySqlHybridSplitAssigner)) {
        // other assigner types get no response
        return;
    }

    final MySqlHybridSplitAssigner hybridAssigner = (MySqlHybridSplitAssigner) splitAssigner;
    final int finishedSplitSize = hybridAssigner.getTotalFinishedSplitSize();
    context.sendEventToSourceReader(
            subTask, new TotalFinishedSplitSizeResponseEvent(finishedSplitSize));
    hybridAssigner.setShouldWakeupBinlogSplitReader(false);
}
}

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.events;
import org.apache.flink.api.connector.source.SourceEvent;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
 * The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to
 * notify the binlog split reader has been suspended.
 *
 * <p>The event carries no payload.
 */
public class BinlogSplitReaderSuspendedReportEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
/** Creates the (payload-free) report event. */
public BinlogSplitReaderSuspendedReportEvent() {}
}

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.events;
import org.apache.flink.api.connector.source.SourceEvent;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
 * The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to
 * tell the source reader to suspend.
 *
 * <p>The event carries no payload.
 */
public class SuspendBinlogReaderEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
/** Creates the (payload-free) suspend event. */
public SuspendBinlogReaderEvent() {}
}

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.events;
import org.apache.flink.api.connector.source.SourceEvent;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
 * The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to
 * ask for the current total finished split size.
 *
 * <p>The reader emits this request when it is woken up for the binlog phase while it still holds
 * a suspended binlog split. The event is a pure marker and carries no payload; the answer is
 * expected to come back as a {@link TotalFinishedSplitSizeResponseEvent}.
 */
public class TotalFinishedSplitSizeRequestEvent implements SourceEvent {

    private static final long serialVersionUID = 1L;

    /** Creates the marker event. */
    public TotalFinishedSplitSizeRequestEvent() {}
}

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.events;
import org.apache.flink.api.connector.source.SourceEvent;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
 * The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to
 * deliver the total finished split size.
 *
 * <p>{@link MySqlSourceReader} only acts on this event while it holds a suspended binlog split:
 * the carried size is written into that split, the split is marked as no longer suspended and is
 * handed back to the reader for processing.
 */
public class TotalFinishedSplitSizeResponseEvent implements SourceEvent {

    private static final long serialVersionUID = 1L;

    /** The total finished split size carried to the reader. */
    private final int totalFinishedSplitSize;

    /**
     * Creates the response event.
     *
     * @param totalFinishedSplitSize the total finished split size to deliver to the reader
     */
    public TotalFinishedSplitSizeResponseEvent(final int totalFinishedSplitSize) {
        this.totalFinishedSplitSize = totalFinishedSplitSize;
    }

    /** Returns the total finished split size carried by this event. */
    public int getTotalFinishedSplitSize() {
        return totalFinishedSplitSize;
    }

    @Override
    public String toString() {
        return "TotalFinishedSplitSizeResponseEvent{"
                + "totalFinishedSplitSize="
                + totalFinishedSplitSize
                + '}';
    }
}

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.events;
import org.apache.flink.api.connector.source.SourceEvent;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
/**
 * The {@link SourceEvent} that {@link MySqlSourceEnumerator} sends to {@link MySqlSourceReader} to
 * wake up source reader to request split again.
 *
 * <p>The event is immutable: the wake-up type is fixed at construction time.
 */
public class WakeupReaderEvent implements SourceEvent {

    private static final long serialVersionUID = 1L;

    /** Wake up type. */
    public enum WakeUpType {
        /** The reader reacts by sending a new split request. */
        SNAPSHOT,
        /**
         * The reader, if it currently holds a suspended binlog split, reacts by asking for the
         * total finished split size; without a suspended split the event is ignored.
         */
        BINLOG
    }

    /** Which phase the reader is being woken up for; never reassigned after construction. */
    private final WakeUpType type;

    /**
     * Creates a wake-up event.
     *
     * @param type the phase the reader should resume
     */
    public WakeupReaderEvent(WakeUpType type) {
        this.type = type;
    }

    /** Returns the wake-up type carried by this event. */
    public WakeUpType getType() {
        return type;
    }
}

@ -19,7 +19,6 @@
package com.ververica.cdc.connectors.mysql.source.reader;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@ -27,14 +26,20 @@ import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSource
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitReaderSuspendedReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent;
import com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeResponseEvent;
import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
@ -54,6 +59,7 @@ import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -61,7 +67,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
import static org.apache.flink.util.Preconditions.checkState;
/** The source reader for MySQL source splits. */
public class MySqlSourceReader<T>
@ -74,24 +79,28 @@ public class MySqlSourceReader<T>
private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
private final int subtaskId;
private final MySqlSourceReaderContext mySqlSourceReaderContext;
private MySqlBinlogSplit suspendedSplit;
public MySqlSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>> elementQueue,
Supplier<MySqlSplitReader> splitReaderSupplier,
RecordEmitter<SourceRecord, T, MySqlSplitState> recordEmitter,
Configuration config,
SourceReaderContext context,
MySqlSourceReaderContext context,
MySqlSourceConfig sourceConfig) {
super(
elementQueue,
new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
recordEmitter,
config,
context);
context.getSourceReaderContext());
this.sourceConfig = sourceConfig;
this.finishedUnackedSplits = new HashMap<>();
this.uncompletedBinlogSplits = new HashMap<>();
this.subtaskId = context.getIndexOfSubtask();
this.subtaskId = context.getSourceReaderContext().getIndexOfSubtask();
this.mySqlSourceReaderContext = context;
this.suspendedSplit = null;
}
@Override
@ -121,6 +130,10 @@ public class MySqlSourceReader<T>
// add binlog splits who are uncompleted
stateSplits.addAll(uncompletedBinlogSplits.values());
if (suspendedSplit != null) {
stateSplits.add(suspendedSplit);
}
return stateSplits;
}
@ -128,12 +141,21 @@ public class MySqlSourceReader<T>
protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
checkState(
mySqlSplit.isSnapshotSplit(),
String.format(
"Only snapshot split could finish, but the actual split is binlog split %s",
mySqlSplit));
finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
if (mySqlSplit.isBinlogSplit()) {
LOG.info(
"binlog split finished due to new table added, offset {}",
mySqlSplitState.asBinlogSplitState().getStartingOffset());
context.sendSourceEventToCoordinator(new BinlogSplitReaderSuspendedReportEvent());
suspendedSplit =
MySqlBinlogSplit.updateIsSuspended(mySqlSplit.asBinlogSplit(), true);
suspendedSplit = MySqlBinlogSplit.emptyFinishedSplitInfos(suspendedSplit);
suspendedSplit = MySqlBinlogSplit.emptyTableSchemas(suspendedSplit);
mySqlSourceReaderContext.setShouldBinlogSplitReaderStopped(false);
} else {
finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
}
}
reportFinishedSnapshotSplitsIfNeed();
context.sendSplitRequest();
@ -144,6 +166,7 @@ public class MySqlSourceReader<T>
// restore for finishedUnackedSplits
List<MySqlSplit> unfinishedSplits = new ArrayList<>();
for (MySqlSplit split : splits) {
LOG.info("Received Split: " + split);
if (split.isSnapshotSplit()) {
MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
if (snapshotSplit.isSnapshotReadFinished()) {
@ -152,8 +175,11 @@ public class MySqlSourceReader<T>
unfinishedSplits.add(split);
}
} else {
MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
// the binlog split is uncompleted
if (!split.asBinlogSplit().isCompletedSplit()) {
if (binlogSplit.isSuspended()) {
suspendedSplit = binlogSplit;
} else if (!binlogSplit.isCompletedSplit()) {
uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
} else {
@ -214,6 +240,35 @@ public class MySqlSourceReader<T>
subtaskId,
((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
} else if (sourceEvent instanceof SuspendBinlogReaderEvent) {
if (!mySqlSourceReaderContext.isShouldBinlogSplitReaderStopped()) {
mySqlSourceReaderContext.setShouldBinlogSplitReaderStopped(true);
}
} else if (sourceEvent instanceof WakeupReaderEvent) {
WakeupReaderEvent wakeupReaderEvent = (WakeupReaderEvent) sourceEvent;
if (wakeupReaderEvent.getType() == WakeupReaderEvent.WakeUpType.SNAPSHOT) {
context.sendSplitRequest();
} else {
if (suspendedSplit == null) {
return;
}
context.sendSourceEventToCoordinator(new TotalFinishedSplitSizeRequestEvent());
}
} else if (sourceEvent instanceof TotalFinishedSplitSizeResponseEvent) {
if (suspendedSplit == null) {
return;
}
Preconditions.checkState(suspendedSplit.isSuspended());
MySqlBinlogSplit split =
MySqlBinlogSplit.updateTotalFinishedSplitSize(
suspendedSplit,
((TotalFinishedSplitSizeResponseEvent) sourceEvent)
.getTotalFinishedSplitSize());
split = MySqlBinlogSplit.updateIsSuspended(split, false);
// Can Flink make sure the following two statements happen in the same checkpoint?
suspendedSplit = null;
this.addSplits(Collections.singletonList(split));
} else {
super.handleSourceEvents(sourceEvent);
}

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.reader;
import org.apache.flink.api.connector.source.SourceReaderContext;
/**
 * A wrapper class that wraps SourceReaderContext and some data shared by {@link MySqlSourceReader}
 * and {@link MySqlSplitReader}.
 *
 * <p>The shared datum is a single flag telling the binlog split reader to stop. {@link
 * MySqlSourceReader} writes it while handling source events and finished splits, and {@link
 * MySqlSplitReader} reads it from {@code fetch()}. Because the split reader is driven by a fetcher
 * manager, the write and the read may happen on different threads, so the flag is declared
 * {@code volatile} to guarantee that a write becomes visible to the polling side.
 */
public class MySqlSourceReaderContext {

    SourceReaderContext sourceReaderContext;

    /** {@code true} while the binlog split reader has been asked to stop. */
    volatile boolean shouldBinlogSplitReaderStopped;

    /**
     * Wraps the given Flink reader context; the stop flag starts out as {@code false}.
     *
     * @param sourceReaderContext the Flink context to expose through this wrapper
     */
    public MySqlSourceReaderContext(final SourceReaderContext sourceReaderContext) {
        this.sourceReaderContext = sourceReaderContext;
        this.shouldBinlogSplitReaderStopped = false;
    }

    /** Returns the wrapped Flink reader context. */
    public SourceReaderContext getSourceReaderContext() {
        return sourceReaderContext;
    }

    /** Replaces the wrapped Flink reader context. */
    public void setSourceReaderContext(final SourceReaderContext sourceReaderContext) {
        this.sourceReaderContext = sourceReaderContext;
    }

    /** Returns whether the binlog split reader has been asked to stop. */
    public boolean isShouldBinlogSplitReaderStopped() {
        return shouldBinlogSplitReaderStopped;
    }

    /**
     * Sets or clears the request for the binlog split reader to stop.
     *
     * @param shouldBinlogSplitReaderStopped {@code true} to request a stop, {@code false} to clear
     *     the request
     */
    public void setShouldBinlogSplitReaderStopped(final boolean shouldBinlogSplitReaderStopped) {
        this.shouldBinlogSplitReaderStopped = shouldBinlogSplitReaderStopped;
    }
}

@ -54,22 +54,30 @@ public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
private final Queue<MySqlSplit> splits;
private final MySqlSourceConfig sourceConfig;
private final int subtaskId;
private final MySqlSourceReaderContext context;
@Nullable private DebeziumReader<SourceRecord, MySqlSplit> currentReader;
@Nullable private String currentSplitId;
public MySqlSplitReader(MySqlSourceConfig sourceConfig, int subtaskId) {
public MySqlSplitReader(
MySqlSourceConfig sourceConfig, int subtaskId, MySqlSourceReaderContext context) {
this.sourceConfig = sourceConfig;
this.subtaskId = subtaskId;
this.splits = new ArrayDeque<>();
this.context = context;
}
@Override
public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
checkSplitOrStartNext();
Iterator<SourceRecord> dataIt = null;
try {
dataIt = currentReader.pollSplitRecords();
if (context.isShouldBinlogSplitReaderStopped()
&& currentReader instanceof BinlogSplitReader) {
((BinlogSplitReader) currentReader).finishBinlogSplit();
}
} catch (InterruptedException e) {
LOG.warn("fetch data failed.", e);
throw new IOException(e);
@ -107,19 +115,20 @@ public class MySqlSplitReader implements SplitReader<SourceRecord, MySqlSplit> {
}
private void checkSplitOrStartNext() throws IOException {
// the binlog reader should keep alive
if (currentReader instanceof BinlogSplitReader) {
return;
}
if (canAssignNextSplit()) {
final MySqlSplit nextSplit = splits.poll();
MySqlSplit nextSplit = splits.poll();
if (nextSplit == null) {
throw new IOException("Cannot fetch from another split - no split remaining");
return;
}
currentSplitId = nextSplit.splitId();
if (nextSplit.isSnapshotSplit()) {
if (currentReader instanceof BinlogSplitReader) {
LOG.info("This is the point from binlog split back to snapshot split");
currentReader.close();
currentReader = null;
}
if (currentReader == null) {
final MySqlConnection jdbcConnection =
createMySqlConnection(sourceConfig.getDbzConfiguration());

@ -24,6 +24,8 @@ import io.debezium.relational.history.TableChanges.TableChange;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -36,8 +38,26 @@ public class MySqlBinlogSplit extends MySqlSplit {
private final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos;
private final Map<TableId, TableChange> tableSchemas;
private final int totalFinishedSplitSize;
private final boolean isSuspended;
@Nullable transient byte[] serializedFormCache;
/**
 * Creates a binlog split with every attribute given explicitly, including the suspended flag.
 *
 * @param splitId identifier passed to the superclass constructor
 * @param startingOffset stored as the starting offset of the split
 * @param endingOffset stored as the ending offset of the split
 * @param finishedSnapshotSplitInfos stored by reference, not copied
 * @param tableSchemas stored by reference, not copied
 * @param totalFinishedSplitSize stored as the total finished split size
 * @param isSuspended whether the split is created in the suspended state
 */
public MySqlBinlogSplit(
        String splitId,
        BinlogOffset startingOffset,
        BinlogOffset endingOffset,
        List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos,
        Map<TableId, TableChange> tableSchemas,
        int totalFinishedSplitSize,
        boolean isSuspended) {
    super(splitId);
    // Plain field assignments; their order is irrelevant.
    this.isSuspended = isSuspended;
    this.totalFinishedSplitSize = totalFinishedSplitSize;
    this.tableSchemas = tableSchemas;
    this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
    this.endingOffset = endingOffset;
    this.startingOffset = startingOffset;
}
public MySqlBinlogSplit(
String splitId,
BinlogOffset startingOffset,
@ -51,6 +71,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
this.finishedSnapshotSplitInfos = finishedSnapshotSplitInfos;
this.tableSchemas = tableSchemas;
this.totalFinishedSplitSize = totalFinishedSplitSize;
this.isSuspended = false;
}
public BinlogOffset getStartingOffset() {
@ -74,6 +95,10 @@ public class MySqlBinlogSplit extends MySqlSplit {
return totalFinishedSplitSize;
}
/** Returns the suspended flag this split was constructed with. */
public boolean isSuspended() {
    return this.isSuspended;
}
public boolean isCompletedSplit() {
return totalFinishedSplitSize == finishedSnapshotSplitInfos.size();
}
@ -91,6 +116,7 @@ public class MySqlBinlogSplit extends MySqlSplit {
}
MySqlBinlogSplit that = (MySqlBinlogSplit) o;
return totalFinishedSplitSize == that.totalFinishedSplitSize
&& isSuspended == that.isSuspended
&& Objects.equals(startingOffset, that.startingOffset)
&& Objects.equals(endingOffset, that.endingOffset)
&& Objects.equals(finishedSnapshotSplitInfos, that.finishedSnapshotSplitInfos)
@ -105,7 +131,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
endingOffset,
finishedSnapshotSplitInfos,
tableSchemas,
totalFinishedSplitSize);
totalFinishedSplitSize,
isSuspended);
}
@Override
@ -118,6 +145,8 @@ public class MySqlBinlogSplit extends MySqlSplit {
+ startingOffset
+ ", endOffset="
+ endingOffset
+ ", isSuspended="
+ isSuspended
+ '}';
}
@ -133,7 +162,30 @@ public class MySqlBinlogSplit extends MySqlSplit {
binlogSplit.getEndingOffset(),
splitInfos,
binlogSplit.getTableSchemas(),
binlogSplit.getTotalFinishedSplitSize());
binlogSplit.getTotalFinishedSplitSize(),
binlogSplit.isSuspended());
}
/**
 * Returns a copy of {@code binlogSplit} whose finished snapshot split infos are replaced by a
 * fresh empty list; every other attribute, including the total finished split size, is carried
 * over unchanged.
 */
public static MySqlBinlogSplit emptyFinishedSplitInfos(MySqlBinlogSplit binlogSplit) {
    final List<FinishedSnapshotSplitInfo> noFinishedSplitInfos = new ArrayList<>();
    return new MySqlBinlogSplit(
            binlogSplit.splitId,
            binlogSplit.getStartingOffset(),
            binlogSplit.getEndingOffset(),
            noFinishedSplitInfos,
            binlogSplit.getTableSchemas(),
            binlogSplit.getTotalFinishedSplitSize(),
            binlogSplit.isSuspended());
}
/**
 * Returns a copy of {@code binlogSplit} whose table schemas are replaced by a fresh empty map;
 * every other attribute is carried over unchanged.
 */
public static MySqlBinlogSplit emptyTableSchemas(MySqlBinlogSplit binlogSplit) {
    final Map<TableId, TableChange> noTableSchemas = new HashMap<>();
    return new MySqlBinlogSplit(
            binlogSplit.splitId,
            binlogSplit.getStartingOffset(),
            binlogSplit.getEndingOffset(),
            binlogSplit.getFinishedSnapshotSplitInfos(),
            noTableSchemas,
            binlogSplit.getTotalFinishedSplitSize(),
            binlogSplit.isSuspended());
}
public static MySqlBinlogSplit fillTableSchemas(
@ -145,6 +197,31 @@ public class MySqlBinlogSplit extends MySqlSplit {
binlogSplit.getEndingOffset(),
binlogSplit.getFinishedSnapshotSplitInfos(),
tableSchemas,
binlogSplit.getTotalFinishedSplitSize());
binlogSplit.getTotalFinishedSplitSize(),
binlogSplit.isSuspended());
}
/**
 * Returns a copy of {@code binlogSplit} that carries the given total finished split size; every
 * other attribute is carried over unchanged.
 *
 * @param binlogSplit the split to copy
 * @param totalFinishedSplitSize the value to store in the copy
 */
public static MySqlBinlogSplit updateTotalFinishedSplitSize(
        MySqlBinlogSplit binlogSplit, int totalFinishedSplitSize) {
    final MySqlBinlogSplit resized =
            new MySqlBinlogSplit(
                    binlogSplit.splitId,
                    binlogSplit.getStartingOffset(),
                    binlogSplit.getEndingOffset(),
                    binlogSplit.getFinishedSnapshotSplitInfos(),
                    binlogSplit.getTableSchemas(),
                    totalFinishedSplitSize,
                    binlogSplit.isSuspended());
    return resized;
}
/**
 * Returns a copy of {@code binlogSplit} that carries the given suspended flag; every other
 * attribute is carried over unchanged.
 *
 * @param binlogSplit the split to copy
 * @param isSuspended the suspended flag to store in the copy
 */
public static MySqlBinlogSplit updateIsSuspended(
        MySqlBinlogSplit binlogSplit, boolean isSuspended) {
    final MySqlBinlogSplit toggled =
            new MySqlBinlogSplit(
                    binlogSplit.splitId,
                    binlogSplit.getStartingOffset(),
                    binlogSplit.getEndingOffset(),
                    binlogSplit.getFinishedSnapshotSplitInfos(),
                    binlogSplit.getTableSchemas(),
                    binlogSplit.getTotalFinishedSplitSize(),
                    isSuspended);
    return toggled;
}
}

@ -74,7 +74,8 @@ public class MySqlBinlogSplitState extends MySqlSplitState {
getEndingOffset(),
binlogSplit.asBinlogSplit().getFinishedSnapshotSplitInfos(),
getTableSchemas(),
binlogSplit.getTotalFinishedSplitSize());
binlogSplit.getTotalFinishedSplitSize(),
binlogSplit.isSuspended());
}
@Override

@ -49,7 +49,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
public static final MySqlSplitSerializer INSTANCE = new MySqlSplitSerializer();
private static final int VERSION = 3;
private static final int VERSION = 4;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
@ -119,6 +119,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
case 1:
case 2:
case 3:
case 4:
return deserializeSplit(version, serialized);
default:
throw new IOException("Unknown version: " + version);
@ -156,7 +157,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
readFinishedSplitsInfo(version, in);
Map<TableId, TableChange> tableChangeMap = readTableSchemas(version, in);
int totalFinishedSplitSize = finishedSplitsInfo.size();
if (version == 3) {
if (version > 3) {
totalFinishedSplitSize = in.readInt();
}
in.releaseArrays();
@ -202,6 +203,7 @@ public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MyS
break;
case 2:
case 3:
case 4:
final int len = in.readInt();
final byte[] bytes = new byte[len];
in.read(bytes);

@ -54,6 +54,7 @@ public class SerializerUtils {
return in.readBoolean() ? new BinlogOffset(in.readUTF(), in.readLong()) : null;
case 2:
case 3:
case 4:
return readBinlogPosition(in);
default:
throw new IOException("Unknown version: " + offsetVersion);

@ -78,6 +78,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final double distributionFactorUpper;
private final double distributionFactorLower;
private final StartupOptions startupOptions;
private final boolean captureNewTables;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@ -110,6 +111,52 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
double distributionFactorUpper,
double distributionFactorLower,
StartupOptions startupOptions) {
this(
physicalSchema,
port,
hostname,
database,
tableName,
username,
password,
serverTimeZone,
dbzProperties,
serverId,
enableParallelRead,
splitSize,
splitMetaGroupSize,
fetchSize,
connectTimeout,
connectMaxRetries,
connectionPoolSize,
distributionFactorUpper,
distributionFactorLower,
startupOptions,
false);
}
public MySqlTableSource(
TableSchema physicalSchema,
int port,
String hostname,
String database,
String tableName,
String username,
String password,
ZoneId serverTimeZone,
Properties dbzProperties,
@Nullable String serverId,
boolean enableParallelRead,
int splitSize,
int splitMetaGroupSize,
int fetchSize,
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
double distributionFactorUpper,
double distributionFactorLower,
StartupOptions startupOptions,
boolean captureNewTable) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@ -130,6 +177,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.startupOptions = startupOptions;
this.captureNewTables = captureNewTable;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
@ -184,6 +232,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer)
.captureNewTables(captureNewTables)
.build();
return SourceProvider.of(parallelSource);
} else {

@ -38,6 +38,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CAPTURE_NEW_TABLES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
@ -97,6 +98,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean captureNewTables = config.get(CAPTURE_NEW_TABLES);
boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
if (enableParallelRead) {
@ -131,7 +133,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
connectionPoolSize,
distributionFactorUpper,
distributionFactorLower,
startupOptions);
startupOptions,
captureNewTables);
}
@Override
@ -169,6 +172,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(CONNECT_MAX_RETRIES);
options.add(CAPTURE_NEW_TABLES);
return options;
}

@ -120,6 +120,96 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"customers"});
}
/**
* End-to-end check of the 'capture-new-tables' option across a JobManager failover.
*
* <p>Flow: read the initial snapshot of the tables matching 'address.*', create a new matching
* table {@code address1} with the same eight rows, trigger a JobManager failover, and finally
* insert eight further rows into {@code address1}. After each step the test fetches exactly
* eight rows from the result iterator and compares them, order-insensitively, with the expected
* rows.
*/
@Test
public void testJobManagerFailoverInBinlogPhaseWithNewlyAddTable() throws Exception {
customDatabase.createAndInitialize();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// single parallelism, 200 ms checkpoint interval, and one restart allowed with no delay
env.setParallelism(1);
env.enableCheckpointing(200L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
String sourceDDL =
String.format(
"CREATE TABLE address ("
+ " id BIGINT NOT NULL,"
+ " country STRING,"
+ " city STRING,"
+ " detail_address STRING,"
+ " primary key (id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'scan.incremental.snapshot.enabled' = 'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = 'address.*',"
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
+ " 'server-id' = '%s',"
+ " 'capture-new-tables' = 'true'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
customDatabase.getUsername(),
customDatabase.getPassword(),
customDatabase.getDatabaseName(),
getServerId());
// first step: check the snapshot data
String[] snapshotForSingleTable =
new String[] {
"+I[416874195632735147, China, Beijing, West Town address 1]",
"+I[416927583791428523, China, Beijing, West Town address 2]",
"+I[417022095255614379, China, Beijing, West Town address 3]",
"+I[417111867899200427, America, New York, East Town address 1]",
"+I[417271541558096811, America, New York, East Town address 2]",
"+I[417272886855938987, America, New York, East Town address 3]",
"+I[417420106184475563, Germany, Berlin, West Town address 1]",
"+I[418161258277847979, Germany, Berlin, West Town address 2]"
};
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from address");
CloseableIterator<Row> iterator = tableResult.collect();
// keep the job id so the failover below can target this job
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = Arrays.asList(snapshotForSingleTable);
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
// second step: add a new table whose name also matches 'address.*'; the rows inserted into it
// have the same values as the snapshot rows above, so the same expected list is reused
createNewTableAndInsertData(
getConnection(), customDatabase.getDatabaseName() + ".address1");
List<String> expectedBinlogData = expectedSnapshotData;
assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
// after this failover, we expect to fetch the snapshot data of address1
triggerFailover(
FailoverType.JM, jobId, miniClusterResource.getMiniCluster(), () -> sleepMs(200));
assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
// last step: rows inserted into address1 from now on must be emitted as well
String[] binlogForSingleTable =
new String[] {
"+I[826874195632735147, China, Beijing, West Town address 1]",
"+I[826927583791428523, China, Beijing, West Town address 2]",
"+I[827022095255614379, China, Beijing, West Town address 3]",
"+I[827111867899200427, America, New York, East Town address 1]",
"+I[827271541558096811, America, New York, East Town address 2]",
"+I[827272886855938987, America, New York, East Town address 3]",
"+I[827420106184475563, Germany, Berlin, West Town address 1]",
"+I[828161258277847979, Germany, Berlin, West Town address 2]"
};
expectedBinlogData = Arrays.asList(binlogForSingleTable);
insertDataToGenerateSomeBinlog(
getConnection(), customDatabase.getDatabaseName() + ".address1");
assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
// cancel the job and wait for the cancellation to complete
tableResult.getJobClient().get().cancel().get();
}
private void testMySqlParallelSource(
FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables)
throws Exception {
@ -318,6 +408,62 @@ public class MySqlSourceITCase extends MySqlSourceTestBase {
}
}
/**
* Creates the table {@code tableId} (id, country, city, detail_address) and fills it with eight
* rows, committing after the DDL and again after the insert.
*
* <p>The given connection is always closed before this method returns, so the caller must not
* reuse it afterwards.
*
* @param connection connection used for the statements; closed by this method
* @param tableId name of the table to create; concatenated into the SQL text as-is
* @throws SQLException if any statement, commit, or the close fails
*/
private void createNewTableAndInsertData(JdbcConnection connection, String tableId)
throws SQLException {
try {
// commits are issued explicitly below
connection.setAutoCommit(false);
connection.execute(
"CREATE TABLE "
+ tableId
+ "("
+ "id BIGINT UNSIGNED NOT NULL PRIMARY KEY,"
+ "country VARCHAR(255) NOT NULL,"
+ "city VARCHAR(255) NOT NULL,"
+ "detail_address VARCHAR(1024))");
connection.commit();
// insert some data
connection.execute(
"INSERT INTO "
+ tableId
+ " VALUES(416874195632735147, 'China', 'Beijing', 'West Town address 1'),"
+ "(416927583791428523, 'China', 'Beijing', 'West Town address 2'),"
+ "(417022095255614379, 'China', 'Beijing', 'West Town address 3'),"
+ "(417111867899200427, 'America', 'New York', 'East Town address 1'),"
+ "(417271541558096811, 'America', 'New York', 'East Town address 2'),"
+ "(417272886855938987, 'America', 'New York', 'East Town address 3'),"
+ "(417420106184475563, 'Germany', 'Berlin', 'West Town address 1'),"
+ "(418161258277847979, 'Germany', 'Berlin', 'West Town address 2')");
connection.commit();
} finally {
// release the connection whether or not the statements succeeded
connection.close();
}
}
/**
* Inserts eight further rows (ids starting with 82...) into the existing table {@code tableId}
* and commits them.
*
* <p>The given connection is always closed before this method returns, so the caller must not
* reuse it afterwards.
*
* @param connection connection used for the statement; closed by this method
* @param tableId name of the target table; concatenated into the SQL text as-is
* @throws SQLException if the statement, the commit, or the close fails
*/
private void insertDataToGenerateSomeBinlog(JdbcConnection connection, String tableId)
throws SQLException {
try {
// the commit is issued explicitly below
connection.setAutoCommit(false);
// insert some data
connection.execute(
"INSERT INTO "
+ tableId
+ " VALUES(826874195632735147, 'China', 'Beijing', 'West Town address 1'),"
+ "(826927583791428523, 'China', 'Beijing', 'West Town address 2'),"
+ "(827022095255614379, 'China', 'Beijing', 'West Town address 3'),"
+ "(827111867899200427, 'America', 'New York', 'East Town address 1'),"
+ "(827271541558096811, 'America', 'New York', 'East Town address 2'),"
+ "(827272886855938987, 'America', 'New York', 'East Town address 3'),"
+ "(827420106184475563, 'Germany', 'Berlin', 'West Town address 1'),"
+ "(828161258277847979, 'Germany', 'Berlin', 'West Town address 2')");
connection.commit();
} finally {
// release the connection whether or not the insert succeeded
connection.close();
}
}
private MySqlConnection getConnection() {
Map<String, String> properties = new HashMap<>();
properties.put("database.hostname", MYSQL_CONTAINER.getHost());

@ -105,7 +105,7 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
remainingSplits,
assignedSplits,
splitFinishedOffsets,
true,
SnapshotAssignerStatus.INIT_FINISH,
new ArrayList<>(),
false,
true);

@ -21,6 +21,7 @@ package com.ververica.cdc.connectors.mysql.source.assigners.state;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.connectors.mysql.source.assigners.SnapshotAssignerStatus;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer;
@ -124,7 +125,7 @@ public class PendingSplitsStateSerializerTest {
remainingSplits,
assignedSnapshotSplits,
finishedOffsets,
false,
SnapshotAssignerStatus.INIT,
remainingTables,
false,
true);

@ -251,17 +251,20 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
new ForwardDeserializeSchema(),
new MySqlSourceReaderMetrics(readerContext.metricGroup()),
configuration.isIncludeSchemaChanges());
final MySqlSourceReaderContext mySqlSourceReaderContext =
new MySqlSourceReaderContext(readerContext);
return new MySqlSourceReader<>(
elementsQueue,
() -> createSplitReader(configuration),
() -> createSplitReader(configuration, mySqlSourceReaderContext),
recordEmitter,
readerContext.getConfiguration(),
readerContext,
mySqlSourceReaderContext,
configuration);
}
private MySqlSplitReader createSplitReader(MySqlSourceConfig configuration) {
return new MySqlSplitReader(configuration, 0);
private MySqlSplitReader createSplitReader(
MySqlSourceConfig configuration, MySqlSourceReaderContext readerContext) {
return new MySqlSplitReader(configuration, 0, readerContext);
}
private void makeBinlogEventsInOneTransaction(MySqlSourceConfig sourceConfig, String tableId)

Loading…
Cancel
Save