FLINK-37065: MySQL cdc, fix method 'fixRestoredGtidSet' to support gtid sets with gaps

pull/3845/head
Ihor Mielientiev 3 weeks ago
parent fc71888d7a
commit 8ec8ca344a

@ -18,6 +18,8 @@
package io.debezium.connector.mysql;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -36,36 +38,53 @@ public class GtidUtils {
public static GtidSet fixRestoredGtidSet(GtidSet serverGtidSet, GtidSet restoredGtidSet) {
Map<String, GtidSet.UUIDSet> newSet = new HashMap<>();
serverGtidSet.getUUIDSets().forEach(uuidSet -> newSet.put(uuidSet.getUUID(), uuidSet));
for (GtidSet.UUIDSet uuidSet : restoredGtidSet.getUUIDSets()) {
GtidSet.UUIDSet serverUuidSet = newSet.get(uuidSet.getUUID());
for (GtidSet.UUIDSet restoredUuidSet : restoredGtidSet.getUUIDSets()) {
GtidSet.UUIDSet serverUuidSet = newSet.get(restoredUuidSet.getUUID());
if (serverUuidSet != null) {
long restoredIntervalEnd = getIntervalEnd(uuidSet);
List<com.github.shyiko.mysql.binlog.GtidSet.Interval> newIntervals =
new ArrayList<>();
for (GtidSet.Interval serverInterval : serverUuidSet.getIntervals()) {
if (serverInterval.getEnd() <= restoredIntervalEnd) {
newIntervals.add(
List<GtidSet.Interval> serverIntervals = serverUuidSet.getIntervals();
List<GtidSet.Interval> restoredIntervals = restoredUuidSet.getIntervals();
long earliestRestoredTx = getMinIntervalStart(restoredIntervals);
List<com.github.shyiko.mysql.binlog.GtidSet.Interval> merged = new ArrayList<>();
for (GtidSet.Interval serverInterval : serverIntervals) {
if (serverInterval.getEnd() < earliestRestoredTx) {
merged.add(
new com.github.shyiko.mysql.binlog.GtidSet.Interval(
serverInterval.getStart(), serverInterval.getEnd()));
} else if (serverInterval.getStart() <= restoredIntervalEnd
&& serverInterval.getEnd() > restoredIntervalEnd) {
newIntervals.add(
} else if (serverInterval.getStart() < earliestRestoredTx
&& serverInterval.getEnd() >= earliestRestoredTx) {
merged.add(
new com.github.shyiko.mysql.binlog.GtidSet.Interval(
serverInterval.getStart(), restoredIntervalEnd));
serverInterval.getStart(), earliestRestoredTx - 1));
}
}
newSet.put(
uuidSet.getUUID(),
for (GtidSet.Interval restoredInterval : restoredIntervals) {
merged.add(
new com.github.shyiko.mysql.binlog.GtidSet.Interval(
restoredInterval.getStart(), restoredInterval.getEnd()));
}
GtidSet.UUIDSet mergedUuidSet =
new GtidSet.UUIDSet(
new com.github.shyiko.mysql.binlog.GtidSet.UUIDSet(
uuidSet.getUUID(), newIntervals)));
restoredUuidSet.getUUID(), merged));
newSet.put(restoredUuidSet.getUUID(), mergedUuidSet);
} else {
newSet.put(uuidSet.getUUID(), uuidSet);
newSet.put(restoredUuidSet.getUUID(), restoredUuidSet);
}
}
return new GtidSet(newSet);
}
private static long getMinIntervalStart(List<GtidSet.Interval> intervals) {
return Collections.min(intervals, Comparator.comparingLong(GtidSet.Interval::getStart))
.getStart();
}
/**
* This method merges one GTID set (toMerge) into another (base), without overwriting the
* existing elements in the base GTID set.
@ -80,11 +99,4 @@ public class GtidUtils {
}
return new GtidSet(newSet);
}
private static long getIntervalEnd(GtidSet.UUIDSet uuidSet) {
return uuidSet.getIntervals().stream()
.mapToLong(GtidSet.Interval::getEnd)
.max()
.getAsLong();
}
}

@ -18,6 +18,11 @@
package io.debezium.connector.mysql;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.stream.Stream;
import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet;
import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto;
@ -25,46 +30,74 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Unit test for {@link GtidUtils}. */
class GtidUtilsTest {
@Test
void testFixingRestoredGtidSet() {
GtidSet serverGtidSet = new GtidSet("A:1-100");
GtidSet restoredGtidSet = new GtidSet("A:30-100");
assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
.isEqualTo("A:1-100");
serverGtidSet = new GtidSet("A:1-100");
restoredGtidSet = new GtidSet("A:30-50");
assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
.isEqualTo("A:1-50");
@ParameterizedTest(name = "{0}")
@MethodSource("gtidSetsProvider")
void testFixingRestoredGtidSet(
String description, String serverStr, String restoredStr, String expectedStr) {
GtidSet serverGtidSet = new GtidSet(serverStr);
GtidSet restoredGtidSet = new GtidSet(restoredStr);
serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200");
restoredGtidSet = new GtidSet("A:106-150");
assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
.isEqualTo("A:1-100:102-150,B:20-200");
GtidSet result = fixRestoredGtidSet(serverGtidSet, restoredGtidSet);
serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200");
restoredGtidSet = new GtidSet("A:106-150,C:1-100");
assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
.isEqualTo("A:1-100:102-150,B:20-200,C:1-100");
assertThat(result.toString()).isEqualTo(expectedStr);
}
serverGtidSet = new GtidSet("A:1-100:102-200,B:20-200");
restoredGtidSet = new GtidSet("A:106-150:152-200,C:1-100");
assertThat(fixRestoredGtidSet(serverGtidSet, restoredGtidSet).toString())
.isEqualTo("A:1-100:102-200,B:20-200,C:1-100");
private static Stream<Arguments> gtidSetsProvider() {
return Stream.of(
Arguments.of(
"Basic example with a straightforward subset",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:63-100",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:63-100"),
Arguments.of(
"Restored starts midrange, single gap",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
"e21e8354-04b8-42a9-bfe3-12d71e809836:45-80",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-80"),
Arguments.of(
"Multiple intervals with gaps in restored",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
"e21e8354-04b8-42a9-bfe3-12d71e809836:45-80:83-90:92-98",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-80:83-90:92-98"),
Arguments.of(
"Server has disjoint intervals, restored partially overlaps",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:60-90:95-200",
"e21e8354-04b8-42a9-bfe3-12d71e809836:45-50:65-70:96-100",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-50:65-70:96-100"),
Arguments.of(
"Restored completely covers server range",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-100",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-100"),
Arguments.of(
"Restored partially covers server range",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-100:102-200",
"e21e8354-04b8-42a9-bfe3-12d71e809836:106-150:152-200",
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-100:102-150:152-200"));
}
@Test
void testMergingGtidSets() {
GtidSet base = new GtidSet("A:1-100");
GtidSet toMerge = new GtidSet("A:1-10");
assertThat(mergeGtidSetInto(base, toMerge).toString()).isEqualTo("A:1-100");
GtidSet base = new GtidSet("e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
GtidSet toMerge = new GtidSet("e21e8354-04b8-42a9-bfe3-12d71e809836:1-10");
assertThat(mergeGtidSetInto(base, toMerge).toString())
.isEqualTo("e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
base = new GtidSet("A:1-100");
toMerge = new GtidSet("B:1-10");
assertThat(mergeGtidSetInto(base, toMerge).toString()).isEqualTo("A:1-100,B:1-10");
base = new GtidSet("e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
toMerge = new GtidSet("a32e8354-04b8-42a9-bfe3-12d71e809820:1-10");
assertThat(mergeGtidSetInto(base, toMerge).toString())
.isEqualTo(
"a32e8354-04b8-42a9-bfe3-12d71e809820:1-10,e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
base = new GtidSet("A:1-100,C:1-100");
toMerge = new GtidSet("A:1-10,B:1-10");
assertThat(mergeGtidSetInto(base, toMerge).toString()).isEqualTo("A:1-100,B:1-10,C:1-100");
base =
new GtidSet(
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-100,c22e8354-04b8-42a9-bfe3-12d71e809831:1-100");
toMerge =
new GtidSet(
"e21e8354-04b8-42a9-bfe3-12d71e809836:1-10,a32e8354-04b8-42a9-bfe3-12d71e809820:1-10");
assertThat(mergeGtidSetInto(base, toMerge).toString())
.isEqualTo(
"a32e8354-04b8-42a9-bfe3-12d71e809820:1-10,c22e8354-04b8-42a9-bfe3-12d71e809831:1-100,e21e8354-04b8-42a9-bfe3-12d71e809836:1-100");
}
}

Loading…
Cancel
Save