[hotfix][cdc-connector][mysql] Skip SchemaChangeEvents that were not included in capturedTableFilter

pull/3084/head
Kunni 1 year ago committed by GitHub
parent e687349440
commit 7d7ba2dc19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -522,6 +522,12 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
tableId,
Collections.singletonList(
Column.physicalColumn("DESC3", DataTypes.BIGINT()))));
// Should not catch SchemaChangeEvent of tables other than `products`
statement.execute(
String.format(
"ALTER TABLE `%s`.`orders` ADD COLUMN `desc1` VARCHAR(45) NULL;",
inventoryDatabase.getDatabaseName()));
return expected;
}
}

@ -65,6 +65,8 @@ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSpl
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStructContainsChunkKey;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isTableChangeRecord;
/**
* A Debezium binlog reader implementation that also support reads binlog and filter overlapping
@ -252,9 +254,15 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
}
// not in the monitored splits scope, do not emit
return false;
} else if (isSchemaChangeEvent(sourceRecord)) {
if (isTableChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord);
return capturedTableFilter.isIncluded(tableId);
} else {
// Not related to changes in table structure, like `CREATE/DROP DATABASE`, skip it
return false;
}
}
// always send the schema change event and signal event
// we need record them to state of Flink
return true;
}

@ -18,6 +18,7 @@ package com.ververica.cdc.connectors.mysql.source.utils;
import org.apache.flink.table.types.logical.RowType;
import com.ververica.cdc.common.utils.StringUtils;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind;
import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
@ -387,6 +388,13 @@ public class RecordUtils {
return new TableId(dbName, null, tableName);
}
public static boolean isTableChangeRecord(SourceRecord dataRecord) {
Struct value = (Struct) dataRecord.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String tableName = source.getString(TABLE_NAME_KEY);
return !StringUtils.isNullOrWhitespaceOnly(tableName);
}
public static Object[] getSplitKey(
RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Struct target) {
// the split key field contains single field now

Loading…
Cancel
Save