From 8ec8ca344ac161d3bb00f15f774dfcd4f15914b2 Mon Sep 17 00:00:00 2001
From: Ihor Mielientiev <igor.melent@gmail.com>
Date: Wed, 8 Jan 2025 12:54:31 +0100
Subject: [PATCH] FLINK-37065: MySQL cdc, fix method 'fixRestoredGtidSet' to
 support gtid sets with gaps

---
 .../debezium/connector/mysql/GtidUtils.java   | 58 ++++++-----
 .../connector/mysql/GtidUtilsTest.java        | 95 +++++++++++++------
 2 files changed, 99 insertions(+), 54 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
index fe25208f6..cd89e99b1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java
@@ -18,6 +18,8 @@
 package io.debezium.connector.mysql;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,36 +38,53 @@ public class GtidUtils {
     public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
         Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
         serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
-        for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
-            GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
+        for (GtidSet.UUIDSet restoredUuidSet : restoredGtidSet.getUUIDSets()) {
+            GtidSet.UUIDSet serverUuidSet = newSet.get(restoredUuidSet.getUUID());
             if (serverUuidSet != null) {
-                long restoredIntervalEnd = getIntervalEnd(uuidSet);
-                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
-                        new ArrayList<>();
-                for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
-                    if (serverInterval.getEnd() <= restoredIntervalEnd) {
-                        newIntervals.add(
+                List<GtidSet.Interval> serverIntervals = serverUuidSet.getIntervals();
+                List<GtidSet.Interval> restoredIntervals = restoredUuidSet.getIntervals();
+
+                long earliestRestoredTx = getMinIntervalStart(restoredIntervals);
+
+                List<com.github.shyiko.mysql.binlog.GtidSet.Interval> merged = new ArrayList<>();
+
+                for (GtidSet.Interval serverInterval : serverIntervals) {
+                    if (serverInterval.getEnd() < earliestRestoredTx) {
+                        merged.add(
                                 new com.github.shyiko.mysql.binlog.GtidSet.Interval(
                                         serverInterval.getStart(), serverInterval.getEnd()));
-                    } else if (serverInterval.getStart() <= restoredIntervalEnd
-                            && serverInterval.getEnd() > restoredIntervalEnd) {
-                        newIntervals.add(
+                    } else if (serverInterval.getStart() < earliestRestoredTx
+                            && serverInterval.getEnd() >= earliestRestoredTx) {
+                        merged.add(
                                 new com.github.shyiko.mysql.binlog.GtidSet.Interval(
-                                        serverInterval.getStart(), restoredIntervalEnd));
+                                        serverInterval.getStart(), earliestRestoredTx - 1));
                     }
                 }
-                newSet.put(
-                        uuidSet.getUUID(),
+
+                for (GtidSet.Interval restoredInterval : restoredIntervals) {
+                    merged.add(
+                            new com.github.shyiko.mysql.binlog.GtidSet.Interval(
+                                    restoredInterval.getStart(), restoredInterval.getEnd()));
+                }
+
+                GtidSet.UUIDSet mergedUuidSet =
                         new GtidSet.UUIDSet(
                                 new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
-                                        uuidSet.getUUID(), newIntervals)));
+                                        restoredUuidSet.getUUID(), merged));
+
+                newSet.put(restoredUuidSet.getUUID(), mergedUuidSet);
             } else {
-                newSet.put(uuidSet.getUUID(), uuidSet);
+                newSet.put(restoredUuidSet.getUUID(), restoredUuidSet);
             }
         }
         return new GtidSet(newSet);
     }
 
