diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index a6082dfd4..864c5ca55 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -249,7 +249,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } RedisConnection connection = nodeConnections.get(key); if (connection != null) { - return RedissonPromise.newSucceededFuture(connection); + if (!connection.isActive()) { + nodeConnections.remove(key); + connection.closeAsync(); + } else { + return RedissonPromise.newSucceededFuture(connection); + } } if (addr != null) { diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index cfdeac52a..3580e92ed 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -32,6 +32,7 @@ import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; import org.redisson.config.ReplicatedServersConfig; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,15 +120,15 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { monitorFuture = group.schedule(new Runnable() { @Override public void run() { + if (isShuttingDown()) { + return; + } + final URI master = currentMaster.get(); log.debug("Current master: {}", master); final AtomicInteger count = new AtomicInteger(cfg.getNodeAddresses().size()); for (final URI addr : cfg.getNodeAddresses()) { - if (isShuttingDown()) { - return; - } - RFuture connectionFuture = connectToNode(cfg, addr, null, addr.getHost()); connectionFuture.addListener(new FutureListener() { @Override @@ -166,6 +167,8 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { } else if (currentMaster.compareAndSet(master, addr)) { changeMaster(singleSlotRange.getStartSlot(), addr); } + } else if (!config.checkSkipSlavesInit()) { + slaveUp(addr); } if (count.decrementAndGet() == 0) { @@ -181,6 +184,13 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { }, cfg.getScanInterval(), TimeUnit.MILLISECONDS); } + private void slaveUp(URI uri) { + MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); + if (entry.slaveUp(uri, FreezeReason.MANAGER)) { + log.info("slave: {} has up", uri); + } + } + @Override public void shutdown() { if (monitorFuture != null) {