[mysql] Remove the finished snapshot splits for the binlog split when restoring from the checkpoint (#2292) (#2318)

pull/2328/head
Hang Ruan 2 years ago committed by GitHub
parent 2595118ccb
commit 7dbac7277e

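In short: when a job restores from a checkpoint or savepoint, the binlog split it carries may still reference finished snapshot splits of tables that have since been removed from the captured table list. The core of this change is to drop those finished split infos and shrink the recorded total size by the same amount. A minimal sketch of that idea, using the names introduced in the diff below (the actual implementation is MySqlBinlogSplit#filterOutdatedSplitInfos):

// Sketch only: keep the finished split infos whose tables are still captured,
// then reduce the total finished split size by the number of dropped infos.
List<FinishedSnapshotSplitInfo> kept =
        binlogSplit.getFinishedSnapshotSplitInfos().stream()
                .filter(info -> currentTableFilter.isIncluded(info.getTableId()))
                .collect(Collectors.toList());
int newTotalFinishedSplitSize =
        binlogSplit.getTotalFinishedSplitSize()
                - (binlogSplit.getFinishedSnapshotSplitInfos().size() - kept.size());
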
@@ -66,6 +66,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner.BINLOG_SPLIT_ID;
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.filterOutdatedSplitInfos;
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
@@ -203,6 +204,17 @@ public class MySqlSourceReader<T>
@Override
public void addSplits(List<MySqlSplit> splits) {
addSplits(splits, true);
}
/**
* Adds a list of splits for this reader to read.
*
* @param splits the splits to add.
* @param checkTableChangeForBinlogSplit whether to check for changes in the captured table
* list; it should be true for a reader that is restoring from a checkpoint or savepoint.
*/
private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlogSplit) {
// restore for finishedUnackedSplits
List<MySqlSplit> unfinishedSplits = new ArrayList<>();
for (MySqlSplit split : splits) {
@@ -224,6 +236,20 @@ public class MySqlSourceReader<T>
}
} else {
MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
// When restoring from a checkpoint, the finished split infos may contain splits
// for tables that have since been removed from the captured table list.
// We need to remove the splits of those removed tables from the finished split
// infos.
if (checkTableChangeForBinlogSplit) {
binlogSplit =
filterOutdatedSplitInfos(
binlogSplit,
sourceConfig
.getMySqlConnectorConfig()
.getTableFilters()
.dataCollectionFilter());
}
// Try to discover table schemas once for newly added tables when the source reader
// starts or restores
boolean checkNewlyAddedTableSchema =
@@ -235,15 +261,13 @@ public class MySqlSourceReader<T>
if (binlogSplit.isSuspended()) {
suspendedBinlogSplit = binlogSplit;
} else if (!binlogSplit.isCompletedSplit()) {
- uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
- requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
+ uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
+ requestBinlogSplitMetaIfNeeded(binlogSplit);
} else {
- uncompletedBinlogSplits.remove(split.splitId());
+ uncompletedBinlogSplits.remove(binlogSplit.splitId());
MySqlBinlogSplit mySqlBinlogSplit =
discoverTableSchemasForBinlogSplit(
- split.asBinlogSplit(),
- sourceConfig,
- checkNewlyAddedTableSchema);
+ binlogSplit, sourceConfig, checkNewlyAddedTableSchema);
unfinishedSplits.add(mySqlBinlogSplit);
}
}
@@ -302,7 +326,7 @@ public class MySqlSourceReader<T>
final MySqlBinlogSplit binlogSplit =
toNormalBinlogSplit(suspendedBinlogSplit, finishedSplitsSize);
suspendedBinlogSplit = null;
- this.addSplits(Collections.singletonList(binlogSplit));
+ this.addSplits(Collections.singletonList(binlogSplit), false);
context.sendSourceEventToCoordinator(new BinlogSplitUpdateAckEvent());
LOG.info(

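A note on the reader-side flow in the hunks above: the public addSplits entry point, which handles splits assigned by the enumerator (including splits restored from a checkpoint or savepoint), now opts into the table-change check, while the internal path that resumes a previously suspended binlog split passes false, presumably because that split was just rebuilt from up-to-date split meta and does not need re-filtering. A sketch using the names from this diff:

// Splits handed over by the enumerator, including restored ones: check for removed tables.
public void addSplits(List<MySqlSplit> splits) {
    addSplits(splits, true);
}

// Resuming the suspended binlog split after a binlog split update: skip the check.
// this.addSplits(Collections.singletonList(binlogSplit), false);
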
@@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.source.split;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.TableChanges.TableChange;
import javax.annotation.Nullable;
@@ -26,6 +27,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/** The split to describe the binlog of MySql table(s). */
public class MySqlBinlogSplit extends MySqlSplit {
@@ -170,6 +172,31 @@ public class MySqlBinlogSplit extends MySqlSplit {
binlogSplit.isSuspended());
}
/**
* Filters out the outdated finished split infos in a {@link MySqlBinlogSplit}.
*
* <p>When restoring from a checkpoint, the finished split infos may contain splits from tables
* that have since been removed from the captured table list. These splits are removed from the
* finished split infos and the total finished split size is updated accordingly.
*/
public static MySqlBinlogSplit filterOutdatedSplitInfos(
MySqlBinlogSplit binlogSplit, Tables.TableFilter currentTableFilter) {
List<FinishedSnapshotSplitInfo> allFinishedSnapshotSplitInfos =
binlogSplit.getFinishedSnapshotSplitInfos().stream()
.filter(i -> currentTableFilter.isIncluded(i.getTableId()))
.collect(Collectors.toList());
return new MySqlBinlogSplit(
binlogSplit.splitId,
binlogSplit.getStartingOffset(),
binlogSplit.getEndingOffset(),
allFinishedSnapshotSplitInfos,
binlogSplit.getTableSchemas(),
binlogSplit.getTotalFinishedSplitSize()
- (binlogSplit.getFinishedSnapshotSplitInfos().size()
- allFinishedSnapshotSplitInfos.size()),
binlogSplit.isSuspended());
}
public static MySqlBinlogSplit fillTableSchemas(
MySqlBinlogSplit binlogSplit, Map<TableId, TableChange> tableSchemas) {
tableSchemas.putAll(binlogSplit.getTableSchemas());

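A small usage illustration with hypothetical numbers (restoredSplit and tableFilter are assumed variables, not part of this diff): if a restored binlog split carries 8 finished snapshot split infos and 3 of them belong to a table that is no longer captured, the filtered split keeps 5 infos and its total finished split size shrinks by the same 3.

// Hypothetical usage of the new helper; numbers are for illustration only.
MySqlBinlogSplit filtered =
        MySqlBinlogSplit.filterOutdatedSplitInfos(restoredSplit, tableFilter);
// filtered.getFinishedSnapshotSplitInfos().size() == 5
// filtered.getTotalFinishedSplitSize() == restoredSplit.getTotalFinishedSplitSize() - 3
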
@@ -16,19 +16,36 @@
package com.ververica.cdc.connectors.mysql.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.ExceptionUtils;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import org.apache.commons.lang3.StringUtils;
@@ -45,16 +62,20 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkState;
@@ -310,6 +331,225 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
"address_shanghai");
}
@Test
public void testRemoveAndAddNewTable() throws Exception {
// round 1 : table0 + table1 (customers_even_dist + customers)
// round 2 : table0 + table2 (customers_even_dist + customers_1)
String tableId0 = customDatabase.getDatabaseName() + ".customers_even_dist";
String tableId1 = "customers";
String tableId2 = "customers_\\d+";
final TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
String finishedSavePointPath = null;
CollectResultIterator<RowData> iterator = null;
for (int i = 0; i < 2; i++) {
String changedTable = i == 0 ? tableId1 : "customers_1";
StreamExecutionEnvironment env =
getStreamExecutionEnvironment(finishedSavePointPath, 4);
RowDataDebeziumDeserializeSchema deserializer =
RowDataDebeziumDeserializeSchema.newBuilder()
.setMetadataConverters(
new MetadataConverter[] {
MySqlReadableMetadata.TABLE_NAME.getConverter()
})
.setPhysicalRowType(
(RowType)
DataTypes.ROW(
DataTypes.FIELD(
"id", DataTypes.BIGINT()),
DataTypes.FIELD(
"name", DataTypes.STRING()),
DataTypes.FIELD(
"address", DataTypes.STRING()),
DataTypes.FIELD(
"phone_number",
DataTypes.STRING()))
.getLogicalType())
.setResultTypeInfo(
InternalTypeInfo.of(
TypeConversions.fromDataToLogicalType(
DataTypes.ROW(
DataTypes.FIELD(
"id", DataTypes.BIGINT()),
DataTypes.FIELD(
"name", DataTypes.STRING()),
DataTypes.FIELD(
"address", DataTypes.STRING()),
DataTypes.FIELD(
"phone_number",
DataTypes.STRING()),
DataTypes.FIELD(
"_table_name",
DataTypes.STRING()
.notNull())))))
.build();
// Build source
MySqlSource<RowData> mySqlSource =
MySqlSource.<RowData>builder()
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.databaseList(customDatabase.getDatabaseName())
.serverTimeZone("UTC")
.tableList(
tableId0,
customDatabase.getDatabaseName()
+ "."
+ (i == 0 ? tableId1 : tableId2))
.username(customDatabase.getUsername())
.password(customDatabase.getPassword())
.serverId("5401-5404")
.deserializer(deserializer)
.scanNewlyAddedTableEnabled(true)
.build();
// Build and execute the job
DataStreamSource<RowData> source =
env.fromSource(
mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source" + i);
if (iterator == null) {
iterator = addCollectSink(source);
} else {
addCollectSink(source);
}
JobClient jobClient = env.executeAsync("Collect " + i);
iterator.setJobClient(jobClient);
List<String> expectedCustomersEvenDistResult =
Arrays.asList(
"+I[103, user_3, Shanghai, 123567891234, customers_even_dist]",
"+I[104, user_4, Shanghai, 123567891234, customers_even_dist]",
"+I[101, user_1, Shanghai, 123567891234, customers_even_dist]",
"+I[102, user_2, Shanghai, 123567891234, customers_even_dist]",
"+I[107, user_7, Shanghai, 123567891234, customers_even_dist]",
"+I[108, user_8, Shanghai, 123567891234, customers_even_dist]",
"+I[105, user_5, Shanghai, 123567891234, customers_even_dist]",
"+I[106, user_6, Shanghai, 123567891234, customers_even_dist]",
"+I[109, user_9, Shanghai, 123567891234, customers_even_dist]",
"+I[110, user_10, Shanghai, 123567891234, customers_even_dist]");
List<String> expectedCustomersResult =
Arrays.asList(
format("+I[1011, user_12, Shanghai, 123567891234, %s]", changedTable),
format("+I[1012, user_13, Shanghai, 123567891234, %s]", changedTable),
format("+I[1009, user_10, Shanghai, 123567891234, %s]", changedTable),
format("+I[1010, user_11, Shanghai, 123567891234, %s]", changedTable),
format("+I[1015, user_16, Shanghai, 123567891234, %s]", changedTable),
format("+I[1016, user_17, Shanghai, 123567891234, %s]", changedTable),
format("+I[1013, user_14, Shanghai, 123567891234, %s]", changedTable),
format("+I[118, user_7, Shanghai, 123567891234, %s]", changedTable),
format("+I[1014, user_15, Shanghai, 123567891234, %s]", changedTable),
format("+I[111, user_6, Shanghai, 123567891234, %s]", changedTable),
format("+I[2000, user_21, Shanghai, 123567891234, %s]", changedTable),
format("+I[109, user_4, Shanghai, 123567891234, %s]", changedTable),
format("+I[110, user_5, Shanghai, 123567891234, %s]", changedTable),
format("+I[103, user_3, Shanghai, 123567891234, %s]", changedTable),
format("+I[101, user_1, Shanghai, 123567891234, %s]", changedTable),
format("+I[102, user_2, Shanghai, 123567891234, %s]", changedTable),
format("+I[123, user_9, Shanghai, 123567891234, %s]", changedTable),
format("+I[1019, user_20, Shanghai, 123567891234, %s]", changedTable),
format("+I[121, user_8, Shanghai, 123567891234, %s]", changedTable),
format("+I[1017, user_18, Shanghai, 123567891234, %s]", changedTable),
format("+I[1018, user_19, Shanghai, 123567891234, %s]", changedTable));
List<String> expectedBinlogResult =
Arrays.asList(
format("-U[103, user_3, Shanghai, 123567891234, %s]", changedTable),
format("+U[103, user_3, Update1, 123567891234, %s]", changedTable),
format("-D[102, user_2, Shanghai, 123567891234, %s]", changedTable),
format("+I[102, user_2, Insert1, 123567891234, %s]", changedTable),
format("-U[103, user_3, Update1, 123567891234, %s]", changedTable),
format("+U[103, user_3, Update2, 123567891234, %s]", changedTable));
List<String> expectedSnapshotResult =
i == 0
? Stream.concat(
expectedCustomersEvenDistResult.stream(),
expectedCustomersResult.stream())
.collect(Collectors.toList())
: expectedCustomersResult;
List<String> rows = fetchRowData(iterator, expectedSnapshotResult.size());
assertEqualsInAnyOrder(expectedSnapshotResult, rows);
// make binlog events
try (MySqlConnection connection = getConnection()) {
connection.setAutoCommit(false);
String tableId = customDatabase.getDatabaseName() + "." + changedTable;
connection.execute(
"UPDATE " + tableId + " SET address = 'Update1' where id = 103",
"DELETE FROM " + tableId + " where id = 102",
"INSERT INTO "
+ tableId
+ " VALUES(102, 'user_2','Insert1','123567891234')",
"UPDATE " + tableId + " SET address = 'Update2' where id = 103");
connection.commit();
}
rows = fetchRowData(iterator, expectedBinlogResult.size());
assertEqualsInAnyOrder(expectedBinlogResult, rows);
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
temporaryFolder.delete();
}
/** Adds a collect sink to the job. */
protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) {
TypeSerializer<RowData> serializer =
stream.getType().createSerializer(stream.getExecutionConfig());
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectSinkOperatorFactory<RowData> factory =
new CollectSinkOperatorFactory<>(serializer, accumulatorName);
CollectSinkOperator<RowData> operator =
(CollectSinkOperator<RowData>) factory.getOperator();
CollectStreamSink<RowData> sink = new CollectStreamSink<>(stream, factory);
sink.name("Data stream collect sink");
stream.getExecutionEnvironment().addOperator(sink.getTransformation());
CollectResultIterator<RowData> iterator =
new CollectResultIterator<>(
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
stream.getExecutionEnvironment().getCheckpointConfig());
return iterator;
}
private List<String> fetchRowData(Iterator<RowData> iter, int size) {
List<RowData> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
RowData row = iter.next();
rows.add(row);
size--;
}
return convertRowDataToRowString(rows);
}
private static List<String> convertRowDataToRowString(List<RowData> rows) {
LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
map.put("id", 0);
map.put("name", 1);
map.put("address", 2);
map.put("phone_number", 3);
map.put("_table_name", 4);
return rows.stream()
.map(
row ->
RowUtils.createRowWithNamedPositions(
row.getRowKind(),
new Object[] {
row.getLong(0),
row.getString(1),
row.getString(2),
row.getString(3),
row.getString(4)
},
map)
.toString())
.collect(Collectors.toList());
}
private void testRemoveTablesOneByOne(
int parallelism,
FailoverType failoverType,
