[hotfix][mysql] Keep assigned splits in order to fix wrong meta group calculation

This closes #2421.

Co-authored-by: Leonard Xu <leonard@apache.org>
(cherry picked from commit dddab4d62b)
pull/2694/head
North.Lin 1 year ago committed by Leonard Xu
parent 2ec183a6c1
commit d4461f2383

@ -43,13 +43,14 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
@ -106,7 +107,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
currentParallelism,
new ArrayList<>(),
new ArrayList<>(),
new HashMap<>(),
new LinkedHashMap<>(),
new HashMap<>(),
new HashMap<>(),
AssignerStatus.INITIAL_ASSIGNING,
@ -152,7 +153,17 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
this.assignedSplits = assignedSplits;
// When job restore from savepoint, sort the existing tables and newly added tables
// to let enumerator only send newly added tables' BinlogSplitMetaEvent
this.assignedSplits =
assignedSplits.entrySet().stream()
.sorted(Entry.comparingByKey())
.collect(
Collectors.toMap(
Entry::getKey,
Entry::getValue,
(o, o2) -> o,
LinkedHashMap::new));
this.tableSchemas = tableSchemas;
this.splitFinishedOffsets = splitFinishedOffsets;
this.assignerStatus = assignerStatus;
@ -232,7 +243,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
// remove unassigned tables/splits if it does not satisfy new table filter
List<String> splitsToRemove = new LinkedList<>();
for (Map.Entry<String, MySqlSchemalessSnapshotSplit> splitEntry :
for (Entry<String, MySqlSchemalessSnapshotSplit> splitEntry :
assignedSplits.entrySet()) {
if (tablesToRemove.contains(splitEntry.getValue().getTableId())) {
splitsToRemove.add(splitEntry.getKey());
@ -367,9 +378,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
"The assigner is not ready to offer finished split information, this should not be called");
}
final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
assignedSplits.values().stream()
.sorted(Comparator.comparing(MySqlSplit::splitId))
.collect(Collectors.toList());
new ArrayList<>(assignedSplits.values());
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());

@ -471,20 +471,24 @@ public class MySqlSourceReader<T>
private Set<String> getExistedSplitsOfLastGroup(
List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int metaGroupSize) {
Set<String> existedSplitsOfLastGroup = new HashSet<>();
int splitsNumOfLastGroup =
finishedSnapshotSplits.size() % sourceConfig.getSplitMetaGroupSize();
if (splitsNumOfLastGroup != 0) {
int lastGroupStart =
((int) (finishedSnapshotSplits.size() / sourceConfig.getSplitMetaGroupSize()))
* metaGroupSize;
existedSplitsOfLastGroup =
finishedSnapshotSplits
.subList(lastGroupStart, lastGroupStart + splitsNumOfLastGroup).stream()
// Keep same order with MySqlHybridSplitAssigner.createBinlogSplit() to avoid
// 'invalid request meta group id' error
List<String> sortedFinishedSnapshotSplits =
finishedSnapshotSplits.stream()
.map(FinishedSnapshotSplitInfo::getSplitId)
.collect(Collectors.toSet());
.sorted()
.collect(Collectors.toList());
return new HashSet<>(
sortedFinishedSnapshotSplits.subList(
lastGroupStart, lastGroupStart + splitsNumOfLastGroup));
}
return existedSplitsOfLastGroup;
return new HashSet<>();
}
private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {

Loading…
Cancel
Save