|
|
|
@ -485,21 +485,18 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
Set<ClusterPartition> lastPartitions = getLastPartitions();
|
|
|
|
|
Map<RedisURI, ClusterPartition> lastPartitions = getLastPartitonsByURI();
|
|
|
|
|
for (ClusterPartition newPart : newPartitions) {
|
|
|
|
|
for (ClusterPartition currentPart : lastPartitions) {
|
|
|
|
|
if (!Objects.equals(newPart.getMasterAddress(), currentPart.getMasterAddress())) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MasterSlaveEntry entry = getEntry(currentPart.slots().nextSetBit(0));
|
|
|
|
|
// should be invoked first in order to remove stale failedSlaveAddresses
|
|
|
|
|
Set<RedisURI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
|
|
|
|
|
// Do some slaves have changed state from failed to alive?
|
|
|
|
|
upDownSlaves(entry, currentPart, newPart, addedSlaves);
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
ClusterPartition currentPart = lastPartitions.get(newPart.getMasterAddress());
|
|
|
|
|
if (currentPart == null) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MasterSlaveEntry entry = getEntry(currentPart.slots().nextSetBit(0));
|
|
|
|
|
// should be invoked first in order to remove stale failedSlaveAddresses
|
|
|
|
|
Set<RedisURI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
|
|
|
|
|
// Do some slaves have changed state from failed to alive?
|
|
|
|
|
upDownSlaves(entry, currentPart, newPart, addedSlaves);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -562,20 +559,17 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RFuture<Void> checkMasterNodesChange(ClusterServersConfig cfg, Collection<ClusterPartition> newPartitions) {
|
|
|
|
|
Set<ClusterPartition> lastPartitions = getLastPartitions();
|
|
|
|
|
Map<RedisURI, ClusterPartition> lastPartitions = getLastPartitonsByURI();
|
|
|
|
|
Map<RedisURI, ClusterPartition> addedPartitions = new HashMap<>();
|
|
|
|
|
Set<RedisURI> mastersElected = new HashSet<>();
|
|
|
|
|
for (ClusterPartition newPart : newPartitions) {
|
|
|
|
|
boolean masterFound = false;
|
|
|
|
|
for (ClusterPartition currentPart : lastPartitions) {
|
|
|
|
|
if (!Objects.equals(newPart.getMasterAddress(), currentPart.getMasterAddress())) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
masterFound = true;
|
|
|
|
|
// skip master if it is not marked as failed or has no slots
|
|
|
|
|
if (!newPart.isMasterFail() || newPart.getSlotsAmount() == 0) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
if (newPart.getSlotsAmount() == 0) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ClusterPartition currentPart = lastPartitions.get(newPart.getMasterAddress());
|
|
|
|
|
boolean masterFound = currentPart != null;
|
|
|
|
|
if (masterFound && newPart.isMasterFail()) {
|
|
|
|
|
for (Integer slot : currentPart.getSlots()) {
|
|
|
|
|
ClusterPartition newMasterPart = find(newPartitions, slot);
|
|
|
|
|
// does partition has a new master?
|
|
|
|
@ -596,10 +590,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!masterFound && !newPart.isMasterFail() && newPart.getSlotsAmount() > 0) {
|
|
|
|
|
if (!masterFound && !newPart.isMasterFail()) {
|
|
|
|
|
addedPartitions.put(newPart.getMasterAddress(), newPart);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -873,6 +866,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
super.shutdown();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Map<RedisURI, ClusterPartition> getLastPartitonsByURI() {
|
|
|
|
|
return lastPartitions.values().stream().collect(Collectors.toMap(p -> p.getMasterAddress(), p -> p));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Set<ClusterPartition> getLastPartitions() {
|
|
|
|
|
return new HashSet<>(lastPartitions.values());
|
|
|
|
|
}
|
|
|
|
|