diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index fbd4c5a74..e561ebdcc 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -399,7 +400,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { continue; } - MasterSlaveEntry entry = getEntry(currentPart.getSlots().iterator().next()); + MasterSlaveEntry entry = getEntry(currentPart.slots().nextSetBit(0)); // should be invoked first in order to remove stale failedSlaveAddresses Set addedSlaves = addRemoveSlaves(entry, currentPart, newPart); // Do some slaves have changed state from failed to alive? @@ -464,7 +465,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private int slotsAmount(Collection partitions) { int result = 0; for (ClusterPartition clusterPartition : partitions) { - result += clusterPartition.getSlots().size(); + result += clusterPartition.getSlotsAmount(); } return result; } @@ -558,7 +559,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (Integer slot : lastPartitions.keySet()) { boolean found = false; for (ClusterPartition clusterPartition : newPartitions) { - if (clusterPartition.getSlots().contains(slot)) { + if (clusterPartition.hasSlot(slot)) { found = true; break; } @@ -580,34 +581,34 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } } - Set addedSlots = new HashSet(); + BitSet addedSlots = new BitSet(); for (ClusterPartition clusterPartition : newPartitions) { for (Integer slot : clusterPartition.getSlots()) { if (!lastPartitions.containsKey(slot)) { - addedSlots.add(slot); + addedSlots.set(slot); } } } if (!addedSlots.isEmpty()) { log.info("{} slots found to add", addedSlots.size()); } - for (Integer slot : addedSlots) { + for (Integer slot : (Iterable) addedSlots.stream()::iterator) { ClusterPartition partition = find(newPartitions, slot); - Set oldSlots = new HashSet(partition.getSlots()); - oldSlots.removeAll(addedSlots); + BitSet oldSlots = partition.copySlots(); + oldSlots.andNot(addedSlots); if (oldSlots.isEmpty()) { continue; } - MasterSlaveEntry entry = getEntry(oldSlots.iterator().next()); + MasterSlaveEntry entry = getEntry(oldSlots.nextSetBit(0)); if (entry != null) { addEntry(slot, entry); lastPartitions.put(slot, partition); } } } - + private void checkSlotsMigration(Collection newPartitions) { for (ClusterPartition currentPartition : getLastPartitions()) { for (ClusterPartition newPartition : newPartitions) { @@ -615,13 +616,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { continue; } - MasterSlaveEntry entry = getEntry(currentPartition.getSlots().iterator().next()); - Set addedSlots = new HashSet(newPartition.getSlots()); - addedSlots.removeAll(currentPartition.getSlots()); + MasterSlaveEntry entry = getEntry(currentPartition.slots().nextSetBit(0)); + BitSet addedSlots = newPartition.copySlots(); + addedSlots.andNot(currentPartition.slots()); currentPartition.addSlots(addedSlots); - for (Integer slot : addedSlots) { + for (Integer slot : (Iterable) addedSlots.stream()::iterator) { addEntry(slot, entry); lastPartitions.put(slot, currentPartition); } @@ -629,9 +630,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddress()); } - Set removedSlots = new HashSet(currentPartition.getSlots()); - removedSlots.removeAll(newPartition.getSlots()); - for (Integer removeSlot : removedSlots) { + BitSet removedSlots = currentPartition.copySlots(); + removedSlots.andNot(newPartition.slots()); + for (Integer removeSlot : (Iterable) removedSlots.stream()::iterator) { if (lastPartitions.remove(removeSlot, currentPartition)) { removeEntry(removeSlot); } diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java b/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java index fcc667448..63abed343 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterPartition.java @@ -16,6 +16,7 @@ package org.redisson.cluster; import java.net.URI; +import java.util.BitSet; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -37,7 +38,7 @@ public class ClusterPartition { private final Set slaveAddresses = new HashSet(); private final Set failedSlaves = new HashSet(); - private final Set slots = new HashSet(); + private final BitSet slots = new BitSet(); private final Set slotRanges = new HashSet(); private ClusterPartition parent; @@ -74,18 +75,18 @@ public class ClusterPartition { return masterFail; } - public void addSlots(Set slots) { - this.slots.addAll(slots); + public void addSlots(BitSet slots) { + this.slots.or(slots); } - public void removeSlots(Set slots) { - this.slots.removeAll(slots); + public void removeSlots(BitSet slots) { + this.slots.andNot(slots); } public void addSlotRanges(Set ranges) { for (ClusterSlotRange clusterSlotRange : ranges) { for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) { - slots.add(i); + slots.set(i); } } slotRanges.addAll(ranges); @@ -93,7 +94,7 @@ public class ClusterPartition { public void removeSlotRanges(Set ranges) { for (ClusterSlotRange clusterSlotRange : ranges) { for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) { - slots.remove(i); + slots.clear(i); } } slotRanges.removeAll(ranges); @@ -101,9 +102,26 @@ public class ClusterPartition { public Set getSlotRanges() { return slotRanges; } - public Set getSlots() { + + public Iterable getSlots() { + return (Iterable) slots.stream()::iterator; + } + + public BitSet slots() { return slots; } + + public BitSet copySlots() { + return (BitSet) slots.clone(); + } + + public boolean hasSlot(int slot) { + return slots.get(slot); + } + + public int getSlotsAmount() { + return slots.cardinality(); + } public URI getMasterAddress() { return masterAddress;