|
|
|
@ -338,8 +338,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
addEntry(slot, entry);
|
|
|
|
|
lastPartitions.put(slot, partition);
|
|
|
|
|
}
|
|
|
|
|
if (partition.getSlotsAmount() > 0
|
|
|
|
|
&& partition.getMasterAddress() != null) {
|
|
|
|
|
if (partition.getSlotsAmount() > 0) {
|
|
|
|
|
lastUri2Partition.put(partition.getMasterAddress(), partition);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -499,9 +498,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
private CompletableFuture<Void> checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
List<CompletableFuture<?>> futures = new ArrayList<>();
|
|
|
|
|
for (ClusterPartition newPart : newPartitions) {
|
|
|
|
|
if (newPart.getMasterAddress() == null) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
ClusterPartition currentPart = lastUri2Partition.get(newPart.getMasterAddress());
|
|
|
|
|
if (currentPart == null) {
|
|
|
|
|
continue;
|
|
|
|
@ -634,8 +630,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
List<CompletableFuture<?>> futures = new ArrayList<>();
|
|
|
|
|
|
|
|
|
|
for (ClusterPartition newPart : newPartitions) {
|
|
|
|
|
if (newPart.getSlotsAmount() == 0
|
|
|
|
|
|| newPart.getMasterAddress() == null) {
|
|
|
|
|
if (newPart.getSlotsAmount() == 0) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -698,7 +693,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
for (Integer removedSlot : removedSlots) {
|
|
|
|
|
ClusterPartition p = lastPartitions.remove(removedSlot);
|
|
|
|
|
if (p != null && p.getMasterAddress() != null) {
|
|
|
|
|
if (p != null) {
|
|
|
|
|
lastUri2Partition.remove(p.getMasterAddress());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -713,7 +708,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
Integer addedSlots = 0;
|
|
|
|
|
for (ClusterPartition clusterPartition : newPartitions) {
|
|
|
|
|
MasterSlaveEntry entry = getEntry(clusterPartition.getMasterAddress());
|
|
|
|
|
boolean hasNewSlots = false;
|
|
|
|
|
for (Integer slot : clusterPartition.getSlots()) {
|
|
|
|
|
if (lastPartitions.containsKey(slot)) {
|
|
|
|
|
continue;
|
|
|
|
@ -722,19 +716,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
if (entry != null) {
|
|
|
|
|
addEntry(slot, entry);
|
|
|
|
|
lastPartitions.put(slot, clusterPartition);
|
|
|
|
|
lastUri2Partition.put(clusterPartition.getMasterAddress(), clusterPartition);
|
|
|
|
|
addedSlots++;
|
|
|
|
|
hasNewSlots = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (hasNewSlots && clusterPartition.getMasterAddress() != null) {
|
|
|
|
|
lastUri2Partition.put(clusterPartition.getMasterAddress(), clusterPartition);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (addedSlots > 0) {
|
|
|
|
|
log.info("{} slots found to add", addedSlots);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
Collection<ClusterPartition> clusterLastPartitions = getLastPartitions();
|
|
|
|
|
|
|
|
|
@ -760,7 +751,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
lastPartitions.put(slot, currentPartition);
|
|
|
|
|
changedSlots.add(slot);
|
|
|
|
|
});
|
|
|
|
|
if (!addedSlots.isEmpty() && currentPartition.getMasterAddress() != null) {
|
|
|
|
|
if (!addedSlots.isEmpty()) {
|
|
|
|
|
lastUri2Partition.put(currentPartition.getMasterAddress(), currentPartition);
|
|
|
|
|
log.info("{} slots added to {}", addedSlots.cardinality(), currentPartition.getMasterAddress());
|
|
|
|
|
}
|
|
|
|
@ -770,9 +761,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
removedSlots.stream().forEach(slot -> {
|
|
|
|
|
if (lastPartitions.remove(slot, currentPartition)) {
|
|
|
|
|
if (currentPartition.getMasterAddress() != null) {
|
|
|
|
|
lastUri2Partition.remove(currentPartition.getMasterAddress());
|
|
|
|
|
}
|
|
|
|
|
lastUri2Partition.remove(currentPartition.getMasterAddress());
|
|
|
|
|
removeEntry(slot);
|
|
|
|
|
changedSlots.add(slot);
|
|
|
|
|
}
|
|
|
|
|