From ad80e47c42735c42c9eacf125bd56440b2f85f7f Mon Sep 17 00:00:00 2001 From: yurunchuan Date: Thu, 14 Apr 2022 12:57:45 +0800 Subject: [PATCH] [mysql] Avoid duplicate split requests when add new table (#1156) --- .../connectors/mysql/source/reader/MySqlSourceReader.java | 6 ++---- .../cdc/connectors/mysql/source/utils/RecordUtils.java | 1 + 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index 43c34f0aa..0645d5447 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -63,7 +63,7 @@ import java.util.Map; import java.util.function.Supplier; import java.util.stream.Collectors; -import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER; +import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.BINLOG_READER; import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId; @@ -246,9 +246,7 @@ public class MySqlSourceReader mySqlSourceReaderContext.setStopBinlogSplitReader(); } else if (sourceEvent instanceof WakeupReaderEvent) { WakeupReaderEvent wakeupReaderEvent = (WakeupReaderEvent) sourceEvent; - if (wakeupReaderEvent.getTarget() == SNAPSHOT_READER) { - context.sendSplitRequest(); - } else { + if (wakeupReaderEvent.getTarget() == BINLOG_READER) { if (suspendedBinlogSplit != null) { context.sendSourceEventToCoordinator( new LatestFinishedSplitsSizeRequestEvent()); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java index 80af25c3f..fda94c488 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -41,6 +41,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional;