Fixed - Cluster failover handling #5857

pull/5957/head
Nikita Koksharov 8 months ago
parent 5f16dbf42e
commit 3a8c757448

@ -36,6 +36,7 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BinaryOperator;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -983,7 +984,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
private Collection<ClusterPartition> getLastPartitions() { private Collection<ClusterPartition> getLastPartitions() {
return new HashSet<>(lastUri2Partition.values()); return lastUri2Partition.values().stream().collect(Collectors.toMap(e -> e.getNodeId(), Function.identity(),
BinaryOperator.maxBy(Comparator.comparing(e -> e.getTime())))).values();
} }
public int getSlot(MasterSlaveEntry entry) { public int getSlot(MasterSlaveEntry entry) {

@ -45,6 +45,8 @@ public class ClusterPartition {
private ClusterPartition parent; private ClusterPartition parent;
private int references; private int references;
private long time = System.currentTimeMillis();
public ClusterPartition(String nodeId) { public ClusterPartition(String nodeId) {
super(); super();
@ -150,6 +152,10 @@ public class ClusterPartition {
return --references; return --references;
} }
public long getTime() {
return time;
}
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(nodeId); return Objects.hash(nodeId);

Loading…
Cancel
Save