[FLINK-36811][mysql] MySQL cdc setIsProcessingBacklog in snapshot phase and exit when snapshot phase finished (#3793)

pull/3784/head^2
Shawn Huang 3 weeks ago committed by GitHub
parent 1fb68a8d62
commit 085684b773

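Background: SplitEnumeratorContext#setIsProcessingBacklog (FLIP-309) lets a source tell the Flink runtime that it is currently draining historical data, so the runtime can, for instance, relax the checkpoint interval until the backlog is cleared. The hunks below thread the enumerator context into the MySQL split assigners so that the snapshot phase raises the flag when the initial snapshot starts and clears it once every snapshot split has finished. A minimal illustrative sketch of that contract (the class and method names are invented for illustration; only the Flink interface and its setIsProcessingBacklog call are real):

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

/** Illustration only: the backlog signalling pattern this commit adds around the snapshot phase. */
final class BacklogSignalSketch<SplitT extends SourceSplit> {

    private final SplitEnumeratorContext<SplitT> context;

    BacklogSignalSketch(SplitEnumeratorContext<SplitT> context) {
        this.context = context;
    }

    /** Invoked when the initial snapshot phase begins (the assigner's open()). */
    void enterSnapshotPhase() {
        // Signal that the source is reading bounded historical data.
        context.setIsProcessingBacklog(true);
    }

    /** Invoked once all snapshot splits have reported their finished offsets. */
    void leaveSnapshotPhase() {
        // About to switch to real-time binlog reading: clear the flag.
        context.setIsProcessingBacklog(false);
    }
}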
@@ -208,7 +208,8 @@ public class MySqlSource<T>
sourceConfig,
enumContext.currentParallelism(),
new ArrayList<>(),
isTableIdCaseSensitive);
isTableIdCaseSensitive,
enumContext);
} catch (Exception e) {
throw new FlinkRuntimeException(
"Failed to discover captured tables for enumerator", e);
@@ -233,7 +234,8 @@ public class MySqlSource<T>
new MySqlHybridSplitAssigner(
sourceConfig,
enumContext.currentParallelism(),
(HybridPendingSplitsState) checkpoint);
(HybridPendingSplitsState) checkpoint,
enumContext);
} else if (checkpoint instanceof BinlogPendingSplitsState) {
splitAssigner =
new MySqlBinlogSplitAssigner(

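For orientation, the enumContext forwarded above is the context Flink hands the source when it builds or restores the split enumerator; the assigners keep a reference to it so they can toggle the backlog flag later. A simplified sketch of the relevant Source hooks (shapes mirror Flink's public source API; the interface name here is illustrative):

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

/** Shape only: the Source hooks that supply the enumerator context used above. */
interface SourceEnumeratorHooksSketch<SplitT extends SourceSplit, EnumChkT> {

    // Fresh start: MySqlSource builds the MySqlHybridSplitAssigner here and now
    // passes enumContext through to it.
    SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext)
            throws Exception;

    // Recovery from a checkpoint: the restored assigner receives the same context
    // so the backlog flag can be managed after a failover as well.
    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
            SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception;
}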
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.mysql.source.assigners;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
@@ -59,11 +60,16 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
MySqlSourceConfig sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive) {
boolean isTableIdCaseSensitive,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
new MySqlSnapshotSplitAssigner(
sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
sourceConfig,
currentParallelism,
remainingTables,
isTableIdCaseSensitive,
enumeratorContext),
false,
sourceConfig.getSplitMetaGroupSize());
}
@@ -71,11 +77,15 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
public MySqlHybridSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
HybridPendingSplitsState checkpoint) {
HybridPendingSplitsState checkpoint,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
new MySqlSnapshotSplitAssigner(
sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
sourceConfig,
currentParallelism,
checkpoint.getSnapshotPendingSplits(),
enumeratorContext),
checkpoint.isBinlogSplitAssigned(),
sourceConfig.getSplitMetaGroupSize());
}

@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.mysql.source.assigners;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
@@ -79,6 +80,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private final int currentParallelism;
private final List<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;
private final SplitEnumeratorContext<MySqlSplit> enumeratorContext;
private final MySqlPartition partition;
private final Object lock = new Object();
@@ -95,7 +97,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
MySqlSourceConfig sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive) {
boolean isTableIdCaseSensitive,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
currentParallelism,
@@ -108,13 +111,15 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
remainingTables,
isTableIdCaseSensitive,
true,
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
ChunkSplitterState.NO_SPLITTING_TABLE_STATE,
enumeratorContext);
}
public MySqlSnapshotSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
SnapshotPendingSplitsState checkpoint) {
SnapshotPendingSplitsState checkpoint,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
currentParallelism,
@@ -127,7 +132,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
checkpoint.getRemainingTables(),
checkpoint.isTableIdCaseSensitive(),
checkpoint.isRemainingTablesCheckpointed(),
checkpoint.getChunkSplitterState());
checkpoint.getChunkSplitterState(),
enumeratorContext);
}
private MySqlSnapshotSplitAssigner(
@@ -142,7 +148,8 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed,
ChunkSplitterState chunkSplitterState) {
ChunkSplitterState chunkSplitterState,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this.sourceConfig = sourceConfig;
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
@@ -168,10 +175,12 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
createChunkSplitter(sourceConfig, isTableIdCaseSensitive, chunkSplitterState);
this.partition =
new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
this.enumeratorContext = enumeratorContext;
}
@Override
public void open() {
shouldEnterProcessingBacklog();
chunkSplitter.open();
discoveryCaptureTables();
captureNewlyAddedTables();
@@ -397,17 +406,20 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
@Override
public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
this.splitFinishedOffsets.putAll(splitFinishedOffsets);
if (allSnapshotSplitsFinished()
&& AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) {
assignerStatus = assignerStatus.onFinish();
LOG.info(
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
} else {
LOG.info(
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
if (allSnapshotSplitsFinished()) {
enumeratorContext.setIsProcessingBacklog(false);
if (AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not
// need
// to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) {
assignerStatus = assignerStatus.onFinish();
LOG.info(
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
} else {
LOG.info(
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
}
}
}
}
@@ -607,4 +619,10 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
}
return new MySqlChunkSplitter(mySqlSchema, sourceConfig);
}
private void shouldEnterProcessingBacklog() {
if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING) {
enumeratorContext.setIsProcessingBacklog(true);
}
}
}

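Taken together with the hunks above, the assigner now raises the backlog flag in open() (only when a fresh initial snapshot is starting, i.e. AssignerStatus.INITIAL_ASSIGNING) and clears it in onFinishedSplits as soon as every snapshot split has reported its finished binlog offset. The usual payoff is pairing this signal with Flink's backlog-aware checkpointing; a hedged sketch of such a job setup follows (the interval-during-backlog option key comes from FLIP-309 / newer Flink releases and is an assumption here, not something this commit adds):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Sketch only: how a job might exploit the backlog signal emitted by the assigner. */
public class BacklogAwareCheckpointingSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Regular checkpoint interval once the source is reading the live binlog.
        conf.setString("execution.checkpointing.interval", "30 s");
        // Assumed option key (FLIP-309): a relaxed interval while the source reports
        // isProcessingBacklog = true, i.e. during the snapshot phase.
        conf.setString("execution.checkpointing.interval-during-backlog", "10 min");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the MySQL CDC pipeline on env as usual ...
        // env.execute("mysql-cdc-job");
    }
}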
@@ -86,6 +86,7 @@ import static org.apache.flink.cdc.connectors.mysql.MySqlTestUtils.assertContain
import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -1182,7 +1183,11 @@ public class BinlogSplitReaderTest extends MySqlSourceTestBase {
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
sourceConfig,
DEFAULT_PARALLELISM,
remainingTables,
false,
getMySqlSplitEnumeratorContext());
assigner.open();
List<MySqlSnapshotSplit> mySqlSplits = new ArrayList<>();
while (true) {

@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -629,7 +630,11 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
MySqlSourceConfig sourceConfig, List<TableId> remainingTables) {
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
sourceConfig,
DEFAULT_PARALLELISM,
remainingTables,
false,
getMySqlSplitEnumeratorContext());
assigner.open();
List<MySqlSplit> mySqlSplitList = new ArrayList<>();
while (true) {

@@ -29,6 +29,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.table.api.DataTypes;
@@ -50,6 +51,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -109,8 +112,11 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
HybridPendingSplitsState checkpoint =
new HybridPendingSplitsState(snapshotPendingSplitsState, false);
MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
getMySqlSplitEnumeratorContext();
final MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
new MySqlHybridSplitAssigner(
configuration, DEFAULT_PARALLELISM, checkpoint, enumeratorContext);
// step 2. Get the MySqlBinlogSplit after all snapshot splits finished
Optional<MySqlSplit> binlogSplit = assigner.getNext();
@@ -152,7 +158,12 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
// Create and initialize assigner
MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(sourceConfig, 1, new ArrayList<>(), false);
new MySqlHybridSplitAssigner(
sourceConfig,
1,
new ArrayList<>(),
false,
getMySqlSplitEnumeratorContext());
assigner.open();
// Get all snapshot splits
@@ -201,6 +212,57 @@ public class MySqlHybridSplitAssignerTest extends MySqlSourceTestBase {
.createConfig(0);
}
@Test
public void testSetProcessingBacklog() {
final String captureTable = "customers";
MySqlSourceConfig configuration = getConfig(new String[] {captureTable});
MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
getMySqlSplitEnumeratorContext();
final MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(
configuration,
DEFAULT_PARALLELISM,
new ArrayList<>(),
false,
enumeratorContext);
assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
assigner.open();
assertThat(enumeratorContext.isProcessingBacklog()).isTrue();
// Get all snapshot splits
List<MySqlSnapshotSplit> snapshotSplits = drainSnapshotSplits(assigner);
Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
int i = 0;
for (MySqlSnapshotSplit snapshotSplit : snapshotSplits) {
BinlogOffset binlogOffset =
BinlogOffset.builder().setBinlogFilePosition("foo", i++).build();
finishedOffsets.put(snapshotSplit.splitId(), binlogOffset);
}
assigner.onFinishedSplits(finishedOffsets);
assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
assigner.close();
}
private MySqlSourceConfigFactory getConfigFactory(String[] captureTables) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
.toArray(String[]::new);
return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.initial())
.databaseList(customerDatabase.getDatabaseName())
.tableList(captureTableIds)
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.username(customerDatabase.getUsername())
.password(customerDatabase.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString());
}
private MySqlSourceConfig getConfig(String[] captureTables) {
return getConfigFactory(captureTables).createConfig(0);
}
private List<MySqlSnapshotSplit> drainSnapshotSplits(MySqlHybridSplitAssigner assigner) {
List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
while (true) {

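The new testSetProcessingBacklog covers the full lifecycle: the flag is false before open(), true while snapshot splits are being assigned, and false again once onFinishedSplits has seen every split. The drainSnapshotSplits helper is cut off in this hunk; a typical drain loop over the assigner looks roughly like the sketch below (illustrative only, not the committed helper):

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;

/** Illustrative sketch of a snapshot-split drain loop; not the committed helper. */
final class DrainSnapshotSplitsSketch {
    static List<MySqlSnapshotSplit> drain(MySqlHybridSplitAssigner assigner) {
        List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
        while (true) {
            Optional<MySqlSplit> split = assigner.getNext();
            if (!split.isPresent()) {
                // No more snapshot splits to hand out; the single binlog split comes later.
                break;
            }
            snapshotSplits.add(split.get().asSnapshotSplit());
        }
        return snapshotSplits;
    }
}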
@@ -50,6 +50,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -475,7 +476,11 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
configuration, DEFAULT_PARALLELISM, new ArrayList<>(), false);
configuration,
DEFAULT_PARALLELISM,
new ArrayList<>(),
false,
getMySqlSplitEnumeratorContext());
assertTrue(assigner.needToDiscoveryTables());
assigner.open();
@@ -549,7 +554,11 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
.collect(Collectors.toList());
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
configuration, DEFAULT_PARALLELISM, remainingTables, false);
configuration,
DEFAULT_PARALLELISM,
remainingTables,
false,
getMySqlSplitEnumeratorContext());
return getSplitsFromAssigner(assigner);
}
@@ -642,7 +651,11 @@ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
true,
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
new MySqlSnapshotSplitAssigner(
configuration,
DEFAULT_PARALLELISM,
checkpoint,
getMySqlSplitEnumeratorContext());
return getSplitsFromAssigner(assigner);
}

@@ -99,6 +99,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isH
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
@@ -404,7 +405,8 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
sourceConfig,
DEFAULT_PARALLELISM,
tableNames.stream().map(TableId::parse).collect(Collectors.toList()),
false);
false,
getMySqlSplitEnumeratorContext());
assigner.open();
List<MySqlSplit> splits = new ArrayList<>();
MySqlSnapshotSplit split = (MySqlSnapshotSplit) assigner.getNext().get();
@@ -459,7 +461,8 @@ public class MySqlSourceReaderTest extends MySqlSourceTestBase {
sourceConfig,
DEFAULT_PARALLELISM,
Collections.singletonList(TableId.parse(tableName)),
false);
false,
getMySqlSplitEnumeratorContext());
assigner.open();
MySqlSnapshotSplit snapshotSplit = (MySqlSnapshotSplit) assigner.getNext().get();
// should contain only one split

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.mysql.source.utils;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
/** A mock enumerator context to record isProcessingBacklog. */
public class MockMySqlSplitEnumeratorEnumeratorContext
extends MockSplitEnumeratorContext<MySqlSplit> {
private boolean isProcessingBacklog = false;
public MockMySqlSplitEnumeratorEnumeratorContext(int parallelism) {
super(parallelism);
}
@Override
public void setIsProcessingBacklog(boolean isProcessingBacklog) {
this.isProcessingBacklog = isProcessingBacklog;
}
public boolean isProcessingBacklog() {
return isProcessingBacklog;
}
}

@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.mysql.testutils;
import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext;
/** The test utils for metrics. */
public class MetricsUtils {
public static MockMySqlSplitEnumeratorEnumeratorContext getMySqlSplitEnumeratorContext() {
return new MockMySqlSplitEnumeratorEnumeratorContext(1);
}
}
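A minimal usage sketch tying the two new test utilities together (assumes JUnit 4 and AssertJ, which the surrounding tests already use; the test class name is illustrative):

import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext;
import org.junit.Test;

/** Sketch only: the mock simply records the last value passed to setIsProcessingBacklog. */
public class ProcessingBacklogFlagSketchTest {

    @Test
    public void mockRecordsBacklogFlag() {
        MockMySqlSplitEnumeratorEnumeratorContext context = getMySqlSplitEnumeratorContext();
        assertThat(context.isProcessingBacklog()).isFalse();

        // The assigners call this through SplitEnumeratorContext; here we drive it directly.
        context.setIsProcessingBacklog(true);
        assertThat(context.isProcessingBacklog()).isTrue();

        context.setIsProcessingBacklog(false);
        assertThat(context.isProcessingBacklog()).isFalse();
    }
}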