diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 0e4b78358..00bdda4ba 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -522,6 +522,12 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { tableId, Collections.singletonList( Column.physicalColumn("DESC3", DataTypes.BIGINT())))); + + // Should not catch SchemaChangeEvent of tables other than `products` + statement.execute( + String.format( + "ALTER TABLE `%s`.`orders` ADD COLUMN `desc1` VARCHAR(45) NULL;", + inventoryDatabase.getDatabaseName())); return expected; } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 857b2639c..c9c95d67f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -64,6 +64,8 @@ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSpl import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStructContainsChunkKey; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getTableId; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isTableChangeRecord; /** * A Debezium binlog reader implementation that also support reads binlog and filter overlapping @@ -245,9 +247,15 @@ public class BinlogSplitReader implements DebeziumReader