diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java index b4af21503..b0f50f7b2 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -110,11 +110,13 @@ public class RecordUtils { List allBinlogRecords = sourceRecords.subList(i, sourceRecords.size() - 1); for (SourceRecord binlog : allBinlogRecords) { - Object[] key = - getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster); - if (splitKeyRangeContains( - key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) { - binlogRecords.add(binlog); + if (isDataChangeRecord(binlog)) { + Object[] key = + getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster); + if (splitKeyRangeContains( + key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) { + binlogRecords.add(binlog); + } } } }