[mysql] skip closing reader when the reader received the binlog split (#2261)

pull/2331/head
Hang Ruan 2 years ago
parent cc364a5efc
commit 8221f51ca8

@ -189,6 +189,10 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
snapshotSplitAssigner.close(); snapshotSplitAssigner.close();
} }
/**
 * Delegates to the wrapped snapshot split assigner.
 *
 * @return whatever {@code snapshotSplitAssigner.noMoreSnapshotSplits()} reports
 */
public boolean noMoreSnapshotSplits() {
    final boolean snapshotSplitsExhausted = snapshotSplitAssigner.noMoreSnapshotSplits();
    return snapshotSplitsExhausted;
}
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
private MySqlBinlogSplit createBinlogSplit() { private MySqlBinlogSplit createBinlogSplit() {

@ -24,10 +24,12 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner; import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState; import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitAssignedEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent;
@ -39,6 +41,7 @@ import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumb
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberRequestEvent; import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberRequestEvent;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit; import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -72,6 +75,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
private final TreeSet<Integer> readersAwaitingSplit; private final TreeSet<Integer> readersAwaitingSplit;
private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta; private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;
@Nullable private Integer binlogSplitTaskId;
public MySqlSourceEnumerator( public MySqlSourceEnumerator(
SplitEnumeratorContext<MySqlSplit> context, SplitEnumeratorContext<MySqlSplit> context,
MySqlSourceConfig sourceConfig, MySqlSourceConfig sourceConfig,
@ -107,6 +112,12 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
@Override @Override
public void addSplitsBack(List<MySqlSplit> splits, int subtaskId) { public void addSplitsBack(List<MySqlSplit> splits, int subtaskId) {
LOG.debug("The enumerator adds splits back: {}", splits); LOG.debug("The enumerator adds splits back: {}", splits);
Optional<MySqlSplit> binlogSplit =
splits.stream().filter(MySqlSplit::isBinlogSplit).findAny();
if (binlogSplit.isPresent()) {
LOG.info("The enumerator adds add binlog split back: {}", binlogSplit);
this.binlogSplitTaskId = null;
}
splitAssigner.addSplits(splits); splitAssigner.addSplits(splits);
} }
@ -153,6 +164,11 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
"The enumerator receives request from subtask {} for the latest finished splits number after added newly tables. ", "The enumerator receives request from subtask {} for the latest finished splits number after added newly tables. ",
subtaskId); subtaskId);
handleLatestFinishedSplitNumberRequest(subtaskId); handleLatestFinishedSplitNumberRequest(subtaskId);
} else if (sourceEvent instanceof BinlogSplitAssignedEvent) {
LOG.info(
"The enumerator receives notice from subtask {} for the binlog split assignment. ",
subtaskId);
binlogSplitTaskId = subtaskId;
} }
} }
@ -188,7 +204,10 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
continue; continue;
} }
if (splitAssigner.isStreamSplitAssigned() && sourceConfig.isCloseIdleReaders()) { if (splitAssigner.isStreamSplitAssigned()
&& sourceConfig.isCloseIdleReaders()
&& noMoreSnapshotSplits()
&& (binlogSplitTaskId != null && !binlogSplitTaskId.equals(nextAwaiting))) {
// close idle readers when snapshot phase finished. // close idle readers when snapshot phase finished.
context.signalNoMoreSplits(nextAwaiting); context.signalNoMoreSplits(nextAwaiting);
awaitingReader.remove(); awaitingReader.remove();
@ -200,6 +219,9 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
if (split.isPresent()) { if (split.isPresent()) {
final MySqlSplit mySqlSplit = split.get(); final MySqlSplit mySqlSplit = split.get();
context.assignSplit(mySqlSplit, nextAwaiting); context.assignSplit(mySqlSplit, nextAwaiting);
if (mySqlSplit instanceof MySqlBinlogSplit) {
this.binlogSplitTaskId = nextAwaiting;
}
awaitingReader.remove(); awaitingReader.remove();
LOG.info("The enumerator assigns split {} to subtask {}", mySqlSplit, nextAwaiting); LOG.info("The enumerator assigns split {} to subtask {}", mySqlSplit, nextAwaiting);
} else { } else {
@ -210,6 +232,16 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
} }
} }
/**
 * Answers whether the current split assigner has no more snapshot splits.
 *
 * @return the hybrid assigner's own answer, or {@code true} for a binlog-only assigner
 * @throws IllegalStateException if the assigner is neither of the two known subtypes
 */
