[FLINK-36945] Fix the inconsistency between the MySQL CDC internal schema representation and the real database schema when restarting a job

pull/3876/head
Yohei Yoshimuta 1 week ago
parent 0e02ddf008
commit 403c6707e2
No known key found for this signature in database
GPG Key ID: 72F7A861B098E60D

@ -29,7 +29,9 @@ import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChang
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
@ -45,10 +47,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY;
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY;
@ -206,11 +212,103 @@ public class EventDispatcherImpl<T extends DataCollectionId>
String historyStr = DOCUMENT_WRITER.write(historyRecord.document());
Struct value = new Struct(schemaChangeValueSchema);
value.put(HistoryRecord.Fields.SOURCE, event.getSource());
value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event));
value.put(HISTORY_RECORD_FIELD, historyStr);
return value;
}
/**
 * Rewrites the table name in the source info if needed to handle schema changes properly.
 *
 * <p>This method addresses a specific issue when renaming multiple tables within a single
 * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO
 * customers;}.
 *
 * <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema}
 * emits two separate change events:
 *
 * <ul>
 *   <li>{@code RENAME TABLE customers TO customers_old}
 *   <li>{@code RENAME TABLE customers_copy TO customers}
 * </ul>
 *
 * <p>Both events share a table name of {@code customers, customers_old} in their source
 * info, which includes multiple table IDs in a single string.
 *
 * <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the
 * schema change:
 *
 * <ul>
 *   <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code
 *       customers} ID.
 *   <li>The change for {@code RENAME TABLE customers TO customers_old} is empty.
 * </ul>
 *
 * <p>The problem arises because {@link
 * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect
 * multiple table IDs in the source info. As a result, changes for tables defined by the
 * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally.
 * This can lead to schema changes not being saved in the state, which is crucial for
 * recovering the job from a snapshot.
 *
 * <p>To resolve this issue, this method:
 *
 * <ol>
 *   <li>Checks if the source info contains multiple table names and the DDL starts with the
 *       {@code RENAME} keyword (compared case-insensitively and independently of the JVM
 *       default locale; an event without DDL text is never treated as a rename).
 *   <li>Verifies if the {@code TableChange#id} matches one of the table names.
 *   <li>Updates the source info with the correct table name that conforms to Flink CDC
 *       expectations, ensuring the schema change is saved correctly.
 * </ol>
 *
 * <p>The struct returned by {@code event.getSource()} is updated in place and returned; no
 * copy is made. If several table changes match one of the listed names, the last matching
 * change determines the final table name.
 *
 * @param event the schema change event emitted by Debezium.
 * @return the source info of the event, with the corrected table name if necessary.
 */
private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) {
    Struct sourceInfo = event.getSource();
    String tableName = sourceInfo.getString(TABLE_NAME_KEY);
    if (tableName == null || tableName.isEmpty()) {
        return sourceInfo;
    }

    List<String> tableNames = parseTableNames(tableName);
    if (tableNames.size() < 2) {
        // A single table name is already in the form the downstream reader expects.
        return sourceInfo;
    }

    // Only the statement prefix matters, so compare it in place instead of lower-casing the
    // whole DDL with the default locale; a missing DDL simply means "not a rename".
    final String renameKeyword = "rename";
    String ddl = event.getDdl();
    boolean isRenameStatement =
            ddl != null
                    && ddl.regionMatches(true, 0, renameKeyword, 0, renameKeyword.length());
    if (!isRenameStatement) {
        return sourceInfo;
    }

    for (TableChanges.TableChange tableChange : event.getTableChanges()) {
        String changedTableName = getMatchingTableName(tableNames, tableChange.getId());
        if (changedTableName != null) {
            LOG.debug(
                    "Rewrite table name from {} to {} on swapping tables",
                    tableName,
                    changedTableName);
            sourceInfo.put(TABLE_NAME_KEY, changedTableName);
        }
    }
    return sourceInfo;
}
/**
 * Splits the combined table-name string of a source info into its individual table names.
 *
 * <p>{@link io.debezium.connector.mysql.SourceInfo} builds this string by joining the
 * affected table names with commas. Each comma-separated piece is returned with its
 * surrounding whitespace removed, in the order it appears in the input.
 *
 * @param tableName a comma-separated string containing one or more table names
 * @return a list of trimmed table names
 */
private List<String> parseTableNames(String tableName) {
    String[] rawNames = tableName.split(",");
    return Arrays.stream(rawNames)
            .map(rawName -> rawName.trim())
            .collect(Collectors.toList());
}
/**
 * Finds the entry of {@code tableNames} that equals the table part of the given table ID.
 *
 * @param tableNames candidate table names to search, in order
 * @param tableId the table ID whose {@code table()} value is compared against each candidate
 * @return the first candidate equal to {@code tableId.table()}, or {@code null} if none is
 */
private String getMatchingTableName(List<String> tableNames, TableId tableId) {
    for (String candidate : tableNames) {
        // Exact, case-sensitive comparison against the table part of the ID.
        if (candidate.equals(tableId.table())) {
            return candidate;
        }
    }
    return null;
}
@Override
public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
historizedSchema.applySchemaChange(event);

Loading…
Cancel
Save