+    private static long getMinIntervalStart(List<GtidSet.Interval> intervals) {
+        return Collections.min(intervals, Comparator.comparingLong(GtidSet.Interval::getStart))
+                .getStart();
+    }
+
     /**
      * This method merges one GTID set (toMerge) into another (base), without overwriting the
      * existing elements in the base GTID set.
@@ -80,11 +99,4 @@ public class GtidUtils {
         }
         return new GtidSet(newSet);
     }
-
-    private static long getIntervalEnd(GtidSet.UUIDSet uuidSet) {
-        return uuidSet.getIntervals().stream()
-                .mapToLong(GtidSet.Interval::getEnd)
-                .max()
-                .getAsLong();
-    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
index f4c28453d..70601b96e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java
@@ -18,6 +18,11 @@
 package io.debezium.connector.mysql;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
 
 import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet;
 import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto;
@@ -25,46 +30,74 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit test for {@link GtidUtils}. */
 class GtidUtilsTest {
-    @Test
-    void testFixingRestoredGtidSet() {
-        GtidSet serverGtidSet = new GtidSet("A:1-100");
-        GtidSet restoredGtidSet = new GtidSet("A:30-100");
-        assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
-                .isEqualTo("A:1-100");
 
-        serverGtidSet = new GtidSet("A:1-100");
-        restoredGtidSet = new GtidSet("A:30-50");
-        assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
-                .isEqualTo("A:1-50");
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("gtidSetsProvider")
+    void testFixingRestoredGtidSet(
+            String description, String serverStr, String restoredStr, String expectedStr) {
+        GtidSet serverGtidSet = new GtidSet(serverStr);
+        GtidSet restoredGtidSet = new GtidSet(restoredStr);
 
-        serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200");
-        restoredGtidSet = new GtidSet("A:106-150");
-        assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
-                .isEqualTo("A:1-100:102-150,B:20-200");
+        GtidSet result = fixRestoredGtidSet(serverGtidSet, restoredGtidSet);
 
-        serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200");
-        restoredGtidSet = new GtidSet("A:106-150,C:1-100");
-        assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
-                .isEqualTo("A:1-100:102-150,B:20-200,C:1-100");
+        assertThat(result.toString()).isEqualTo(expectedStr);
+    }
 
-        serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200");
-        restoredGtidSet = new GtidSet("A:106-150:152-200,C:1-100");
-        assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
-                .isEqualTo("A:1-100:102-200,B:20-200,C:1-100");
+    private static Stream<Arguments> gtidSetsProvider() {
+        return Stream.of(
+                Arguments.of(
+                        "Basic example with a straightforward subset",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:63-100",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:63-100"),
+                Arguments.of(
+                        "Restored starts midrange, single gap",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:45-80",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-80"),
+                Arguments.of(
+                        "Multiple intervals with gaps in restored",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:45-80:83-90:92-98",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-80:83-90:92-98"),
+                Arguments.of(
+                        "Server has disjoint intervals, restored partially overlaps",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:60-90:95-200",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:45-50:65-70:96-100",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:65-70:96-100"),
+                Arguments.of(
+                        "Restored completely covers server range",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100"),
+                Arguments.of(
+                        "Restored partially covers server range",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100:102-200",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:106-150:152-200",
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100:102-150:152-200"));
     }
 
     @Test
     void testMergingGtidSets() {
-        GtidSet base = new GtidSet("A:1-100");
-        GtidSet toMerge = new GtidSet("A:1-10");
-        assertThat(mergeGtidSetInto(base, toMerge).toString()).isEqualTo("A:1-100");
+        GtidSet base = new GtidSet("e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
+        GtidSet toMerge = new GtidSet("e21e8354-04b8-42a9-bfe3-12d71e809836:1-10");
+        assertThat(mergeGtidSetInto(base, toMerge).toString())
+                .isEqualTo("e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
 
-        base = new GtidSet("A:1-100");
-        toMerge = new GtidSet("B:1-10");
-        assertThat(mergeGtidSetInto(base, toMerge).toString()).isEqualTo("A:1-100,B:1-10");
+        base = new GtidSet("e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
+        toMerge = new GtidSet("a32e8354-04b8-42a9-bfe3-12d71e809820:1-10");
+        assertThat(mergeGtidSetInto(base, toMerge).toString())
+                .isEqualTo(
+                        "a32e8354-04b8-42a9-bfe3-12d71e809820:1-10,e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
 
-        base = new GtidSet("A:1-100,C:1-100");
-        toMerge = new GtidSet("A:1-10,B:1-10");
-        assertThat(mergeGtidSetInto(base, toMerge).toString()).isEqualTo("A:1-100,B:1-10,C:1-100");
+        base =
+                new GtidSet(
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100,c22e8354-04b8-42a9-bfe3-12d71e809831:1-100");
+        toMerge =
+                new GtidSet(
+                        "e21e8354-04b8-42a9-bfe3-12d71e809836:1-10,a32e8354-04b8-42a9-bfe3-12d71e809820:1-10");
+        assertThat(mergeGtidSetInto(base, toMerge).toString())
+                .isEqualTo(
+                        "a32e8354-04b8-42a9-bfe3-12d71e809820:1-10,c22e8354-04b8-42a9-bfe3-12d71e809831:1-100,e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
     }
 }