From 7d7ba2dc19e95728719ef0dc64a53fe7329bc10c Mon Sep 17 00:00:00 2001 From: Kunni Date: Tue, 16 Jan 2024 14:33:40 +0800 Subject: [PATCH] [hotfix][cdc-connector][mysql] Skip SchemaChangeEvents that were not included in capturedTableFilter --- .../connectors/mysql/source/MySqlPipelineITCase.java | 6 ++++++ .../mysql/debezium/reader/BinlogSplitReader.java | 12 ++++++++++-- .../connectors/mysql/source/utils/RecordUtils.java | 8 ++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 0e4b78358..00bdda4ba 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -522,6 +522,12 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase { tableId, Collections.singletonList( Column.physicalColumn("DESC3", DataTypes.BIGINT())))); + + // Should not catch SchemaChangeEvent of tables other than `products` + statement.execute( + String.format( + "ALTER TABLE `%s`.`orders` ADD COLUMN `desc1` VARCHAR(45) NULL;", + inventoryDatabase.getDatabaseName())); return expected; } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index f8fc488fe..a9add78df 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -65,6 +65,8 @@ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSpl import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStructContainsChunkKey; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getTableId; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent; +import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isTableChangeRecord; /** * A Debezium binlog reader implementation that also support reads binlog and filter overlapping @@ -252,9 +254,15 @@ public class BinlogSplitReader implements DebeziumReader