diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 10fb017b4..4fb884258 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -38,6 +38,8 @@ import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; public class SentinelConnectionManager extends MasterSlaveConnectionManager { @@ -129,55 +131,70 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { slaveDown(parts[0], parts[1]); } + List> connectionFutures = new ArrayList>(cfg.getSentinelAddresses().size()); for (URI addr : cfg.getSentinelAddresses()) { - registerSentinel(cfg, addr, c); + Future future = registerSentinel(cfg, addr, c); + connectionFutures.add(future); + } + + for (Future future : connectionFutures) { + future.syncUninterruptibly(); } } - private void registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { + private Future registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout()); RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client); if (oldClient != null) { - return; + return newSucceededFuture(null); } - try { - RedisPubSubConnection pubsub = client.connectPubSub(); - pubsub.addListener(new BaseRedisPubSubListener() { + Future pubsubFuture = client.connectPubSubAsync(); + pubsubFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + log.warn("Can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort()); + return; + } - @Override - public void onMessage(String channel, String msg) { - if ("+sentinel".equals(channel)) { - onSentinelAdded(cfg, msg, c); - } - if ("+slave".equals(channel)) { - onSlaveAdded(addr, msg); - } - if ("+sdown".equals(channel)) { - onNodeDown(addr, msg); - } - if ("-sdown".equals(channel)) { - onSlaveUp(addr, msg); + RedisPubSubConnection pubsub = future.getNow(); + pubsub.addListener(new BaseRedisPubSubListener() { + + @Override + public void onMessage(String channel, String msg) { + if ("+sentinel".equals(channel)) { + onSentinelAdded(cfg, msg, c); + } + if ("+slave".equals(channel)) { + onSlaveAdded(addr, msg); + } + if ("+sdown".equals(channel)) { + onNodeDown(addr, msg); + } + if ("-sdown".equals(channel)) { + onSlaveUp(addr, msg); + } + if ("+switch-master".equals(channel)) { + onMasterChange(cfg, addr, msg); + } } - if ("+switch-master".equals(channel)) { - onMasterChange(cfg, addr, msg); - } - } - @Override - public boolean onStatus(PubSubType type, String channel) { - if (type == PubSubType.SUBSCRIBE) { - log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == PubSubType.SUBSCRIBE) { + log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); + } + return true; } - return true; - } - }); + }); - pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel"); - log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort()); - } catch (RedisConnectionException e) { - log.warn("can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort()); - } + pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel"); + log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort()); + } + }); + + return pubsubFuture; } protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlaveServersConfig c) {