From c86af4ec18a99caa8fcc3ca9f71f870445504519 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 30 May 2024 10:51:23 +0300 Subject: [PATCH] Fixed - Cluster failover handling #5857 --- .../cluster/ClusterConnectionManager.java | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 3b6f73510..3a26eefa3 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -338,7 +338,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { addEntry(slot, entry); lastPartitions.put(slot, partition); } - if (partition.getSlotsAmount() > 0) { + if (partition.getSlotsAmount() > 0 + && partition.getMasterAddress() != null) { lastUri2Partition.put(partition.getMasterAddress(), partition); } @@ -498,6 +499,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private CompletableFuture checkSlaveNodesChange(Collection newPartitions) { List> futures = new ArrayList<>(); for (ClusterPartition newPart : newPartitions) { + if (newPart.getMasterAddress() == null) { + continue; + } ClusterPartition currentPart = lastUri2Partition.get(newPart.getMasterAddress()); if (currentPart == null) { continue; @@ -630,7 +634,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { List> futures = new ArrayList<>(); for (ClusterPartition newPart : newPartitions) { - if (newPart.getSlotsAmount() == 0) { + if (newPart.getSlotsAmount() == 0 + || newPart.getMasterAddress() == null) { continue; } @@ -693,7 +698,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (Integer removedSlot : removedSlots) { ClusterPartition p = lastPartitions.remove(removedSlot); - if (p != null) { + if (p != null && p.getMasterAddress() != null) { lastUri2Partition.remove(p.getMasterAddress()); } } @@ -708,6 +713,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { Integer addedSlots = 0; for (ClusterPartition clusterPartition : newPartitions) { MasterSlaveEntry entry = getEntry(clusterPartition.getMasterAddress()); + boolean hasNewSlots = false; for (Integer slot : clusterPartition.getSlots()) { if (lastPartitions.containsKey(slot)) { continue; @@ -716,10 +722,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { if (entry != null) { addEntry(slot, entry); lastPartitions.put(slot, clusterPartition); - lastUri2Partition.put(clusterPartition.getMasterAddress(), clusterPartition); addedSlots++; + hasNewSlots = true; } } + if (hasNewSlots && clusterPartition.getMasterAddress() != null) { + lastUri2Partition.put(clusterPartition.getMasterAddress(), clusterPartition); + } } if (addedSlots > 0) { log.info("{} slots found to add", addedSlots); @@ -751,7 +760,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { lastPartitions.put(slot, currentPartition); changedSlots.add(slot); }); - if (!addedSlots.isEmpty()) { + if (!addedSlots.isEmpty() && currentPartition.getMasterAddress() != null) { lastUri2Partition.put(currentPartition.getMasterAddress(), currentPartition); log.info("{} slots added to {}", addedSlots.cardinality(), currentPartition.getMasterAddress()); } @@ -761,7 +770,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { removedSlots.stream().forEach(slot -> { if (lastPartitions.remove(slot, currentPartition)) { - lastUri2Partition.remove(currentPartition.getMasterAddress()); + if (currentPartition.getMasterAddress() != null) { + lastUri2Partition.remove(currentPartition.getMasterAddress()); + } removeEntry(slot); changedSlots.add(slot); }