private boolean noMoreSnapshotSplits() {
    if (splitAssigner instanceof MySqlHybridSplitAssigner) {
        final MySqlHybridSplitAssigner hybridAssigner =
                (MySqlHybridSplitAssigner) splitAssigner;
        return hybridAssigner.noMoreSnapshotSplits();
    }
    if (splitAssigner instanceof MySqlBinlogSplitAssigner) {
        // A binlog-only assigner is treated as having no snapshot splits left.
        return true;
    }
    throw new IllegalStateException(
            "Unexpected subtype of MySqlSplitAssigner class when invoking noMoreSnapshotSplits.");
}
private int[] getRegisteredReader() { private int[] getRegisteredReader() {
return this.context.registeredReaders().keySet().stream() return this.context.registeredReaders().keySet().stream()
.mapToInt(Integer::intValue) .mapToInt(Integer::intValue)

@ -0,0 +1,34 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ververica.cdc.connectors.mysql.source.events;
import org.apache.flink.api.connector.source.SourceEvent;
import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
/**
 * A payload-free {@link SourceEvent} sent by the {@link MySqlSourceReader} to the {@link
 * MySqlSourceEnumerator} to report that the {@link MySqlBinlogSplit} has been assigned to that
 * reader.
 */
public class BinlogSplitAssignedEvent implements SourceEvent {

    /** Serialization version of this event. */
    private static final long serialVersionUID = 1L;

    /** Creates the event; it has no state to initialize. */
    public BinlogSplitAssignedEvent() {}
}

@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig; import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitAssignedEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent; import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent;
@ -270,6 +271,9 @@ public class MySqlSourceReader<T>
binlogSplit, sourceConfig, checkNewlyAddedTableSchema); binlogSplit, sourceConfig, checkNewlyAddedTableSchema);
unfinishedSplits.add(mySqlBinlogSplit); unfinishedSplits.add(mySqlBinlogSplit);
} }
LOG.info(
"Source reader {} received the binlog split : {}.", subtaskId, binlogSplit);
context.sendSourceEventToCoordinator(new BinlogSplitAssignedEvent());
} }
} }
// notify split enumerator again about the finished unacked snapshot splits // notify split enumerator again about the finished unacked snapshot splits

@ -172,6 +172,22 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
"address_shanghai"); "address_shanghai");
} }
/**
 * Runs the one-by-one newly-added-table scenario over three address tables with the
 * {@code scan.incremental.close-idle-reader.enabled} source option switched on, no failover,
 * and binlog events produced before capture.
 */
