diff --git a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java index ef629bfc5..aad53793d 100644 --- a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java +++ b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java @@ -166,22 +166,30 @@ public class DNSMonitor { slaveFound = true; if (masterSlaveEntry.hasSlave(newSlaveAddr)) { - masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER); - masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); - slaves.put(entry.getKey(), newSlaveAddr); - promise.complete(null); + CompletableFuture slaveUpFuture = masterSlaveEntry.slaveUpAsync(newSlaveAddr, FreezeReason.MANAGER); + slaveUpFuture.whenComplete((r, e) -> { + if (e != null) { + promise.complete(null); + return; + } + if (r) { + slaves.put(entry.getKey(), newSlaveAddr); + masterSlaveEntry.slaveDownAsync(currentSlaveAddr, FreezeReason.MANAGER); + } + promise.complete(null); + }); } else { CompletableFuture addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey()); addFuture.whenComplete((res, e) -> { - promise.complete(null); - if (e != null) { log.error("Can't add slave: " + newSlaveAddr, e); + promise.complete(null); return; } - masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); slaves.put(entry.getKey(), newSlaveAddr); + masterSlaveEntry.slaveDownAsync(currentSlaveAddr, FreezeReason.MANAGER); + promise.complete(null); }); } break; diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index af2ef2246..9093d7161 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -454,10 +454,34 @@ public class MasterSlaveEntry { }); } + public CompletableFuture excludeMasterFromSlaves(InetSocketAddress address) { + InetSocketAddress addr = masterEntry.getClient().getAddr(); + if (config.checkSkipSlavesInit() || addr.equals(address)) { + return CompletableFuture.completedFuture(false); + } + CompletableFuture downFuture = slaveDownAsync(addr, FreezeReason.SYSTEM); + return downFuture.thenApply(r -> { + if (r) { + log.info("master {} excluded from slaves", addr); + } + return r; + }); + } + public CompletableFuture slaveUpAsync(RedisURI address, FreezeReason freezeReason) { return slaveBalancer.unfreezeAsync(address, freezeReason); } + public CompletableFuture slaveUpAsync(InetSocketAddress address, FreezeReason freezeReason) { + CompletableFuture f = slaveBalancer.unfreezeAsync(address, freezeReason); + return f.thenCompose(r -> { + if (r) { + return excludeMasterFromSlaves(address); + } + return CompletableFuture.completedFuture(r); + }); + } + public boolean slaveUp(InetSocketAddress address, FreezeReason freezeReason) { if (!slaveBalancer.unfreeze(address, freezeReason)) { return false;