diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index cab749917..0a397d604 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -274,7 +274,7 @@ public class BinlogSplitReader implements DebeziumReader { return this; } - /** Whether the {@link MySqlSource} should capture the newly added tables or not. */ - public MySqlSourceBuilder captureNewTables(boolean captureNewTables) { - this.configFactory.captureNewTables(captureNewTables); + /** Whether the {@link MySqlSource} should scan the newly added tables or not. */ + public MySqlSourceBuilder scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { + this.configFactory.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); return this; } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/AssignerStatus.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/AssignerStatus.java new file mode 100644 index 000000000..052f133a7 --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/AssignerStatus.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.mysql.source.assigners; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.lang.String.format; + +/** + * The state of split assigner finite state machine, tips: we use word status instead of word state + * to avoid conflict with Flink state keyword. The assigner finite state machine goes this way. + * + *
+ *        INITIAL_ASSIGNING(start)
+ *              |
+ *              |
+ *          onFinish()
+ *              |
+ *              ↓
+ *    INITIAL_ASSIGNING_FINISHED(end)
+ *              |
+ *              |
+ *        suspend() // found newly added tables
+ *              |
+ *              ↓
+ *          SUSPENDED --- wakeup() --→ NEWLY_ADDED_ASSIGNING --- onFinish() --→ NEWLY_ADDED_ASSIGNING_FINISHED(end)
+ *              ↑                                                                  |
+ *              |                                                                  |
+ *              |----------------- suspend() // found newly added tables ----------|
+ * 
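+ *
+ * <p>The status code of each status is what gets written into the checkpoint state; {@code
+ * PendingSplitsStateSerializer} restores the status from it via {@link #fromStatusCode(int)}.
+ * Calling a transition that is not defined for the current status, for example {@code
+ * SUSPENDED.onFinish()}, throws an {@link IllegalStateException}.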
+ */ +public enum AssignerStatus { + INITIAL_ASSIGNING(0) { + @Override + public AssignerStatus getNextStatus() { + return INITIAL_ASSIGNING_FINISHED; + } + + @Override + public AssignerStatus onFinish() { + LOG.info( + "Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED"); + return this.getNextStatus(); + } + }, + INITIAL_ASSIGNING_FINISHED(1) { + @Override + public AssignerStatus getNextStatus() { + return SUSPENDED; + } + + @Override + public AssignerStatus suspend() { + LOG.info("Assigner status changes from INITIAL_ASSIGNING_FINISHED to SUSPENDED"); + return this.getNextStatus(); + } + }, + SUSPENDED(2) { + @Override + public AssignerStatus getNextStatus() { + return NEWLY_ADDED_ASSIGNING; + } + + @Override + public AssignerStatus wakeup() { + LOG.info("Assigner status changes from SUSPENDED to NEWLY_ADDED_ASSIGNING"); + return this.getNextStatus(); + } + }, + NEWLY_ADDED_ASSIGNING(3) { + @Override + public AssignerStatus getNextStatus() { + return NEWLY_ADDED_ASSIGNING_FINISHED; + } + + @Override + public AssignerStatus onFinish() { + LOG.info( + "Assigner status changes from NEWLY_ADDED_ASSIGNING to NEWLY_ADDED_ASSIGNING_FINISHED"); + return this.getNextStatus(); + } + }, + NEWLY_ADDED_ASSIGNING_FINISHED(4) { + @Override + public AssignerStatus getNextStatus() { + return SUSPENDED; + } + + @Override + public AssignerStatus suspend() { + LOG.info("Assigner status changes from NEWLY_ADDED_ASSIGNING_FINISHED to SUSPENDED"); + return this.getNextStatus(); + } + }; + + private static final Logger LOG = LoggerFactory.getLogger(AssignerStatus.class); + private final int statusCode; + + AssignerStatus(int statusCode) { + this.statusCode = statusCode; + } + + public int getStatusCode() { + return statusCode; + } + + public abstract AssignerStatus getNextStatus(); + + public AssignerStatus onFinish() { + throw new IllegalStateException( + format( + "Invalid call, assigner under %s state can not call onFinish()", + fromStatusCode(this.getStatusCode()))); + } + + public AssignerStatus suspend() { + throw new IllegalStateException( + format( + "Invalid call, assigner under %s state can not call suspend()", + fromStatusCode(this.getStatusCode()))); + } + + public AssignerStatus wakeup() { + throw new IllegalStateException( + format( + "Invalid call, assigner under %s state can not call wakeup()", + fromStatusCode(this.getStatusCode()))); + } + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + /** Gets the {@link AssignerStatus} from status code. */ + public static AssignerStatus fromStatusCode(int statusCode) { + switch (statusCode) { + case 0: + return INITIAL_ASSIGNING; + case 1: + return INITIAL_ASSIGNING_FINISHED; + case 2: + return SUSPENDED; + case 3: + return NEWLY_ADDED_ASSIGNING; + case 4: + return NEWLY_ADDED_ASSIGNING_FINISHED; + default: + throw new IllegalStateException( + format( + "Invalid status code %s,the valid code range is [0, 4]", + statusCode)); + } + } + + /** Returns whether the split assigner state is suspended. */ + public static boolean isSuspended(AssignerStatus assignerStatus) { + return assignerStatus == SUSPENDED; + } + + /** + * Returns whether the split assigner has assigned all snapshot splits, which indicates there is + * no more splits and all records of splits have been completely processed in the pipeline. 
+ */ + public static boolean isAssigningFinished(AssignerStatus assignerStatus) { + return assignerStatus == INITIAL_ASSIGNING_FINISHED + || assignerStatus == NEWLY_ADDED_ASSIGNING_FINISHED; + } + + /** Returns whether the split assigner is assigning snapshot splits. */ + public static boolean isAssigning(AssignerStatus assignerStatus) { + return assignerStatus == INITIAL_ASSIGNING || assignerStatus == NEWLY_ADDED_ASSIGNING; + } + + /** Returns whether the split assigner has finished its initial tables assignment. */ + public static boolean isInitialAssigningFinished(AssignerStatus assignerStatus) { + return assignerStatus == INITIAL_ASSIGNING_FINISHED; + } + + /** Returns whether the split assigner has finished its newly added tables assignment. */ + public static boolean isNewlyAddedAssigningFinished(AssignerStatus assignerStatus) { + return assignerStatus == NEWLY_ADDED_ASSIGNING_FINISHED; + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java index bfadf6b43..3ee6960fd 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java @@ -112,21 +112,19 @@ public class MySqlBinlogSplitAssigner implements MySqlSplitAssigner { } @Override - public void close() {} - - @Override - public boolean isAssignerSuspended() { - return false; + public AssignerStatus getAssignerStatus() { + return AssignerStatus.INITIAL_ASSIGNING_FINISHED; } @Override - public int getTotalFinishedSplitSize() { - return 0; - } + public void suspend() {} @Override public void wakeup() {} + @Override + public void close() {} + // ------------------------------------------------------------------------------------------ private MySqlBinlogSplit createBinlogSplit() { diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java index 259e6709b..5b2d75645 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java @@ -39,6 +39,10 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isInitialAssigningFinished; +import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isNewlyAddedAssigningFinished; +import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSuspended; + /** * A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key * range and chunk size and also continue with a binlog split. 
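A minimal walkthrough of the transitions defined by the AssignerStatus enum introduced above (illustration only: the wrapper class is hypothetical, everything else is the enum's own API):

    import com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus;

    public class AssignerStatusWalkthrough {
        public static void main(String[] args) {
            AssignerStatus status = AssignerStatus.INITIAL_ASSIGNING;
            status = status.onFinish();  // INITIAL_ASSIGNING_FINISHED: initial snapshot splits are done
            status = status.suspend();   // SUSPENDED: newly added tables found after the initial assignment
            status = status.wakeup();    // NEWLY_ADDED_ASSIGNING: resume snapshot assignment for the new tables
            status = status.onFinish();  // NEWLY_ADDED_ASSIGNING_FINISHED: newly added tables are done
            // Transitions that are not defined for the current status fail fast, e.g.
            // AssignerStatus.SUSPENDED.onFinish() throws IllegalStateException.
        }
    }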
@@ -51,7 +55,6 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner { private final int splitMetaGroupSize; private boolean isBinlogSplitAssigned; - private boolean shouldWakeupBinlogSplitReader = false; private final MySqlSnapshotSplitAssigner snapshotSplitAssigner; @@ -94,8 +97,8 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner { @Override public Optional getNext() { - if (snapshotSplitAssigner.isAssignerSuspended()) { - // do not assign split until Assigner receive SuspendBinlogSplitReaderResponseEvent + if (isSuspended(getAssignerStatus())) { + // do not assign split until the assigner received SuspendBinlogReaderAckEvent return Optional.empty(); } if (snapshotSplitAssigner.noMoreSplits()) { @@ -103,18 +106,15 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner { if (isBinlogSplitAssigned) { // no more splits for the assigner return Optional.empty(); - } else if (snapshotSplitAssigner.getAssignerState() - == SnapshotAssignerStatus.INIT_FINISH) { + } else if (isInitialAssigningFinished(snapshotSplitAssigner.getAssignerStatus())) { // we need to wait snapshot-assigner to be finished before // assigning the binlog split. Otherwise, records emitted from binlog split // might be out-of-order in terms of same primary key with snapshot splits. isBinlogSplitAssigned = true; return Optional.of(createBinlogSplit()); - } else if (snapshotSplitAssigner.getAssignerState() - == SnapshotAssignerStatus.RESUMED_FINISH) { - // do not need to create binlog, but send event to wake up the binlog + } else if (isNewlyAddedAssigningFinished(snapshotSplitAssigner.getAssignerStatus())) { + // do not need to create binlog, but send event to wake up the binlog reader isBinlogSplitAssigned = true; - shouldWakeupBinlogSplitReader = true; return Optional.empty(); } else { // binlog split is not ready by now @@ -167,32 +167,23 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner { } @Override - public void close() { - snapshotSplitAssigner.close(); + public AssignerStatus getAssignerStatus() { + return snapshotSplitAssigner.getAssignerStatus(); } @Override - public boolean isAssignerSuspended() { - return snapshotSplitAssigner.isAssignerSuspended(); + public void suspend() { + snapshotSplitAssigner.suspend(); } @Override public void wakeup() { - isBinlogSplitAssigned = false; snapshotSplitAssigner.wakeup(); } - public boolean isShouldWakeupBinlogSplitReader() { - return shouldWakeupBinlogSplitReader; - } - - public void setShouldWakeupBinlogSplitReader(final boolean shouldWakeupBinlogSplitReader) { - this.shouldWakeupBinlogSplitReader = shouldWakeupBinlogSplitReader; - } - @Override - public int getTotalFinishedSplitSize() { - return snapshotSplitAssigner.getTotalFinishedSplitSize(); + public void close() { + snapshotSplitAssigner.close(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 5c3624a0c..d53d53e92 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -50,6 +50,8 @@ import java.util.stream.Collectors; import 
static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables; import static com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection; +import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isAssigningFinished; +import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSuspended; /** * A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key @@ -64,12 +66,12 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { private final List remainingSplits; private final Map assignedSplits; private final Map splitFinishedOffsets; - private SnapshotAssignerStatus assignerState; private final MySqlSourceConfig sourceConfig; private final int currentParallelism; private final LinkedList remainingTables; private final boolean isRemainingTablesCheckpointed; + private AssignerStatus assignerStatus; private ChunkSplitter chunkSplitter; private boolean isTableIdCaseSensitive; @@ -87,7 +89,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { new ArrayList<>(), new HashMap<>(), new HashMap<>(), - SnapshotAssignerStatus.INIT, + AssignerStatus.INITIAL_ASSIGNING, remainingTables, isTableIdCaseSensitive, true); @@ -104,7 +106,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { checkpoint.getRemainingSplits(), checkpoint.getAssignedSplits(), checkpoint.getSplitFinishedOffsets(), - checkpoint.getAssignerState(), + checkpoint.getSnapshotAssignerStatus(), checkpoint.getRemainingTables(), checkpoint.isTableIdCaseSensitive(), checkpoint.isRemainingTablesCheckpointed()); @@ -117,7 +119,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { List remainingSplits, Map assignedSplits, Map splitFinishedOffsets, - SnapshotAssignerStatus assignerState, + AssignerStatus assignerStatus, List remainingTables, boolean isTableIdCaseSensitive, boolean isRemainingTablesCheckpointed) { @@ -127,7 +129,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { this.remainingSplits = remainingSplits; this.assignedSplits = assignedSplits; this.splitFinishedOffsets = splitFinishedOffsets; - this.assignerState = assignerState; + this.assignerStatus = assignerStatus; this.remainingTables = new LinkedList<>(remainingTables); this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; this.isTableIdCaseSensitive = isTableIdCaseSensitive; @@ -138,7 +140,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { chunkSplitter = createChunkSplitter(sourceConfig, isTableIdCaseSensitive); // the legacy state didn't snapshot remaining tables, discovery remaining table here - if (!isRemainingTablesCheckpointed && !isFinished()) { + if (!isRemainingTablesCheckpointed && !isAssigningFinished(assignerStatus)) { try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { final List discoverTables = discoverCapturedTables(jdbc, sourceConfig); discoverTables.removeAll(alreadyProcessedTables); @@ -149,29 +151,32 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { "Failed to discover remaining tables to capture", e); } } + captureNewlyAddedTables(); + } - // If the config do not need to capture newly added tables - if (!sourceConfig.isCaptureNewTables()) { - return; - } - - // check whether we got newly added tables - try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { - final List discoverTables = discoverCapturedTables(jdbc, sourceConfig); - 
discoverTables.removeAll(alreadyProcessedTables); - // got some newly added tables - if (!discoverTables.isEmpty()) { - if (isFinished()) { - Preconditions.checkState(remainingTables.isEmpty()); - assignerState = assignerState.nextState(); - } else { - // if job is in snapshot phase, directly add the new table to remaining table - this.remainingTables.removeAll(discoverTables); + private void captureNewlyAddedTables() { + if (sourceConfig.isScanNewlyAddedTableEnabled()) { + // check whether we got newly added tables + try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { + final List newlyAddedTables = discoverCapturedTables(jdbc, sourceConfig); + newlyAddedTables.removeAll(alreadyProcessedTables); + newlyAddedTables.removeAll(remainingTables); + if (!newlyAddedTables.isEmpty()) { + // if job is still in snapshot reading phase, directly add all newly added + // tables + LOG.info("Found newly added tables, start capture newly added tables process"); + remainingTables.addAll(newlyAddedTables); + if (isAssigningFinished(assignerStatus)) { + // start the newly added tables process under binlog reading phase + LOG.info( + "Found newly added tables, start capture newly added tables process under binlog reading phase"); + this.suspend(); + } } - remainingTables.addAll(discoverTables); + } catch (Exception e) { + throw new FlinkRuntimeException( + "Failed to discover remaining tables to capture", e); } - } catch (Exception e) { - throw new FlinkRuntimeException("Failed to discover remaining tables to capture", e); } } @@ -185,7 +190,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { assignedSplits.put(split.splitId(), split); return Optional.of(split); } else { - // it's turn for new table + // it's turn for next table TableId nextTable = remainingTables.pollFirst(); if (nextTable != null) { // split the given table into chunks (snapshot splits) @@ -233,14 +238,13 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { @Override public void onFinishedSplits(Map splitFinishedOffsets) { this.splitFinishedOffsets.putAll(splitFinishedOffsets); - if (allSplitsFinished()) { + if (allSplitsFinished() && AssignerStatus.isAssigning(assignerStatus)) { // Skip the waiting checkpoint when current parallelism is 1 which means we do not need // to care about the global output data order of snapshot splits and binlog split. 
if (currentParallelism == 1) { - assignerState = assignerState.nextState(); + assignerStatus = assignerStatus.onFinish(); LOG.info( "Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status."); - } else { LOG.info( "Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished."); @@ -267,13 +271,15 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { remainingSplits, assignedSplits, splitFinishedOffsets, - assignerState, + assignerStatus, remainingTables, isTableIdCaseSensitive, true); // we need a complete checkpoint before mark this assigner to be finished, to wait for all // records of snapshot splits are completely processed - if (checkpointIdToFinish == null && !isFinished() && allSplitsFinished()) { + if (checkpointIdToFinish == null + && !isAssigningFinished(assignerStatus) + && allSplitsFinished()) { checkpointIdToFinish = checkpointId; } return state; @@ -283,51 +289,43 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { public void notifyCheckpointComplete(long checkpointId) { // we have waited for at-least one complete checkpoint after all snapshot-splits are // finished, then we can mark snapshot assigner as finished. - if (checkpointIdToFinish != null && !isFinished() && allSplitsFinished()) { + if (checkpointIdToFinish != null + && !isAssigningFinished(assignerStatus) + && allSplitsFinished()) { if (checkpointId >= checkpointIdToFinish) { - assignerState = assignerState.nextState(); + assignerStatus = assignerStatus.onFinish(); } LOG.info("Snapshot split assigner is turn into finished status."); } } @Override - public void close() {} + public AssignerStatus getAssignerStatus() { + return assignerStatus; + } @Override - public boolean isAssignerSuspended() { - return assignerState == SnapshotAssignerStatus.SUSPENDED; + public void suspend() { + Preconditions.checkState( + isAssigningFinished(assignerStatus), "Invalid assigner status {}", assignerStatus); + assignerStatus = assignerStatus.suspend(); } @Override public void wakeup() { - Preconditions.checkState(assignerState == SnapshotAssignerStatus.SUSPENDED); - assignerState = assignerState.nextState(); - } - - public SnapshotAssignerStatus getAssignerState() { - return assignerState; + Preconditions.checkState( + isSuspended(assignerStatus), "Invalid assigner status {}", assignerStatus); + assignerStatus = assignerStatus.wakeup(); } @Override - public int getTotalFinishedSplitSize() { - return splitFinishedOffsets.size(); - } + public void close() {} /** Indicates there is no more splits available in this assigner. */ public boolean noMoreSplits() { return remainingTables.isEmpty() && remainingSplits.isEmpty(); } - /** - * Returns whether the snapshot split assigner is finished, which indicates there is no more - * splits and all records of splits have been completely processed in the pipeline. 
- */ - public boolean isFinished() { - return assignerState == SnapshotAssignerStatus.INIT_FINISH - || assignerState == SnapshotAssignerStatus.RESUMED_FINISH; - } - public Map getAssignedSplits() { return assignedSplits; } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java index e7b69b463..d6b31ebf6 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java @@ -101,17 +101,21 @@ public interface MySqlSplitAssigner { */ void notifyCheckpointComplete(long checkpointId); + /** Gets the split assigner status, see {@code AssignerStatus}. */ + AssignerStatus getAssignerStatus(); + + /** + * Suspends the assigner under {@link AssignerStatus#INITIAL_ASSIGNING_FINISHED} or {@link + * AssignerStatus#NEWLY_ADDED_ASSIGNING_FINISHED}. + */ + void suspend(); + + /** Wakes up the assigner under {@link AssignerStatus#SUSPENDED}. */ + void wakeup(); + /** * Called to close the assigner, in case it holds on to any resources, like threads or network * connections. */ void close(); - - /** Whether the assigner is suspended. */ - boolean isAssignerSuspended(); - - /** Get the total finished split count. */ - int getTotalFinishedSplitSize(); - - void wakeup(); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/SnapshotAssignerStatus.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/SnapshotAssignerStatus.java deleted file mode 100644 index fa9a98713..000000000 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/SnapshotAssignerStatus.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ververica.cdc.connectors.mysql.source.assigners; - -import org.apache.flink.util.Preconditions; - -/** The snapshot assigner state machine. */ -public enum SnapshotAssignerStatus { - /** - * The assigner state machine goes this way. - * - *
-     * INIT -> INIT_FINISH -> SUSPENDED -> RESUMED -> RESUMED_FINISH
-     *                            ^                         |
-     *                            |_________________________|
-     * 
- */ - INIT { - @Override - public int getValue() { - return 0; - } - - @Override - public SnapshotAssignerStatus nextState() { - return INIT_FINISH; - } - }, - INIT_FINISH { - @Override - public int getValue() { - return 1; - } - - @Override - public SnapshotAssignerStatus nextState() { - return SUSPENDED; - } - }, - SUSPENDED { - @Override - public int getValue() { - return 2; - } - - @Override - public SnapshotAssignerStatus nextState() { - return RESUMED; - } - }, - RESUMED { - @Override - public int getValue() { - return 3; - } - - @Override - public SnapshotAssignerStatus nextState() { - return RESUMED_FINISH; - } - }, - RESUMED_FINISH { - @Override - public int getValue() { - return 4; - } - - @Override - public SnapshotAssignerStatus nextState() { - return SUSPENDED; - } - }; - - public abstract int getValue(); - - public abstract SnapshotAssignerStatus nextState(); - - public static SnapshotAssignerStatus fromInteger(int x) { - Preconditions.checkState(x >= 0 && x < 5); - switch (x) { - case 0: - return INIT; - case 1: - return INIT_FINISH; - case 2: - return SUSPENDED; - case 3: - return RESUMED; - case 4: - return RESUMED_FINISH; - } - return null; - } -} diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java index b99a4a1d7..4efd49d22 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java @@ -22,7 +22,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; -import com.ververica.cdc.connectors.mysql.source.assigners.SnapshotAssignerStatus; +import com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; @@ -152,7 +152,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer

assignedSnapshotSplits = readAssignedSnapshotSplits(splitVersion, in); Map finishedOffsets = readFinishedOffsets(splitVersion, in); - SnapshotAssignerStatus assignerState; + AssignerStatus assignerStatus; boolean isAssignerFinished = in.readBoolean(); if (isAssignerFinished) { - assignerState = SnapshotAssignerStatus.INIT_FINISH; + assignerStatus = AssignerStatus.INITIAL_ASSIGNING_FINISHED; } else { - assignerState = SnapshotAssignerStatus.INIT; + assignerStatus = AssignerStatus.INITIAL_ASSIGNING; } return new SnapshotPendingSplitsState( @@ -192,7 +192,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer

(), false, false); @@ -213,16 +213,16 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer

assignedSnapshotSplits = readAssignedSnapshotSplits(splitVersion, in); Map finishedOffsets = readFinishedOffsets(splitVersion, in); - SnapshotAssignerStatus assignerState; + AssignerStatus assignerStatus; if (splitVersion < 4) { boolean isAssignerFinished = in.readBoolean(); if (isAssignerFinished) { - assignerState = SnapshotAssignerStatus.INIT_FINISH; + assignerStatus = AssignerStatus.INITIAL_ASSIGNING_FINISHED; } else { - assignerState = SnapshotAssignerStatus.INIT; + assignerStatus = AssignerStatus.INITIAL_ASSIGNING; } } else { - assignerState = SnapshotAssignerStatus.fromInteger(in.readInt()); + assignerStatus = AssignerStatus.fromStatusCode(in.readInt()); } List remainingTableIds = readTableIds(in); boolean isTableIdCaseSensitive = in.readBoolean(); @@ -231,7 +231,7 @@ public class PendingSplitsStateSerializer implements SimpleVersionedSerializer

splitFinishedOffsets; - /** - * Whether the snapshot split assigner is finished, which indicates there is no more splits and - * all records of splits have been completely processed in the pipeline. - */ - private final SnapshotAssignerStatus assignerState; + /** The {@link AssignerStatus} that indicates the snapshot assigner status. */ + private final AssignerStatus assignerStatus; - /** Whether the table identifier is case sensitive. */ + /** Whether the table identifier is case-sensitive. */ private final boolean isTableIdCaseSensitive; /** Whether the remaining tables are keep when snapshot state. */ @@ -73,7 +70,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { List remainingSplits, Map assignedSplits, Map splitFinishedOffsets, - SnapshotAssignerStatus assignerState, + AssignerStatus assignerStatus, List remainingTables, boolean isTableIdCaseSensitive, boolean isRemainingTablesCheckpointed) { @@ -81,7 +78,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { this.remainingSplits = remainingSplits; this.assignedSplits = assignedSplits; this.splitFinishedOffsets = splitFinishedOffsets; - this.assignerState = assignerState; + this.assignerStatus = assignerStatus; this.remainingTables = remainingTables; this.isTableIdCaseSensitive = isTableIdCaseSensitive; this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; @@ -103,8 +100,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { return splitFinishedOffsets; } - public SnapshotAssignerStatus getAssignerState() { - return assignerState; + public AssignerStatus getSnapshotAssignerStatus() { + return assignerStatus; } public List getRemainingTables() { @@ -128,7 +125,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { return false; } SnapshotPendingSplitsState that = (SnapshotPendingSplitsState) o; - return assignerState == that.assignerState + return assignerStatus == that.assignerStatus && isTableIdCaseSensitive == that.isTableIdCaseSensitive && isRemainingTablesCheckpointed == that.isRemainingTablesCheckpointed && Objects.equals(remainingTables, that.remainingTables) @@ -146,7 +143,7 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { remainingSplits, assignedSplits, splitFinishedOffsets, - assignerState, + assignerStatus, isTableIdCaseSensitive, isRemainingTablesCheckpointed); } @@ -164,8 +161,8 @@ public class SnapshotPendingSplitsState extends PendingSplitsState { + assignedSplits + ", splitFinishedOffsets=" + splitFinishedOffsets - + ", assignerState=" - + assignerState + + ", assignerStatus=" + + assignerStatus + ", isTableIdCaseSensitive=" + isTableIdCaseSensitive + ", isRemainingTablesCheckpointed=" diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index eb0cc1026..15db176ec 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -55,7 +55,7 @@ public class MySqlSourceConfig implements Serializable { private final double distributionFactorUpper; private final double distributionFactorLower; private final boolean includeSchemaChanges; - private final boolean captureNewTables; + private final boolean scanNewlyAddedTableEnabled; // 
-------------------------------------------------------------------------------------------- // Debezium Configurations @@ -83,7 +83,7 @@ public class MySqlSourceConfig implements Serializable { double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, - boolean captureNewTable, + boolean scanNewlyAddedTableEnabled, Properties dbzProperties) { this.hostname = checkNotNull(hostname); this.port = port; @@ -103,7 +103,7 @@ public class MySqlSourceConfig implements Serializable { this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.includeSchemaChanges = includeSchemaChanges; - this.captureNewTables = captureNewTable; + this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; this.dbzProperties = checkNotNull(dbzProperties); this.dbzConfiguration = Configuration.from(dbzProperties); this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration); @@ -182,8 +182,8 @@ public class MySqlSourceConfig implements Serializable { return includeSchemaChanges; } - public boolean isCaptureNewTables() { - return captureNewTables; + public boolean isScanNewlyAddedTableEnabled() { + return scanNewlyAddedTableEnabled; } public Properties getDbzProperties() { diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index f8109ed3b..c76883c5a 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -68,7 +68,7 @@ public class MySqlSourceConfigFactory implements Serializable { private double distributionFactorLower = SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); private boolean includeSchemaChanges = false; - private boolean captureNewTables = false; + private boolean scanNewlyAddedTableEnabled = false; private Properties dbzProperties; public MySqlSourceConfigFactory hostname(String hostname) { @@ -208,9 +208,9 @@ public class MySqlSourceConfigFactory implements Serializable { return this; } - /** Whether the {@link MySqlSource} should capture the newly added tables or not. */ - public MySqlSourceConfigFactory captureNewTables(boolean captureNewTables) { - this.captureNewTables = captureNewTables; + /** Whether the {@link MySqlSource} should scan the newly added tables or not. 
*/ + public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { + this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; return this; } @@ -310,7 +310,7 @@ public class MySqlSourceConfigFactory implements Serializable { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, - captureNewTables, + scanNewlyAddedTableEnabled, props); } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index a4edf0e05..5b4c45f46 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -200,9 +200,10 @@ public class MySqlSourceOptions { + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); @Experimental - public static final ConfigOption CAPTURE_NEW_TABLES = - ConfigOptions.key("capture-new-tables") + public static final ConfigOption SCAN_NEWLY_ADDED_TABLE_ENABLED = + ConfigOptions.key("scan.newly-added-table.enabled") .booleanType() .defaultValue(false) - .withDescription("Whether capture the snapshot of newly add tables."); + .withDescription( + "Whether capture the scan the newly added tables or not, by default is false."); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java index a830f3770..874d23c36 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java @@ -23,7 +23,6 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -33,13 +32,13 @@ import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsSt import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent; -import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitReaderSuspendedReportEvent; import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent; import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent; import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent; +import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeEvent; +import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeRequestEvent; +import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderAckEvent; import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent; -import 
com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeRequestEvent; -import com.ververica.cdc.connectors.mysql.source.events.TotalFinishedSplitSizeResponseEvent; import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; @@ -57,6 +56,9 @@ import java.util.Optional; import java.util.TreeSet; import java.util.stream.Collectors; +import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isAssigningFinished; +import static com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus.isSuspended; + /** * A MySQL CDC source enumerator that enumerates receive the split request and assign the split to * source readers. @@ -73,6 +75,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator readersAwaitingSplit; private List> binlogSplitMeta; + private boolean binlogReaderIsSuspended = false; public MySqlSourceEnumerator( SplitEnumeratorContext context, @@ -87,6 +90,7 @@ public class MySqlSourceEnumerator implements SplitEnumerator finishedOffsets = reportEvent.getFinishedOffsets(); + splitAssigner.onFinishedSplits(finishedOffsets); + + wakeupBinlogReaderIfNeed(); + // send acknowledge event FinishedSnapshotSplitsAckEvent ackEvent = new FinishedSnapshotSplitsAckEvent(new ArrayList<>(finishedOffsets.keySet())); @@ -136,15 +144,13 @@ public class MySqlSourceEnumerator implements SplitEnumerator // restore for finishedUnackedSplits List unfinishedSplits = new ArrayList<>(); for (MySqlSplit split : splits) { - LOG.info("Received Split: " + split); + LOG.info("Add Split: " + split); if (split.isSnapshotSplit()) { MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit(); if (snapshotSplit.isSnapshotReadFinished()) { @@ -176,9 +179,9 @@ public class MySqlSourceReader } } else { MySqlBinlogSplit binlogSplit = split.asBinlogSplit(); - // the binlog split is uncompleted + // the binlog split is suspended if (binlogSplit.isSuspended()) { - suspendedSplit = binlogSplit; + suspendedBinlogSplit = binlogSplit; } else if (!binlogSplit.isCompletedSplit()) { uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit()); requestBinlogSplitMetaIfNeeded(split.asBinlogSplit()); @@ -241,34 +244,26 @@ public class MySqlSourceReader ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId()); fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent); } else if (sourceEvent instanceof SuspendBinlogReaderEvent) { - if (!mySqlSourceReaderContext.isShouldBinlogSplitReaderStopped()) { - mySqlSourceReaderContext.setShouldBinlogSplitReaderStopped(true); - } + mySqlSourceReaderContext.setStopBinlogSplitReader(); } else if (sourceEvent instanceof WakeupReaderEvent) { WakeupReaderEvent wakeupReaderEvent = (WakeupReaderEvent) sourceEvent; - if (wakeupReaderEvent.getType() == WakeupReaderEvent.WakeUpType.SNAPSHOT) { + if (wakeupReaderEvent.getTarget() == SNAPSHOT_READER) { context.sendSplitRequest(); } else { - if (suspendedSplit == null) { - return; + if (suspendedBinlogSplit != null) { + context.sendSourceEventToCoordinator( + new LatestFinishedSplitsSizeRequestEvent()); } - context.sendSourceEventToCoordinator(new TotalFinishedSplitSizeRequestEvent()); } - } else if (sourceEvent instanceof TotalFinishedSplitSizeResponseEvent) { - if (suspendedSplit == null) { - return; + } else if (sourceEvent instanceof LatestFinishedSplitsSizeEvent) { + if (suspendedBinlogSplit != null) { + final int finishedSplitsSize = 
+ ((LatestFinishedSplitsSizeEvent) sourceEvent).getLatestFinishedSplitsSize(); + final MySqlBinlogSplit binlogSplit = + toNormalBinlogSplit(suspendedBinlogSplit, finishedSplitsSize); + suspendedBinlogSplit = null; + this.addSplits(Collections.singletonList(binlogSplit)); } - Preconditions.checkState(suspendedSplit.isSuspended()); - MySqlBinlogSplit split = - MySqlBinlogSplit.updateTotalFinishedSplitSize( - suspendedSplit, - ((TotalFinishedSplitSizeResponseEvent) sourceEvent) - .getTotalFinishedSplitSize()); - split = MySqlBinlogSplit.updateIsSuspended(split, false); - - // Can flink make sure the following two sentense happend in the same checkpoint? - suspendedSplit = null; - this.addSplits(Collections.singletonList(split)); } else { super.handleSourceEvents(sourceEvent); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderContext.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderContext.java index 6100ebfe4..b5c6f3891 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderContext.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReaderContext.java @@ -21,31 +21,32 @@ package com.ververica.cdc.connectors.mysql.source.reader; import org.apache.flink.api.connector.source.SourceReaderContext; /** - * A wrapper class that wraps SourceReaderContext and some data shared by {@link MySqlSourceReader} - * and {@link MySqlSplitReader}. + * A wrapper class that wraps {@link SourceReaderContext} for sharing message between {@link + * MySqlSourceReader} and {@link MySqlSplitReader}. */ public class MySqlSourceReaderContext { - SourceReaderContext sourceReaderContext; - boolean shouldBinlogSplitReaderStopped; + + private final SourceReaderContext sourceReaderContext; + private volatile boolean stopBinlogSplitReader; public MySqlSourceReaderContext(final SourceReaderContext sourceReaderContext) { this.sourceReaderContext = sourceReaderContext; - this.shouldBinlogSplitReaderStopped = false; + this.stopBinlogSplitReader = false; } public SourceReaderContext getSourceReaderContext() { return sourceReaderContext; } - public void setSourceReaderContext(final SourceReaderContext sourceReaderContext) { - this.sourceReaderContext = sourceReaderContext; + public boolean needStopBinlogSplitReader() { + return stopBinlogSplitReader; } - public boolean isShouldBinlogSplitReaderStopped() { - return shouldBinlogSplitReaderStopped; + public void setStopBinlogSplitReader() { + this.stopBinlogSplitReader = true; } - public void setShouldBinlogSplitReaderStopped(final boolean shouldBinlogSplitReaderStopped) { - this.shouldBinlogSplitReaderStopped = shouldBinlogSplitReaderStopped; + public void resetStopBinlogSplitReader() { + this.stopBinlogSplitReader = false; } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.java index e183d8ec2..76fa25f5c 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSplitReader.java @@ -69,15 +69,13 @@ public class MySqlSplitReader implements SplitReader { @Override public 
RecordsWithSplitIds fetch() throws IOException { + checkSplitOrStartNext(); + checkNeedStopBinlogReader(); - Iterator dataIt = null; + Iterator dataIt; try { dataIt = currentReader.pollSplitRecords(); - if (context.isShouldBinlogSplitReaderStopped() - && currentReader instanceof BinlogSplitReader) { - ((BinlogSplitReader) currentReader).finishBinlogSplit(); - } } catch (InterruptedException e) { LOG.warn("fetch data failed.", e); throw new IOException(e); @@ -87,6 +85,14 @@ public class MySqlSplitReader implements SplitReader { : MySqlRecords.forRecords(currentSplitId, dataIt); } + private void checkNeedStopBinlogReader() { + if (currentReader instanceof BinlogSplitReader + && context.needStopBinlogSplitReader() + && !currentReader.isFinished()) { + ((BinlogSplitReader) currentReader).stopBinlogReadTask(); + } + } + @Override public void handleSplitsChanges(SplitsChange splitsChanges) { if (!(splitsChanges instanceof SplitsAddition)) { @@ -125,7 +131,8 @@ public class MySqlSplitReader implements SplitReader { if (nextSplit.isSnapshotSplit()) { if (currentReader instanceof BinlogSplitReader) { - LOG.info("This is the point from binlog split back to snapshot split"); + LOG.info( + "This is the point from binlog split reading change to snapshot split reading"); currentReader.close(); currentReader = null; } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java index 619a1d32b..842343fa3 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java @@ -166,28 +166,6 @@ public class MySqlBinlogSplit extends MySqlSplit { binlogSplit.isSuspended()); } - public static MySqlBinlogSplit emptyFinishedSplitInfos(MySqlBinlogSplit binlogSplit) { - return new MySqlBinlogSplit( - binlogSplit.splitId, - binlogSplit.getStartingOffset(), - binlogSplit.getEndingOffset(), - new ArrayList<>(), - binlogSplit.getTableSchemas(), - binlogSplit.getTotalFinishedSplitSize(), - binlogSplit.isSuspended()); - } - - public static MySqlBinlogSplit emptyTableSchemas(MySqlBinlogSplit binlogSplit) { - return new MySqlBinlogSplit( - binlogSplit.splitId, - binlogSplit.getStartingOffset(), - binlogSplit.getEndingOffset(), - binlogSplit.getFinishedSnapshotSplitInfos(), - new HashMap<>(), - binlogSplit.getTotalFinishedSplitSize(), - binlogSplit.isSuspended()); - } - public static MySqlBinlogSplit fillTableSchemas( MySqlBinlogSplit binlogSplit, Map tableSchemas) { tableSchemas.putAll(binlogSplit.getTableSchemas()); @@ -201,27 +179,26 @@ public class MySqlBinlogSplit extends MySqlSplit { binlogSplit.isSuspended()); } - public static MySqlBinlogSplit updateTotalFinishedSplitSize( - MySqlBinlogSplit binlogSplit, int totalFinishedSplitSize) { + public static MySqlBinlogSplit toNormalBinlogSplit( + MySqlBinlogSplit suspendedBinlogSplit, int totalFinishedSplitSize) { return new MySqlBinlogSplit( - binlogSplit.splitId, - binlogSplit.getStartingOffset(), - binlogSplit.getEndingOffset(), - binlogSplit.getFinishedSnapshotSplitInfos(), - binlogSplit.getTableSchemas(), + suspendedBinlogSplit.splitId, + suspendedBinlogSplit.getStartingOffset(), + suspendedBinlogSplit.getEndingOffset(), + suspendedBinlogSplit.getFinishedSnapshotSplitInfos(), + suspendedBinlogSplit.getTableSchemas(), 
totalFinishedSplitSize, - binlogSplit.isSuspended()); + false); } - public static MySqlBinlogSplit updateIsSuspended( - MySqlBinlogSplit binlogSplit, boolean isSuspended) { + public static MySqlBinlogSplit toSuspendedBinlogSplit(MySqlBinlogSplit normalBinlogSplit) { return new MySqlBinlogSplit( - binlogSplit.splitId, - binlogSplit.getStartingOffset(), - binlogSplit.getEndingOffset(), - binlogSplit.getFinishedSnapshotSplitInfos(), - binlogSplit.getTableSchemas(), - binlogSplit.getTotalFinishedSplitSize(), - isSuspended); + normalBinlogSplit.splitId, + normalBinlogSplit.getStartingOffset(), + normalBinlogSplit.getEndingOffset(), + new ArrayList<>(), + new HashMap<>(), + normalBinlogSplit.getTotalFinishedSplitSize(), + true); } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java index 610c7bc6f..fbe54b2f6 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java @@ -78,7 +78,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final double distributionFactorUpper; private final double distributionFactorLower; private final StartupOptions startupOptions; - private final boolean captureNewTables; + private final boolean scanNewlyAddedTableEnabled; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -156,7 +156,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat double distributionFactorUpper, double distributionFactorLower, StartupOptions startupOptions, - boolean captureNewTable) { + boolean scanNewlyAddedTableEnabled) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -177,7 +177,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.startupOptions = startupOptions; - this.captureNewTables = captureNewTable; + this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; // Mutable attributes this.producedDataType = physicalSchema.toPhysicalRowDataType(); this.metadataKeys = Collections.emptyList(); @@ -232,7 +232,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat .debeziumProperties(dbzProperties) .startupOptions(startupOptions) .deserializer(deserializer) - .captureNewTables(captureNewTables) + .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .build(); return SourceProvider.of(parallelSource); } else { @@ -308,7 +308,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat connectionPoolSize, distributionFactorUpper, distributionFactorLower, - startupOptions); + startupOptions, + scanNewlyAddedTableEnabled); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -330,6 +331,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat && fetchSize == that.fetchSize && distributionFactorUpper == that.distributionFactorUpper && distributionFactorLower == that.distributionFactorLower + && scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled && Objects.equals(physicalSchema, 
that.physicalSchema) && Objects.equals(hostname, that.hostname) && Objects.equals(database, that.database) @@ -371,7 +373,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat distributionFactorLower, startupOptions, producedDataType, - metadataKeys); + metadataKeys, + scanNewlyAddedTableEnabled); } @Override diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 25c1f2921..8f2ecb1c7 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -38,7 +38,6 @@ import java.util.HashSet; import java.util.Set; import java.util.regex.Pattern; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CAPTURE_NEW_TABLES; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; @@ -49,6 +48,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOption import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_MODE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE; @@ -98,7 +98,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); - boolean captureNewTables = config.get(CAPTURE_NEW_TABLES); + boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); if (enableParallelRead) { @@ -134,7 +134,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { distributionFactorUpper, distributionFactorLower, startupOptions, - captureNewTables); + scanNewlyAddedTableEnabled); } @Override @@ -172,7 +172,7 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory { options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(CONNECT_MAX_RETRIES); - options.add(CAPTURE_NEW_TABLES); + options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); return options; } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java 
b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java index acbe3d7ed..037b5c30d 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java @@ -105,7 +105,7 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase { remainingSplits, assignedSplits, splitFinishedOffsets, - SnapshotAssignerStatus.INIT_FINISH, + AssignerStatus.INITIAL_ASSIGNING_FINISHED, new ArrayList<>(), false, true); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java index d82e6b480..3fe7d2609 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java @@ -21,7 +21,7 @@ package com.ververica.cdc.connectors.mysql.source.assigners.state; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.RowType; -import com.ververica.cdc.connectors.mysql.source.assigners.SnapshotAssignerStatus; +import com.ververica.cdc.connectors.mysql.source.assigners.AssignerStatus; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitSerializer; @@ -125,7 +125,7 @@ public class PendingSplitsStateSerializerTest { remainingSplits, assignedSnapshotSplits, finishedOffsets, - SnapshotAssignerStatus.INIT, + AssignerStatus.INITIAL_ASSIGNING, remainingTables, false, true);
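A usage sketch for the renamed option. Only scanNewlyAddedTableEnabled(true) comes from this change; the other builder setters, the StringDebeziumDeserializationSchema, and the import paths are the connector's existing API as assumed here, and the connection settings and table names are placeholders.

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

    public class ScanNewlyAddedTableExample {
        public static void main(String[] args) {
            MySqlSource<String> source =
                    MySqlSource.<String>builder()
                            .hostname("localhost")
                            .port(3306)
                            .databaseList("mydb")
                            // widen this list (or use a pattern the new tables match) and restart
                            // the job from a savepoint to have the newly added tables captured
                            .tableList("mydb.orders", "mydb.shipments")
                            .username("flinkuser")
                            .password("flinkpw")
                            .deserializer(new StringDebeziumDeserializationSchema())
                            // renamed from captureNewTables(...) by this change
                            .scanNewlyAddedTableEnabled(true)
                            .build();
            // hand 'source' to env.fromSource(...) as usual
        }
    }

The SQL connector equivalent is the new 'scan.newly-added-table.enabled' = 'true' option registered in MySqlTableSourceFactory; in both cases the new tables are discovered when the assigner starts up again, see MySqlSnapshotSplitAssigner#captureNewlyAddedTables above.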