[FLINK-36945][cdc-connector/mysql] Support parsing rename multiple tables in one statement

This closes #3876.
yohei yoshimuta 2 weeks ago committed by GitHub
parent b44e5708b8
commit 39608ed663
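For context, a minimal example of the DDL this change targets (table names follow the new Javadoc and test):

RENAME TABLE customers TO customers_old, customers_copy TO customers;

Debezium emits one schema change event per rename in such a statement, which is what the table-name rewriting added below accounts for.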

@@ -29,7 +29,9 @@ import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChang
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
@@ -45,10 +47,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY;
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY;
@@ -206,11 +212,103 @@ public class EventDispatcherImpl<T extends DataCollectionId>
String historyStr = DOCUMENT_WRITER.write(historyRecord.document());
Struct value = new Struct(schemaChangeValueSchema);
value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event));
value.put(HISTORY_RECORD_FIELD, historyStr);
return value;
}
/**
* Rewrites the table name in the Source if needed to handle schema changes properly.
*
* <p>This method addresses a specific issue when renaming multiple tables within a single
* statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO
* customers;}.
*
* <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema}
* emits two separate change events:
*
* <ul>
* <li>{@code RENAME TABLE customers TO customers_old}
* <li>{@code RENAME TABLE customers_copy TO customers}
* </ul>
*
* <p>Both events share a table name of {@code customers, customers_old} in their source
* info, which includes multiple table IDs in a single string.
*
* <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the
* schema change:
*
* <ul>
* <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code
* customers} ID.
* <li>The change for {@code RENAME TABLE customers TO customers_old} is empty.
* </ul>
*
* <p>The problem arises because {@link
* org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect
* multiple table IDs in the source info. As a result, changes for tables defined by the
* table filter configuration (e.g., {@code customers}) may be filtered out unintentionally.
* This can lead to schema changes not being saved in the state, which is crucial for
* recovering the job from a snapshot.
*
* <p>To resolve this issue, this method:
*
* <ol>
* <li>Checks if the source info contains multiple table names.
* <li>Verifies if the {@code TableChange#id} matches one of the table names.
* <li>Updates the source info with the correct table name that conforms to Flink CDC
* expectations, ensuring the schema change is saved correctly.
* </ol>
*
* @param event the schema change event emitted by Debezium.
* @return the updated source info with the corrected table name if necessary.
*/
private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) {
Struct sourceInfo = event.getSource();
String tableName = sourceInfo.getString(TABLE_NAME_KEY);
if (tableName == null || tableName.isEmpty()) {
return sourceInfo;
}
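// A multi-table RENAME reports a comma-separated table name (e.g. "customers, customers_old") in the source info.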
List<String> tableNames = parseTableNames(tableName);
if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) {
for (TableChanges.TableChange tableChange : event.getTableChanges()) {
String changedTableName = getMatchingTableName(tableNames, tableChange.getId());
if (changedTableName != null) {
LOG.debug(
"Rewrite table name from {} to {} on swapping tables",
tableName,
changedTableName);
sourceInfo.put(TABLE_NAME_KEY, changedTableName);
}
}
}
return sourceInfo;
}
/**
* Decodes table names from a comma-separated string.
*
* <p>This method extracts individual table names from a string where multiple table names
* are separated by commas. The input string is constructed by {@link
* io.debezium.connector.mysql.SourceInfo}.
*
* @param tableName a comma-separated string containing multiple table names
* @return a list of trimmed table names
*/
private List<String> parseTableNames(String tableName) {
return Arrays.stream(tableName.split(","))
.map(String::trim)
.collect(Collectors.toList());
}
private String getMatchingTableName(List<String> tableNames, TableId tableId) {
return tableNames.stream()
.filter(name -> name.equals(tableId.table()))
.findFirst()
.orElse(null);
}
@Override
public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
historizedSchema.applySchemaChange(event);

@@ -0,0 +1,395 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cdc.connectors.mysql.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.utils.TestCaseUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.TestTable;
import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.connector.mysql.MySqlConnection;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import java.io.File;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
/**
* Integration tests for handling schema changes when renaming multiple tables within a single
* statement.
*/
public class MySqlMultipleTablesRenamingITCase {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlMultipleTablesRenamingITCase.class);
@RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension();
@SuppressWarnings("unchecked")
private static final MySqlContainer MYSQL_CONTAINER =
(MySqlContainer)
new MySqlContainer()
.withConfigurationOverride(
buildMySqlConfigWithTimezone(
getResourceFolder(), getSystemTimeZone()))
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
private final UniqueDatabase customDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
private final TestTable customers =
new TestTable(customDatabase, "customers", TestTableSchemas.CUSTOMERS);
private MySqlConnection connection;
@BeforeAll
public static void before() throws Exception {
MYSQL_CONTAINER.start();
}
@AfterAll
public static void after() throws Exception {
MYSQL_CONTAINER.stop();
}
@BeforeEach
void prepare() throws Exception {
connection = getConnection();
customDatabase.createAndInitialize();
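// FLUSH LOGS rotates the binary log so binlog reading starts from a fresh file.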
flushLogs();
}
@AfterEach
void tearDown() throws Exception {
customDatabase.dropDatabase();
connection.close();
}
/**
* Tests handling of renaming multiple tables within a single SQL statement in a Flink CDC job.
*
* <p>This integration test validates that schema changes involving multiple table renames, such
* as {@code RENAME TABLE table1 TO table1_old, table2 TO table1}, are correctly processed
* without data loss or inconsistency.
*
* <p>The test covers:
*
* <ul>
* <li>Initial validation of table contents before renaming.
* <li>Steps to rename tables, including schema changes like column drops.
* <li>Ensuring data integrity during savepoints and job restarts.
* <li>Validation of data consumption before and after savepoints to confirm state
* correctness.
* </ul>
*
* <p>This ensures that the connector can accurately process and persist schema changes when
* tables are swapped, addressing potential issues with table filtering or mismatched table IDs
* during schema updates.
*/
@Test
void testRenameTablesWithinSingleStatement() throws Exception {
// Build Flink job
StreamExecutionEnvironment env = getExecutionEnvironment();
MySqlSource<String> source = getSourceBuilder().build();
DataStreamSource<String> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "rename-tables-test");
CollectResultIterator<String> iterator = addCollector(env, stream);
// Copy transformations into another env
StreamExecutionEnvironment restoredEnv = getExecutionEnvironment();
duplicateTransformations(env, restoredEnv);
// Execute job and validate results
JobClient jobClient = env.executeAsync();
iterator.setJobClient(jobClient);
{
String[] expected =
new String[] {
"{\"id\":101,\"name\":\"user_1\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":102,\"name\":\"user_2\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":103,\"name\":\"user_3\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":109,\"name\":\"user_4\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":110,\"name\":\"user_5\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":111,\"name\":\"user_6\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":118,\"name\":\"user_7\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":121,\"name\":\"user_8\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":123,\"name\":\"user_9\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1009,\"name\":\"user_10\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1010,\"name\":\"user_11\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1011,\"name\":\"user_12\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1012,\"name\":\"user_13\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1013,\"name\":\"user_14\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1014,\"name\":\"user_15\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1015,\"name\":\"user_16\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1016,\"name\":\"user_17\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1017,\"name\":\"user_18\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1018,\"name\":\"user_19\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":1019,\"name\":\"user_20\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
"{\"id\":2000,\"name\":\"user_21\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
};
List<String> rows = fetchRow(iterator, 21);
TestCaseUtils.repeatedCheck(
() -> Arrays.stream(expected).allMatch(rows.toString()::contains));
}
{
LOG.info("Step 1: Create a copy of the target table");
executeStatements(
String.format(
"CREATE TABLE %s_copy LIKE %s;",
customers.getTableId(), customers.getTableId()));
LOG.info("Step 2: Alter the copied table to drop a column");
executeStatements(
String.format(
"ALTER TABLE %s_copy DROP COLUMN phone_number;",
customers.getTableId()));
LOG.info("Step 3: Swap the tables");
executeStatements(
String.format(
"RENAME TABLE %s TO %s_old, %s_copy TO %s;",
customers.getTableId(),
customers.getTableId(),
customers.getTableId(),
customers.getTableId()));
LOG.info("Step 4: Insert data into the altered table before the savepoint");
executeStatements(
String.format(
"INSERT INTO %s VALUES (19213, 'Diana', 'Berlin');",
customers.getTableId()));
List<String> rowsBeforeRestored = fetchRow(iterator, 1);
TestCaseUtils.repeatedCheck(
() ->
rowsBeforeRestored
.toString()
.contains(
"{\"id\":19213,\"name\":\"Diana\",\"address\":\"Berlin\"}"));
}
{
LOG.info("Step 5: Take a savepoint");
Path savepointDir = Files.createTempDirectory("rename-tables-test");
String savepointPath =
jobClient
.stopWithSavepoint(
false,
savepointDir.toAbsolutePath().toString(),
SavepointFormatType.DEFAULT)
.get();
LOG.info("Step 6: Insert data into the altered table after the savepoint");
executeStatements(
String.format(
"INSERT INTO %s VALUES (19214, 'Diana2', 'Berlin2');",
customers.getTableId()));
LOG.info("Step 7: Restart the job from savepoint");
setupSavepoint(restoredEnv, savepointPath);
JobClient restoredJobClient = restoredEnv.executeAsync("rename-tables-test");
iterator.setJobClient(restoredJobClient);
List<String> rowsAfterRestored = fetchRow(iterator, 1);
TestCaseUtils.repeatedCheck(
() ->
rowsAfterRestored
.toString()
.contains(
"{\"id\":19214,\"name\":\"Diana2\",\"address\":\"Berlin2\"}"));
restoredJobClient.cancel().get();
}
}
private MySqlSourceBuilder<String> getSourceBuilder() {
return MySqlSource.<String>builder()
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.username(customDatabase.getUsername())
.password(customDatabase.getPassword())
.databaseList(customDatabase.getDatabaseName())
.tableList(customers.getTableId())
.deserializer(new JsonDebeziumDeserializationSchema());
}
private MySqlConnection getConnection() {
Map<String, String> properties = new HashMap<>();
properties.put("database.hostname", MYSQL_CONTAINER.getHost());
properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
properties.put("database.user", customDatabase.getUsername());
properties.put("database.password", customDatabase.getPassword());
io.debezium.config.Configuration configuration =
io.debezium.config.Configuration.from(properties);
return DebeziumUtils.createMySqlConnection(configuration, new Properties());
}
private void executeStatements(String... statements) throws Exception {
connection.execute(statements);
connection.commit();
}
private void flushLogs() throws Exception {
executeStatements("FLUSH LOGS;");
}
private <T> CollectResultIterator<T> addCollector(
StreamExecutionEnvironment env, DataStream<T> stream) {
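// Attach a collect sink to the stream and return an iterator that fetches the emitted records from the running job.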
TypeSerializer<T> serializer =
stream.getTransformation().getOutputType().createSerializer(env.getConfig());
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectSinkOperatorFactory<T> factory =
new CollectSinkOperatorFactory<>(serializer, accumulatorName);
CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
CollectResultIterator<T> iterator =
new CollectResultIterator<>(
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
env.getCheckpointConfig(),
10000L);
CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());
return iterator;
}
private static List<String> fetchRow(Iterator<String> iter, int size) {
List<String> rows = new ArrayList<>(size);
while (size > 0 && iter.hasNext()) {
String row = iter.next();
rows.add(row);
size--;
}
return rows;
}
private static String buildMySqlConfigWithTimezone(File resourceDirectory, String timezone) {
try {
TemporaryFolder tempFolder = new TemporaryFolder(resourceDirectory);
tempFolder.create();
File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));
Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf"));
String mysqldConf =
"[mysqld]\n"
+ "binlog_format = row\n"
+ "log_bin = mysql-bin\n"
+ "server-id = 223344\n"
+ "binlog_row_image = FULL\n"
+ "gtid_mode = on\n"
+ "enforce_gtid_consistency = on\n";
String timezoneConf = "default-time_zone = '" + timezone + "'\n";
Files.write(
cnf,
Collections.singleton(mysqldConf + timezoneConf),
StandardCharsets.UTF_8,
StandardOpenOption.APPEND);
return Paths.get(resourceDirectory.getAbsolutePath()).relativize(cnf).toString();
} catch (Exception e) {
throw new RuntimeException("Failed to create my.cnf file.", e);
}
}
private static File getResourceFolder() {
try {
return Paths.get(
Objects.requireNonNull(
SpecificStartingOffsetITCase.class
.getClassLoader()
.getResource("."))
.toURI())
.toFile();
} catch (Exception e) {
throw new FlinkRuntimeException("Get Resource File Directory fail");
}
}
private static String getSystemTimeZone() {
return ZoneId.systemDefault().toString();
}
private void setupSavepoint(StreamExecutionEnvironment env, String savepointPath)
throws Exception {
// restore from savepoint
// hack for test to visit protected TestStreamEnvironment#getConfiguration() method
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?> clazz =
classLoader.loadClass(
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
Field field = clazz.getDeclaredField("configuration");
field.setAccessible(true);
Configuration configuration = (Configuration) field.get(env);
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
}
private void duplicateTransformations(
StreamExecutionEnvironment source, StreamExecutionEnvironment target) {
source.getTransformations().forEach(target::addOperator);
}
private StreamExecutionEnvironment getExecutionEnvironment() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
return env;
}
}