From 75276235dc5426c17ce17f20e0b6cb6de5e24897 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Mon, 8 Nov 2021 15:03:31 +0800 Subject: [PATCH] [mysql] Skip ddl events in snapshot split normalization (#577) --- .../connectors/mysql/source/utils/RecordUtils.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java index b4af21503..b0f50f7b2 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -110,11 +110,13 @@ public class RecordUtils { List allBinlogRecords = sourceRecords.subList(i, sourceRecords.size() - 1); for (SourceRecord binlog : allBinlogRecords) { - Object[] key = - getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster); - if (splitKeyRangeContains( - key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) { - binlogRecords.add(binlog); + if (isDataChangeRecord(binlog)) { + Object[] key = + getSplitKey(snapshotSplit.getSplitKeyType(), binlog, nameAdjuster); + if (splitKeyRangeContains( + key, snapshotSplit.getSplitStart(), snapshotSplit.getSplitEnd())) { + binlogRecords.add(binlog); + } } } }