|
|
|
@ -496,7 +496,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
futures.add(f);
|
|
|
|
|
}
|
|
|
|
|
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
|
|
|
|
.exceptionally(e -> null);
|
|
|
|
|
.exceptionally(e -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
log.error(e.getMessage(), e);
|
|
|
|
|
}
|
|
|
|
|
return null;
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private CompletableFuture<Void> upDownSlaves(MasterSlaveEntry entry, ClusterPartition currentPart, ClusterPartition newPart, Set<RedisURI> addedSlaves) {
|
|
|
|
@ -576,17 +581,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CompletableFuture<Void> slaveUpFuture = entry.addSlave(uri, false, NodeType.SLAVE, configEndpointHostName);
|
|
|
|
|
slaveUpFuture = slaveUpFuture.whenComplete((res, ex) -> {
|
|
|
|
|
if (ex != null) {
|
|
|
|
|
log.error("Can't add slave: {}", uri, ex);
|
|
|
|
|
}
|
|
|
|
|
}).thenCompose(res -> {
|
|
|
|
|
CompletableFuture<Void> f = slaveUpFuture.thenCompose(res -> {
|
|
|
|
|
currentPart.addSlaveAddress(uri);
|
|
|
|
|
log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
|
|
|
|
|
return entry.excludeMasterFromSlaves(uri)
|
|
|
|
|
.thenApply(r -> null);
|
|
|
|
|
});
|
|
|
|
|
futures.add(slaveUpFuture);
|
|
|
|
|
futures.add(f);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
|
|
|
|
@ -649,7 +650,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
futures.add(future.toCompletableFuture());
|
|
|
|
|
}
|
|
|
|
|
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
|
|
|
|
.exceptionally(e -> null);
|
|
|
|
|
.exceptionally(e -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
log.error(e.getMessage(), e);
|
|
|
|
|
}
|
|
|
|
|
return null;
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkSlotsChange(Collection<ClusterPartition> newPartitions) {
|
|
|
|
|