|
|
|
@ -555,12 +555,18 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
Set<RedisURI> removedSlaves = new HashSet<>(currentPart.getSlaveAddresses());
|
|
|
|
|
removedSlaves.removeAll(newPart.getSlaveAddresses());
|
|
|
|
|
|
|
|
|
|
if (!removedSlaves.isEmpty()) {
|
|
|
|
|
log.info("removed slaves detected for master {}. current slaves {} last slaves {}",
|
|
|
|
|
currentPart.getMasterAddress(), currentPart.getSlaveAddresses(), newPart.getSlaveAddresses());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (RedisURI uri : removedSlaves) {
|
|
|
|
|
currentPart.removeSlaveAddress(uri);
|
|
|
|
|
|
|
|
|
|
if (config.isSlaveNotUsed() || entry.slaveDown(uri, FreezeReason.MANAGER)) {
|
|
|
|
|
disconnectNode(uri);
|
|
|
|
|
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
|
|
|
|
|
log.info("slave {} removed for master {} and slot ranges: {}",
|
|
|
|
|
currentPart.getMasterAddress(), uri, currentPart.getSlotRanges());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -569,6 +575,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
&& !newPart.getFailedSlaveAddresses().contains(uri))
|
|
|
|
|
.collect(Collectors.toSet());
|
|
|
|
|
|
|
|
|
|
if (!addedSlaves.isEmpty()) {
|
|
|
|
|
log.info("added slaves detected for master {}. current slaves {} last slaves {} last failed slaves {}",
|
|
|
|
|
currentPart.getMasterAddress(), currentPart.getSlaveAddresses(),
|
|
|
|
|
newPart.getSlaveAddresses(), newPart.getFailedSlaveAddresses());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
List<CompletableFuture<?>> futures = new ArrayList<>();
|
|
|
|
|
for (RedisURI uri : addedSlaves) {
|
|
|
|
|
ClientConnectionsEntry slaveEntry = entry.getEntry(uri);
|
|
|
|
@ -577,7 +589,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
slaveUpFuture = slaveUpFuture.thenApply(v -> {
|
|
|
|
|
if (v) {
|
|
|
|
|
currentPart.addSlaveAddress(uri);
|
|
|
|
|
log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
|
|
|
|
|
log.info("slave: {} unfreezed for master {} and slot ranges: {}",
|
|
|
|
|
currentPart.getMasterAddress(), uri, currentPart.getSlotRanges());
|
|
|
|
|
entry.excludeMasterFromSlaves(uri);
|
|
|
|
|
}
|
|
|
|
|
return v;
|
|
|
|
@ -589,7 +602,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
CompletableFuture<Void> slaveUpFuture = entry.addSlave(uri, false, configEndpointHostName);
|
|
|
|
|
CompletableFuture<Void> f = slaveUpFuture.thenAccept(res -> {
|
|
|
|
|
currentPart.addSlaveAddress(uri);
|
|
|
|
|
log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
|
|
|
|
|
log.info("slave: {} added for master {} and slot ranges: {}",
|
|
|
|
|
currentPart.getMasterAddress(), uri, currentPart.getSlotRanges());
|
|
|
|
|
entry.excludeMasterFromSlaves(uri);
|
|
|
|
|
});
|
|
|
|
|
futures.add(f);
|
|
|
|
|