From 7dbac7277ef0a058c082db925f312fe2e04f3c93 Mon Sep 17 00:00:00 2001
From: Hang Ruan
Date: Wed, 19 Jul 2023 13:54:41 +0800
Subject: [PATCH] [mysql] Remove the finished snapshot splits for the binlog
 split when restoring from the checkpoint (#2292) (#2318)

---
 .../source/reader/MySqlSourceReader.java      |  38 ++-
 .../mysql/source/split/MySqlBinlogSplit.java  |  27 ++
 .../mysql/source/NewlyAddedTableITCase.java   | 240 ++++++++++++++++++
 3 files changed, 298 insertions(+), 7 deletions(-)

diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
index 2a93a47bb..356615788 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java
@@ -66,6 +66,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner.BINLOG_SPLIT_ID;
+import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.filterOutdatedSplitInfos;
 import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
 import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
 import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
@@ -203,6 +204,17 @@ public class MySqlSourceReader<T>
 
     @Override
     public void addSplits(List<MySqlSplit> splits) {
+        addSplits(splits, true);
+    }
+
+    /**
+     * Adds a list of splits for this reader to read.
+     *
+     * @param splits the splits to add.
+     * @param checkTableChangeForBinlogSplit whether to check for changes of the captured
+     *     table list; should be true for a reader restoring from a checkpoint or savepoint.
+     */
+    private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlogSplit) {
         // restore for finishedUnackedSplits
         List<MySqlSplit> unfinishedSplits = new ArrayList<>();
         for (MySqlSplit split : splits) {
@@ -224,6 +236,20 @@ public class MySqlSourceReader<T>
                 }
             } else {
                 MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
+                // When restoring from a checkpoint, the finished split infos may contain splits
+                // for tables that have been deleted from the captured table list.
+                // We need to remove the splits for these deleted tables from the finished split
+                // infos.
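+                // For example (a hypothetical scenario): a job that captured tables
+                // [db.customers, db.orders] but restores with only db.customers in its
+                // table list must drop the finished split infos of db.orders; otherwise
+                // the binlog reader would keep waiting for the meta of snapshot splits
+                // that no longer exist.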
+                if (checkTableChangeForBinlogSplit) {
+                    binlogSplit =
+                            filterOutdatedSplitInfos(
+                                    binlogSplit,
+                                    sourceConfig
+                                            .getMySqlConnectorConfig()
+                                            .getTableFilters()
+                                            .dataCollectionFilter());
+                }
+
                 // Try to discovery table schema once for newly added tables when source reader
                 // start or restore
                 boolean checkNewlyAddedTableSchema =
@@ -235,15 +261,13 @@ public class MySqlSourceReader<T>
                 if (binlogSplit.isSuspended()) {
                     suspendedBinlogSplit = binlogSplit;
                 } else if (!binlogSplit.isCompletedSplit()) {
-                    uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
-                    requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
+                    uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
+                    requestBinlogSplitMetaIfNeeded(binlogSplit);
                 } else {
-                    uncompletedBinlogSplits.remove(split.splitId());
+                    uncompletedBinlogSplits.remove(binlogSplit.splitId());
                     MySqlBinlogSplit mySqlBinlogSplit =
                             discoverTableSchemasForBinlogSplit(
-                                    split.asBinlogSplit(),
-                                    sourceConfig,
-                                    checkNewlyAddedTableSchema);
+                                    binlogSplit, sourceConfig, checkNewlyAddedTableSchema);
                     unfinishedSplits.add(mySqlBinlogSplit);
                 }
             }
@@ -302,7 +326,7 @@ public class MySqlSourceReader<T>
         final MySqlBinlogSplit binlogSplit =
                 toNormalBinlogSplit(suspendedBinlogSplit, finishedSplitsSize);
         suspendedBinlogSplit = null;
-        this.addSplits(Collections.singletonList(binlogSplit));
+        this.addSplits(Collections.singletonList(binlogSplit), false);
 
         context.sendSourceEventToCoordinator(new BinlogSplitUpdateAckEvent());
         LOG.info(
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
index 62ee0f0eb..9d2ba0c1f 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
@@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.source.split;
 
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
 import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
 import io.debezium.relational.history.TableChanges.TableChange;
 
 import javax.annotation.Nullable;
@@ -26,6 +27,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /** The split to describe the binlog of MySql table(s). */
 public class MySqlBinlogSplit extends MySqlSplit {
@@ -170,6 +172,31 @@ public class MySqlBinlogSplit extends MySqlSplit {
                 binlogSplit.isSuspended());
     }
 
+    /**
+     * Filter out the outdated finished splits in {@link MySqlBinlogSplit}.
+     *
+     * <p>When restoring from a checkpoint, the finished split infos may contain some splits
+     * from the deleted tables. We need to remove these splits from the total finished split
+     * infos and update the size.
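+     *
+     * <p>For example (a hypothetical scenario): if the split infos cover db.customers and
+     * db.orders but only db.customers is still captured by the current table filter, the
+     * split infos of db.orders are dropped and the total finished split size is reduced by
+     * the number of removed split infos.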
+     */
+    public static MySqlBinlogSplit filterOutdatedSplitInfos(
+            MySqlBinlogSplit binlogSplit, Tables.TableFilter currentTableFilter) {
+        List<FinishedSnapshotSplitInfo> allFinishedSnapshotSplitInfos =
+                binlogSplit.getFinishedSnapshotSplitInfos().stream()
+                        .filter(i -> currentTableFilter.isIncluded(i.getTableId()))
+                        .collect(Collectors.toList());
+        return new MySqlBinlogSplit(
+                binlogSplit.splitId,
+                binlogSplit.getStartingOffset(),
+                binlogSplit.getEndingOffset(),
+                allFinishedSnapshotSplitInfos,
+                binlogSplit.getTableSchemas(),
+                binlogSplit.getTotalFinishedSplitSize()
+                        - (binlogSplit.getFinishedSnapshotSplitInfos().size()
+                                - allFinishedSnapshotSplitInfos.size()),
+                binlogSplit.isSuspended());
+    }
+
     public static MySqlBinlogSplit fillTableSchemas(
             MySqlBinlogSplit binlogSplit, Map<TableId, TableChange> tableSchemas) {
         tableSchemas.putAll(binlogSplit.getTableSchemas());
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/NewlyAddedTableITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
index 44618af01..8765316ff 100644
--- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/NewlyAddedTableITCase.java
@@ -16,19 +16,36 @@
 
 package com.ververica.cdc.connectors.mysql.source;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
+import com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata;
 import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
 import io.debezium.connector.mysql.MySqlConnection;
 import io.debezium.jdbc.JdbcConnection;
 import org.apache.commons.lang3.StringUtils;
@@ -45,16 +62,20 @@
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.lang.String.format;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -310,6 +331,225 @@ public class NewlyAddedTableITCase extends MySqlSourceTestBase {
                 "address_shanghai");
     }
 
+    @Test
+    public void testRemoveAndAddNewTable() throws Exception {
+        // round 1 : table0 + table1 (customers_even_dist + customers)
+        // round 2 : table0 + table2 (customers_even_dist + customers_1)
+        String tableId0 = customDatabase.getDatabaseName() + ".customers_even_dist";
+        String tableId1 = "customers";
+        String tableId2 = "customers_\\d+";
+
+        final TemporaryFolder temporaryFolder = new TemporaryFolder();
+        temporaryFolder.create();
+        final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
+
+        String finishedSavePointPath = null;
+        CollectResultIterator<RowData> iterator = null;
+        for (int i = 0; i < 2; i++) {
+            String changedTable = i == 0 ? tableId1 : "customers_1";
+            StreamExecutionEnvironment env =
+                    getStreamExecutionEnvironment(finishedSavePointPath, 4);
+
+            RowDataDebeziumDeserializeSchema deserializer =
+                    RowDataDebeziumDeserializeSchema.newBuilder()
+                            .setMetadataConverters(
+                                    new MetadataConverter[] {
+                                        MySqlReadableMetadata.TABLE_NAME.getConverter()
+                                    })
+                            .setPhysicalRowType(
+                                    (RowType)
+                                            DataTypes.ROW(
+                                                            DataTypes.FIELD(
+                                                                    "id", DataTypes.BIGINT()),
+                                                            DataTypes.FIELD(
+                                                                    "name", DataTypes.STRING()),
+                                                            DataTypes.FIELD(
+                                                                    "address", DataTypes.STRING()),
+                                                            DataTypes.FIELD(
+                                                                    "phone_number",
+                                                                    DataTypes.STRING()))
+                                                    .getLogicalType())
+                            .setResultTypeInfo(
+                                    InternalTypeInfo.of(
+                                            TypeConversions.fromDataToLogicalType(
+                                                    DataTypes.ROW(
+                                                            DataTypes.FIELD(
+                                                                    "id", DataTypes.BIGINT()),
+                                                            DataTypes.FIELD(
+                                                                    "name", DataTypes.STRING()),
+                                                            DataTypes.FIELD(
+                                                                    "address", DataTypes.STRING()),
+                                                            DataTypes.FIELD(
+                                                                    "phone_number",
+                                                                    DataTypes.STRING()),
+                                                            DataTypes.FIELD(
+                                                                    "_table_name",
+                                                                    DataTypes.STRING()
+                                                                            .notNull())))))
+                            .build();
+
+            // Build source
+            MySqlSource<RowData> mySqlSource =
+                    MySqlSource.<RowData>builder()
+                            .hostname(MYSQL_CONTAINER.getHost())
+                            .port(MYSQL_CONTAINER.getDatabasePort())
+                            .databaseList(customDatabase.getDatabaseName())
+                            .serverTimeZone("UTC")
+                            .tableList(
+                                    tableId0,
+                                    customDatabase.getDatabaseName()
+                                            + "."
+                                            + (i == 0 ? tableId1 : tableId2))
+                            .username(customDatabase.getUsername())
+                            .password(customDatabase.getPassword())
+                            .serverId("5401-5404")
+                            .deserializer(deserializer)
+                            .scanNewlyAddedTableEnabled(true)
+                            .build();
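+
+            // With scanNewlyAddedTableEnabled(true), the restored job discovers and
+            // snapshots tables newly added to the table list, while the finished splits
+            // of the removed table are dropped by filterOutdatedSplitInfos above.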
+
+            // Build and execute the job
+            DataStreamSource<RowData> source =
+                    env.fromSource(
+                            mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source" + i);
+            if (iterator == null) {
+                iterator = addCollectSink(source);
+            } else {
+                addCollectSink(source);
+            }
+            JobClient jobClient = env.executeAsync("Collect " + i);
+            iterator.setJobClient(jobClient);
+
+            List<String> expectedCustomersEvenDistResult =
+                    Arrays.asList(
+                            "+I[103, user_3, Shanghai, 123567891234, customers_even_dist]",
+                            "+I[104, user_4, Shanghai, 123567891234, customers_even_dist]",
+                            "+I[101, user_1, Shanghai, 123567891234, customers_even_dist]",
+                            "+I[102, user_2, Shanghai, 123567891234, customers_even_dist]",
+                            "+I[107, user_7, Shanghai, 123567891234, customers_even_dist]",
+                            "+I[108, user_8, Shanghai, 123567891234, customers_even_dist]",
+                            "+I[105, user_5, Shanghai, 123567891234, customers_even_dist]",
+                            "+I[106, user_6, Shanghai, 123567891234, customers_even_dist]",
+                            "+I[109, user_9, Shanghai, 123567891234, customers_even_dist]",
+                            "+I[110, user_10, Shanghai, 123567891234, customers_even_dist]");
+            List<String> expectedCustomersResult =
+                    Arrays.asList(
+                            format("+I[1011, user_12, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1012, user_13, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1009, user_10, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1010, user_11, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1015, user_16, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1016, user_17, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1013, user_14, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[118, user_7, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1014, user_15, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[111, user_6, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[2000, user_21, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[109, user_4, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[110, user_5, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[103, user_3, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[101, user_1, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[102, user_2, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[123, user_9, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1019, user_20, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[121, user_8, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1017, user_18, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[1018, user_19, Shanghai, 123567891234, %s]", changedTable));
+            List<String> expectedBinlogResult =
+                    Arrays.asList(
+                            format("-U[103, user_3, Shanghai, 123567891234, %s]", changedTable),
+                            format("+U[103, user_3, Update1, 123567891234, %s]", changedTable),
+                            format("-D[102, user_2, Shanghai, 123567891234, %s]", changedTable),
+                            format("+I[102, user_2, Insert1, 123567891234, %s]", changedTable),
+                            format("-U[103, user_3, Update1, 123567891234, %s]", changedTable),
+                            format("+U[103, user_3, Update2, 123567891234, %s]", changedTable));
+
+            List<String> expectedSnapshotResult =
+                    i == 0
+                            ? Stream.concat(
+                                            expectedCustomersEvenDistResult.stream(),
+                                            expectedCustomersResult.stream())
+                                    .collect(Collectors.toList())
+                            : expectedCustomersResult;
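+            // In the first round both tables are snapshotted; in the second round the job
+            // restores from the savepoint, so only the newly added customers_1 table is
+            // expected in the snapshot phase.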
+            List<String> rows = fetchRowData(iterator, expectedSnapshotResult.size());
+            assertEqualsInAnyOrder(expectedSnapshotResult, rows);
+
+            // make binlog events
+            try (MySqlConnection connection = getConnection()) {
+                connection.setAutoCommit(false);
+                String tableId = customDatabase.getDatabaseName() + "." + changedTable;
+                connection.execute(
+                        "UPDATE " + tableId + " SET address = 'Update1' where id = 103",
+                        "DELETE FROM " + tableId + " where id = 102",
+                        "INSERT INTO "
+                                + tableId
+                                + " VALUES(102, 'user_2','Insert1','123567891234')",
+                        "UPDATE " + tableId + " SET address = 'Update2' where id = 103");
+                connection.commit();
+            }
+            rows = fetchRowData(iterator, expectedBinlogResult.size());
+            assertEqualsInAnyOrder(expectedBinlogResult, rows);
+
+            finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+            jobClient.cancel().get();
+        }
+        temporaryFolder.delete();
+    }
+
+    /** Add a collect sink to the job. */
+    protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) {
+        TypeSerializer<RowData> serializer =
+                stream.getType().createSerializer(stream.getExecutionConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<RowData> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<RowData> operator =
+                (CollectSinkOperator<RowData>) factory.getOperator();
+        CollectStreamSink<RowData> sink = new CollectStreamSink<>(stream, factory);
+        sink.name("Data stream collect sink");
+        stream.getExecutionEnvironment().addOperator(sink.getTransformation());
+        CollectResultIterator<RowData> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        stream.getExecutionEnvironment().getCheckpointConfig());
+        return iterator;
+    }
+
+    private List<String> fetchRowData(Iterator<RowData> iter, int size) {
+        List<RowData> rows = new ArrayList<>(size);
+        while (size > 0 && iter.hasNext()) {
+            RowData row = iter.next();
+            rows.add(row);
+            size--;
+        }
+        return convertRowDataToRowString(rows);
+    }
+
+    private static List<String> convertRowDataToRowString(List<RowData> rows) {
+        LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
+        map.put("id", 0);
+        map.put("name", 1);
+        map.put("address", 2);
+        map.put("phone_number", 3);
+        map.put("_table_name", 4);
+        return rows.stream()
+                .map(
+                        row ->
+                                RowUtils.createRowWithNamedPositions(
+                                                row.getRowKind(),
+                                                new Object[] {
+                                                    row.getLong(0),
+                                                    row.getString(1),
+                                                    row.getString(2),
+                                                    row.getString(3),
+                                                    row.getString(4)
+                                                },
+                                                map)
+                                        .toString())
+                .collect(Collectors.toList());
+    }
+
     private void testRemoveTablesOneByOne(
             int parallelism,
             FailoverType failoverType,