diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java index 11553ccda..6205c9c7a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java @@ -29,7 +29,9 @@ import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChang import io.debezium.pipeline.source.spi.EventMetadataProvider; import io.debezium.pipeline.spi.ChangeEventCreator; import io.debezium.pipeline.spi.SchemaChangeEventEmitter; +import io.debezium.relational.TableId; import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.TableChanges; import io.debezium.schema.DataCollectionFilters; import io.debezium.schema.DataCollectionId; import io.debezium.schema.DatabaseSchema; @@ -45,10 +47,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY; import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY; import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY; @@ -206,11 +212,103 @@ public class EventDispatcherImpl String historyStr = DOCUMENT_WRITER.write(historyRecord.document()); Struct value = new Struct(schemaChangeValueSchema); - value.put(HistoryRecord.Fields.SOURCE, event.getSource()); + value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event)); value.put(HISTORY_RECORD_FIELD, historyStr); return value; } + /** + * Rewrites the table name in the Source if needed to handle schema changes properly. + * + *

This method addresses a specific issue when renaming multiple tables within a single + * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO + * customers;}. + * + *

In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema} + * emits two separate change events: + * + *

+ * + *

Both events share a table name of {@code customers, customers_old} in their source + * info, which includes multiple table IDs in a single string. + * + *

On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the + * schema change: + * + *

+ * + *

The problem arises because {@link + * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect + * multiple table IDs in the source info. As a result, changes for tables defined by the + * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally. + * This can lead to schema changes not being saved in the state, which is crucial for + * recovering the job from a snapshot. + * + *

To resolve this issue, this method: + * + *

    + *
  1. Checks if the source info contains multiple table names. + *
  2. Verifies if the {@code TableChange#id} matches one of the table names. + *
  3. Updates the source info with the correct table name that conforms to Flink CDC + * expectations, ensuring the schema change is saved correctly. + *
+ * + * @param event the schema change event emitted by Debezium. + * @return the updated source info with the corrected table name if necessary. + */ + private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) { + Struct sourceInfo = event.getSource(); + String tableName = sourceInfo.getString(TABLE_NAME_KEY); + if (tableName == null || tableName.isEmpty()) { + return sourceInfo; + } + + List tableNames = parseTableNames(tableName); + if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) { + for (TableChanges.TableChange tableChange : event.getTableChanges()) { + String changedTableName = getMatchingTableName(tableNames, tableChange.getId()); + if (changedTableName != null) { + LOG.debug( + "Rewrite table name from {} to {} on swapping tables", + tableName, + changedTableName); + sourceInfo.put(TABLE_NAME_KEY, changedTableName); + } + } + } + return sourceInfo; + } + + /** + * Decodes table names from a comma-separated string. + * + *

This method extracts individual table names from a string where multiple table names + * are separated by commas. The input string is constructed by {@link + * io.debezium.connector.mysql.SourceInfo}. + * + * @param tableName a comma-separated string containing multiple table names + * @return a list of trimmed table names + */ + private List parseTableNames(String tableName) { + return Arrays.stream(tableName.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + } + + private String getMatchingTableName(List tableNames, TableId tableId) { + return tableNames.stream() + .filter(name -> name.equals(tableId.table())) + .findFirst() + .orElse(null); + } + @Override public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException { historizedSchema.applySchemaChange(event);