|
|
|
@ -408,8 +408,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
Set<ClusterPartition> lastPartitions = getLastPartitions();
|
|
|
|
|
for (ClusterPartition newPart : newPartitions) {
|
|
|
|
|
for (ClusterPartition currentPart : getLastPartitions()) {
|
|
|
|
|
for (ClusterPartition currentPart : lastPartitions) {
|
|
|
|
|
if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
@ -479,10 +480,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
return addedSlaves;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Collection<Integer> slots(Collection<ClusterPartition> partitions) {
|
|
|
|
|
Set<Integer> result = new HashSet<Integer>(MAX_SLOT);
|
|
|
|
|
private int slotsAmount(Collection<ClusterPartition> partitions) {
|
|
|
|
|
int result = 0;
|
|
|
|
|
for (ClusterPartition clusterPartition : partitions) {
|
|
|
|
|
result.addAll(clusterPartition.getSlots());
|
|
|
|
|
result += clusterPartition.getSlots().size();
|
|
|
|
|
}
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
@ -500,9 +501,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
private RFuture<Void> checkMasterNodesChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
List<ClusterPartition> newMasters = new ArrayList<ClusterPartition>();
|
|
|
|
|
Set<ClusterPartition> lastPartitions = getLastPartitions();
|
|
|
|
|
for (final ClusterPartition newPart : newPartitions) {
|
|
|
|
|
boolean masterFound = false;
|
|
|
|
|
for (ClusterPartition currentPart : getLastPartitions()) {
|
|
|
|
|
for (ClusterPartition currentPart : lastPartitions) {
|
|
|
|
|
if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
@ -567,13 +569,24 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
Collection<Integer> newPartitionsSlots = slots(newPartitions);
|
|
|
|
|
if (newPartitionsSlots.size() == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) {
|
|
|
|
|
int newSlotsAmount = slotsAmount(newPartitions);
|
|
|
|
|
if (newSlotsAmount == lastPartitions.size() && lastPartitions.size() == MAX_SLOT) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Set<Integer> removedSlots = new HashSet<Integer>(lastPartitions.keySet());
|
|
|
|
|
removedSlots.removeAll(newPartitionsSlots);
|
|
|
|
|
Set<Integer> removedSlots = new HashSet<Integer>();
|
|
|
|
|
for (Integer slot : lastPartitions.keySet()) {
|
|
|
|
|
boolean found = false;
|
|
|
|
|
for (ClusterPartition clusterPartition : newPartitions) {
|
|
|
|
|
if (clusterPartition.getSlots().contains(slot)) {
|
|
|
|
|
found = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (!found) {
|
|
|
|
|
removedSlots.add(slot);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
lastPartitions.keySet().removeAll(removedSlots);
|
|
|
|
|
if (!removedSlots.isEmpty()) {
|
|
|
|
|
log.info("{} slots found to remove", removedSlots.size());
|
|
|
|
@ -587,9 +600,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Set<Integer> addedSlots = new HashSet<Integer>(newPartitionsSlots);
|
|
|
|
|
addedSlots.removeAll(lastPartitions.keySet());
|
|
|
|
|
Set<Integer> addedSlots = new HashSet<Integer>();
|
|
|
|
|
for (ClusterPartition clusterPartition : newPartitions) {
|
|
|
|
|
for (Integer slot : clusterPartition.getSlots()) {
|
|
|
|
|
if (!lastPartitions.containsKey(slot)) {
|
|
|
|
|
addedSlots.add(slot);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (!addedSlots.isEmpty()) {
|
|
|
|
|
log.info("{} slots found to add", addedSlots.size());
|
|
|
|
|
}
|
|
|
|
@ -611,8 +629,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
Set<ClusterPartition> currentPartitions = getLastPartitions();
|
|
|
|
|
for (ClusterPartition currentPartition : currentPartitions) {
|
|
|
|
|
for (ClusterPartition currentPartition : getLastPartitions()) {
|
|
|
|
|
for (ClusterPartition newPartition : newPartitions) {
|
|
|
|
|
if (!currentPartition.getNodeId().equals(newPartition.getNodeId())
|
|
|
|
|
// skip master change case
|
|
|
|
@ -750,7 +767,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
super.shutdown();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private HashSet<ClusterPartition> getLastPartitions() {
|
|
|
|
|
private Set<ClusterPartition> getLastPartitions() {
|
|
|
|
|
return new HashSet<ClusterPartition>(lastPartitions.values());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|