diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 22d14b739..4108976c2 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -265,6 +265,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { shutdownEntry(entry); } + private void removeEntry(Integer slot, MasterSlaveEntry entry) { + if (slot2entry.compareAndSet(slot, entry, null)) { + shutdownEntry(entry); + } + } + private void shutdownEntry(MasterSlaveEntry entry) { if (entry != null && entry.decReference() == 0) { entry.getAllEntries().forEach(e -> { @@ -337,6 +343,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (Integer slot : partition.getSlots()) { addEntry(slot, entry); lastPartitions.put(slot, partition); + partition.incReference(); } if (partition.getSlotsAmount() > 0) { lastUri2Partition.put(partition.getMasterAddress(), partition); @@ -691,18 +698,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { .filter(s -> newPartitions.stream().noneMatch(p -> p.hasSlot(s))) .collect(Collectors.toSet()); - for (Integer removedSlot : removedSlots) { - ClusterPartition p = lastPartitions.remove(removedSlot); - if (p != null) { + for (Integer slot : removedSlots) { + ClusterPartition p = lastPartitions.remove(slot); + if (p != null && p.decReference() == 0) { lastUri2Partition.remove(p.getMasterAddress()); } + removeEntry(slot); } if (!removedSlots.isEmpty()) { - log.info("{} slots found to remove", removedSlots.size()); - } - - for (Integer slot : removedSlots) { - removeEntry(slot); + log.info("{} slots removed", removedSlots.size()); } Integer addedSlots = 0; @@ -716,13 +720,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { if (entry != null) { addEntry(slot, entry); lastPartitions.put(slot, clusterPartition); + clusterPartition.incReference(); lastUri2Partition.put(clusterPartition.getMasterAddress(), clusterPartition); addedSlots++; } } } if (addedSlots > 0) { - log.info("{} slots found to add", addedSlots); + log.info("{} slots added", addedSlots); } } @@ -749,6 +754,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { addedSlots.stream().forEach(slot -> { addEntry(slot, entry); lastPartitions.put(slot, currentPartition); + currentPartition.incReference(); changedSlots.add(slot); }); if (!addedSlots.isEmpty()) { @@ -761,8 +767,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { removedSlots.stream().forEach(slot -> { if (lastPartitions.remove(slot, currentPartition)) { - lastUri2Partition.remove(currentPartition.getMasterAddress()); - removeEntry(slot); + if (currentPartition.decReference() == 0) { + lastUri2Partition.remove(currentPartition.getMasterAddress()); + } + removeEntry(slot, entry); changedSlots.add(slot); } }); diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java b/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java index c7cd45fa7..8988b4e78 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -43,6 +43,8 @@ public class ClusterPartition { private Set slotRanges = Collections.emptySet(); private ClusterPartition parent; + + private int references; public ClusterPartition(String nodeId) { super(); @@ -140,7 +142,14 @@ public class ClusterPartition { slaveAddresses.remove(uri); failedSlaves.remove(uri); } - + + public void incReference() { + references++; + } + public int decReference() { + return --references; + } + @Override public int hashCode() { return Objects.hash(nodeId);