diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java index fe25208f6..cd89e99b1 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java @@ -18,6 +18,8 @@ package io.debezium.connector.mysql; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,36 +38,53 @@ public class GtidUtils { public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) { Map newSet = new HashMap<>(); serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet)); - for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) { - GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID()); + for (GtidSet.UUIDSet restoredUuidSet : restoredGtidSet.getUUIDSets()) { + GtidSet.UUIDSet serverUuidSet = newSet.get(restoredUuidSet.getUUID()); if (serverUuidSet != null) { - long restoredIntervalEnd = getIntervalEnd(uuidSet); - List newIntervals = - new ArrayList<>(); - for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) { - if (serverInterval.getEnd() <= restoredIntervalEnd) { - newIntervals.add( + List serverIntervals = serverUuidSet.getIntervals(); + List restoredIntervals = restoredUuidSet.getIntervals(); + + long earliestRestoredTx = getMinIntervalStart(restoredIntervals); + + List merged = new ArrayList<>(); + + for (GtidSet.Interval serverInterval : serverIntervals) { + if (serverInterval.getEnd() < earliestRestoredTx) { + merged.add( new com.github.shyiko.mysql.binlog.GtidSet.Interval( serverInterval.getStart(), serverInterval.getEnd())); - } else if (serverInterval.getStart() <= restoredIntervalEnd - && serverInterval.getEnd() > restoredIntervalEnd) { - newIntervals.add( + } else if (serverInterval.getStart() < earliestRestoredTx + && serverInterval.getEnd() >= earliestRestoredTx) { + merged.add( new com.github.shyiko.mysql.binlog.GtidSet.Interval( - serverInterval.getStart(), restoredIntervalEnd)); + serverInterval.getStart(), earliestRestoredTx - 1)); } } - newSet.put( - uuidSet.getUUID(), + + for (GtidSet.Interval restoredInterval : restoredIntervals) { + merged.add( + new com.github.shyiko.mysql.binlog.GtidSet.Interval( + restoredInterval.getStart(), restoredInterval.getEnd())); + } + + GtidSet.UUIDSet mergedUuidSet = new GtidSet.UUIDSet( new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet( - uuidSet.getUUID(), newIntervals))); + restoredUuidSet.getUUID(), merged)); + + newSet.put(restoredUuidSet.getUUID(), mergedUuidSet); } else { - newSet.put(uuidSet.getUUID(), uuidSet); + newSet.put(restoredUuidSet.getUUID(), restoredUuidSet); } } return new GtidSet(newSet); } + private static long getMinIntervalStart(List intervals) { + return Collections.min(intervals, Comparator.comparingLong(GtidSet.Interval::getStart)) + .getStart(); + } + /** * This method merges one GTID set (toMerge) into another (base), without overwriting the * existing elements in the base GTID set. @@ -80,11 +99,4 @@ public class GtidUtils { } return new GtidSet(newSet); } - - private static long getIntervalEnd(GtidSet.UUIDSet uuidSet) { - return uuidSet.getIntervals().stream() - .mapToLong(GtidSet.Interval::getEnd) - .max() - .getAsLong(); - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java index f4c28453d..70601b96e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java @@ -18,6 +18,11 @@ package io.debezium.connector.mysql; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet; import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto; @@ -25,46 +30,74 @@ import static org.assertj.core.api.Assertions.assertThat; /** Unit test for {@link GtidUtils}. */ class GtidUtilsTest { - @Test - void testFixingRestoredGtidSet() { - GtidSet serverGtidSet = new GtidSet("A:1-100"); - GtidSet restoredGtidSet = new GtidSet("A:30-100"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()) - .isEqualTo("A:1-100"); - serverGtidSet = new GtidSet("A:1-100"); - restoredGtidSet = new GtidSet("A:30-50"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()) - .isEqualTo("A:1-50"); + @ParameterizedTest(name = "{0}") + @MethodSource("gtidSetsProvider") + void testFixingRestoredGtidSet( + String description, String serverStr, String restoredStr, String expectedStr) { + GtidSet serverGtidSet = new GtidSet(serverStr); + GtidSet restoredGtidSet = new GtidSet(restoredStr); - serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); - restoredGtidSet = new GtidSet("A:106-150"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()) - .isEqualTo("A:1-100:102-150,B:20-200"); + GtidSet result = fixRestoredGtidSet(serverGtidSet, restoredGtidSet); - serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); - restoredGtidSet = new GtidSet("A:106-150,C:1-100"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()) - .isEqualTo("A:1-100:102-150,B:20-200,C:1-100"); + assertThat(result.toString()).isEqualTo(expectedStr); + } - serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200"); - restoredGtidSet = new GtidSet("A:106-150:152-200,C:1-100"); - assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString()) - .isEqualTo("A:1-100:102-200,B:20-200,C:1-100"); + private static Stream gtidSetsProvider() { + return Stream.of( + Arguments.of( + "Basic example with a straightforward subset", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:63-100", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:63-100"), + Arguments.of( + "Restored starts midrange, single gap", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100", + "e21e8354-04b8-42a9-bfe3-12d71e809836:45-80", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-80"), + Arguments.of( + "Multiple intervals with gaps in restored", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100", + "e21e8354-04b8-42a9-bfe3-12d71e809836:45-80:83-90:92-98", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-80:83-90:92-98"), + Arguments.of( + "Server has disjoint intervals, restored partially overlaps", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:60-90:95-200", + "e21e8354-04b8-42a9-bfe3-12d71e809836:45-50:65-70:96-100", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:65-70:96-100"), + Arguments.of( + "Restored completely covers server range", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100"), + Arguments.of( + "Restored partially covers server range", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100:102-200", + "e21e8354-04b8-42a9-bfe3-12d71e809836:106-150:152-200", + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100:102-150:152-200")); } @Test void testMergingGtidSets() { - GtidSet base = new GtidSet("A:1-100"); - GtidSet toMerge = new GtidSet("A:1-10"); - assertThat(mergeGtidSetInto(base, toMerge).toString()).isEqualTo("A:1-100"); + GtidSet base = new GtidSet("e21e8354-04b8-42a9-bfe3-12d71e809836:1-100"); + GtidSet toMerge = new GtidSet("e21e8354-04b8-42a9-bfe3-12d71e809836:1-10"); + assertThat(mergeGtidSetInto(base, toMerge).toString()) + .isEqualTo("e21e8354-04b8-42a9-bfe3-12d71e809836:1-100"); - base = new GtidSet("A:1-100"); - toMerge = new GtidSet("B:1-10"); - assertThat(mergeGtidSetInto(base, toMerge).toString()).isEqualTo("A:1-100,B:1-10"); + base = new GtidSet("e21e8354-04b8-42a9-bfe3-12d71e809836:1-100"); + toMerge = new GtidSet("a32e8354-04b8-42a9-bfe3-12d71e809820:1-10"); + assertThat(mergeGtidSetInto(base, toMerge).toString()) + .isEqualTo( + "a32e8354-04b8-42a9-bfe3-12d71e809820:1-10,e21e8354-04b8-42a9-bfe3-12d71e809836:1-100"); - base = new GtidSet("A:1-100,C:1-100"); - toMerge = new GtidSet("A:1-10,B:1-10"); - assertThat(mergeGtidSetInto(base, toMerge).toString()).isEqualTo("A:1-100,B:1-10,C:1-100"); + base = + new GtidSet( + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-100,c22e8354-04b8-42a9-bfe3-12d71e809831:1-100"); + toMerge = + new GtidSet( + "e21e8354-04b8-42a9-bfe3-12d71e809836:1-10,a32e8354-04b8-42a9-bfe3-12d71e809820:1-10"); + assertThat(mergeGtidSetInto(base, toMerge).toString()) + .isEqualTo( + "a32e8354-04b8-42a9-bfe3-12d71e809820:1-10,c22e8354-04b8-42a9-bfe3-12d71e809831:1-100,e21e8354-04b8-42a9-bfe3-12d71e809836:1-100"); } }