Fixed - Cluster failover handling #5857

pull/5924/head
Nikita Koksharov 9 months ago
parent 4e6c0be911
commit c86af4ec18

@ -338,7 +338,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, partition); lastPartitions.put(slot, partition);
} }
if (partition.getSlotsAmount() > 0) { if (partition.getSlotsAmount() > 0
&& partition.getMasterAddress() != null) {
lastUri2Partition.put(partition.getMasterAddress(), partition); lastUri2Partition.put(partition.getMasterAddress(), partition);
} }
@ -498,6 +499,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private CompletableFuture<Void> checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) { private CompletableFuture<Void> checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
List<CompletableFuture<?>> futures = new ArrayList<>(); List<CompletableFuture<?>> futures = new ArrayList<>();
for (ClusterPartition newPart : newPartitions) { for (ClusterPartition newPart : newPartitions) {
if (newPart.getMasterAddress() == null) {
continue;
}
ClusterPartition currentPart = lastUri2Partition.get(newPart.getMasterAddress()); ClusterPartition currentPart = lastUri2Partition.get(newPart.getMasterAddress());
if (currentPart == null) { if (currentPart == null) {
continue; continue;
@ -630,7 +634,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
List<CompletableFuture<?>> futures = new ArrayList<>(); List<CompletableFuture<?>> futures = new ArrayList<>();
for (ClusterPartition newPart : newPartitions) { for (ClusterPartition newPart : newPartitions) {
if (newPart.getSlotsAmount() == 0) { if (newPart.getSlotsAmount() == 0
|| newPart.getMasterAddress() == null) {
continue; continue;
} }
@ -693,7 +698,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (Integer removedSlot : removedSlots) { for (Integer removedSlot : removedSlots) {
ClusterPartition p = lastPartitions.remove(removedSlot); ClusterPartition p = lastPartitions.remove(removedSlot);
if (p != null) { if (p != null && p.getMasterAddress() != null) {
lastUri2Partition.remove(p.getMasterAddress()); lastUri2Partition.remove(p.getMasterAddress());
} }
} }
@ -708,6 +713,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Integer addedSlots = 0; Integer addedSlots = 0;
for (ClusterPartition clusterPartition : newPartitions) { for (ClusterPartition clusterPartition : newPartitions) {
MasterSlaveEntry entry = getEntry(clusterPartition.getMasterAddress()); MasterSlaveEntry entry = getEntry(clusterPartition.getMasterAddress());
boolean hasNewSlots = false;
for (Integer slot : clusterPartition.getSlots()) { for (Integer slot : clusterPartition.getSlots()) {
if (lastPartitions.containsKey(slot)) { if (lastPartitions.containsKey(slot)) {
continue; continue;
@ -716,10 +722,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (entry != null) { if (entry != null) {
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, clusterPartition); lastPartitions.put(slot, clusterPartition);
lastUri2Partition.put(clusterPartition.getMasterAddress(), clusterPartition);
addedSlots++; addedSlots++;
hasNewSlots = true;
} }
} }
if (hasNewSlots && clusterPartition.getMasterAddress() != null) {
lastUri2Partition.put(clusterPartition.getMasterAddress(), clusterPartition);
}
} }
if (addedSlots > 0) { if (addedSlots > 0) {
log.info("{} slots found to add", addedSlots); log.info("{} slots found to add", addedSlots);
@ -751,7 +760,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
lastPartitions.put(slot, currentPartition); lastPartitions.put(slot, currentPartition);
changedSlots.add(slot); changedSlots.add(slot);
}); });
if (!addedSlots.isEmpty()) { if (!addedSlots.isEmpty() && currentPartition.getMasterAddress() != null) {
lastUri2Partition.put(currentPartition.getMasterAddress(), currentPartition); lastUri2Partition.put(currentPartition.getMasterAddress(), currentPartition);
log.info("{} slots added to {}", addedSlots.cardinality(), currentPartition.getMasterAddress()); log.info("{} slots added to {}", addedSlots.cardinality(), currentPartition.getMasterAddress());
} }
@ -761,7 +770,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
removedSlots.stream().forEach(slot -> { removedSlots.stream().forEach(slot -> {
if (lastPartitions.remove(slot, currentPartition)) { if (lastPartitions.remove(slot, currentPartition)) {
if (currentPartition.getMasterAddress() != null) {
lastUri2Partition.remove(currentPartition.getMasterAddress()); lastUri2Partition.remove(currentPartition.getMasterAddress());
}
removeEntry(slot); removeEntry(slot);
changedSlots.add(slot); changedSlots.add(slot);
} }

Loading…
Cancel
Save