@Test
public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlogAndAutoCloseReader()
        throws Exception {
    // Extra source option forwarded into the generated CREATE TABLE statement.
    final Map<String, String> sourceOptions = new HashMap<>();
    sourceOptions.put("scan.incremental.close-idle-reader.enabled", "true");

    final boolean makeBinlogBeforeCapture = true;
    testNewlyAddedTableOneByOne(
            DEFAULT_PARALLELISM,
            sourceOptions,
            FailoverType.NONE,
            FailoverPhase.NEVER,
            makeBinlogBeforeCapture,
            "address_hangzhou",
            "address_beijing",
            "address_shanghai");
}
@Test @Test
public void testNewlyAddedTableForExistsPipelineThrice() throws Exception { public void testNewlyAddedTableForExistsPipelineThrice() throws Exception {
testNewlyAddedTableOneByOne( testNewlyAddedTableOneByOne(
@ -588,7 +604,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
getStreamExecutionEnvironment(finishedSavePointPath, parallelism); getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement = getCreateTableStatement(captureAddressTables); String createTableStatement =
getCreateTableStatement(new HashMap<>(), captureAddressTables);
tEnv.executeSql(createTableStatement); tEnv.executeSql(createTableStatement);
tEnv.executeSql( tEnv.executeSql(
"CREATE TABLE sink (" "CREATE TABLE sink ("
@ -630,7 +647,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
getStreamExecutionEnvironment(finishedSavePointPath, parallelism); getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement = getCreateTableStatement(captureTablesThisRound); String createTableStatement =
getCreateTableStatement(new HashMap<>(), captureTablesThisRound);
tEnv.executeSql(createTableStatement); tEnv.executeSql(createTableStatement);
tEnv.executeSql( tEnv.executeSql(
"CREATE TABLE sink (" "CREATE TABLE sink ("
@ -703,6 +721,23 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
boolean makeBinlogBeforeCapture, boolean makeBinlogBeforeCapture,
String... captureAddressTables) String... captureAddressTables)
throws Exception { throws Exception {
testNewlyAddedTableOneByOne(
parallelism,
new HashMap<>(),
failoverType,
failoverPhase,
makeBinlogBeforeCapture,
captureAddressTables);
}
private void testNewlyAddedTableOneByOne(
int parallelism,
Map<String, String> sourceOptions,
FailoverType failoverType,
FailoverPhase failoverPhase,
boolean makeBinlogBeforeCapture,
String... captureAddressTables)
throws Exception {
// step 1: create mysql tables with initial data // step 1: create mysql tables with initial data
initialAddressTables(getConnection(), captureAddressTables); initialAddressTables(getConnection(), captureAddressTables);
@ -727,7 +762,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
getStreamExecutionEnvironment(finishedSavePointPath, parallelism); getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createTableStatement = getCreateTableStatement(captureTablesThisRound); String createTableStatement =
getCreateTableStatement(sourceOptions, captureTablesThisRound);
tEnv.executeSql(createTableStatement); tEnv.executeSql(createTableStatement);
tEnv.executeSql( tEnv.executeSql(
"CREATE TABLE sink (" "CREATE TABLE sink ("
@ -836,7 +872,8 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
} }
} }
private String getCreateTableStatement(String... captureTableNames) { private String getCreateTableStatement(
Map<String, String> otherOptions, String... captureTableNames) {
return format( return format(
"CREATE TABLE address (" "CREATE TABLE address ("
+ " table_name STRING METADATA VIRTUAL," + " table_name STRING METADATA VIRTUAL,"
@ -858,6 +895,7 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
+ " 'server-time-zone' = 'UTC'," + " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'," + " 'server-id' = '%s',"
+ " 'scan.newly-added-table.enabled' = 'true'" + " 'scan.newly-added-table.enabled' = 'true'"
+ " %s"
+ ")", + ")",
MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(), MYSQL_CONTAINER.getDatabasePort(),
@ -865,7 +903,17 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
customDatabase.getPassword(), customDatabase.getPassword(),
customDatabase.getDatabaseName(), customDatabase.getDatabaseName(),
getTableNameRegex(captureTableNames), getTableNameRegex(captureTableNames),
getServerId()); getServerId(),
otherOptions.isEmpty()
? ""
: ","
+ otherOptions.entrySet().stream()
.map(
e ->
String.format(
"'%s'='%s'",
e.getKey(), e.getValue()))
.collect(Collectors.joining(",")));
} }
private StreamExecutionEnvironment getStreamExecutionEnvironment( private StreamExecutionEnvironment getStreamExecutionEnvironment(

Loading…
Cancel
Save