diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index dd0ac61ff..c05fb0b27 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -16,6 +16,7 @@ package org.redisson.connection; import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -27,6 +28,7 @@ import org.redisson.SentinelServersConfig; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; +import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; @@ -47,7 +49,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final ConcurrentMap slaves = PlatformDependent.newConcurrentHashMap(); - public SentinelConnectionManager(final SentinelServersConfig cfg, Config config) { + public SentinelConnectionManager(SentinelServersConfig cfg, Config config) { init(config); final MasterSlaveServersConfig c = new MasterSlaveServersConfig(); @@ -63,6 +65,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); + List disconnectedSlaves = new ArrayList(); for (URI addr : cfg.getSentinelAddresses()) { RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getTimeout()); try { @@ -74,7 +77,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { c.setMasterAddress(masterHost); currentMaster.set(masterHost); log.info("master: {} added", masterHost); -// c.addSlaveAddress(masterHost); // TODO async List> sentinelSlaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); @@ -83,24 +85,34 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String port = map.get("port"); String flags = map.get("flags"); - if (flags.contains("s_down") || flags.contains("disconnected")) { - log.info("slave: {}:{} is disconnected. skipped, params: {}", ip, port, map); - continue; - } - - log.info("slave: {}:{} added, params: {}", ip, port, map); - c.addSlaveAddress(ip + ":" + port); String host = ip + ":" + port; + + c.addSlaveAddress(host); slaves.put(host, true); + log.info("slave: {} added, params: {}", host, map); + + if (flags.contains("s_down") || flags.contains("disconnected")) { + disconnectedSlaves.add(host); + } } break; + } catch (RedisConnectionException e) { + // skip } finally { client.shutdownAsync(); } } + if (currentMaster.get() == null) { + throw new IllegalStateException("Can't connect to servers!"); + } init(c); + for (String host : disconnectedSlaves) { + String[] parts = host.split(":"); + slaveDown(parts[0], parts[1]); + } + for (URI addr : cfg.getSentinelAddresses()) { registerSentinel(cfg, addr); } @@ -113,39 +125,43 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - RedisPubSubConnection pubsub = client.connectPubSub(); - pubsub.addListener(new BaseRedisPubSubListener() { + try { + RedisPubSubConnection pubsub = client.connectPubSub(); + pubsub.addListener(new BaseRedisPubSubListener() { - @Override - public void onMessage(String channel, String msg) { - if ("+sentinel".equals(channel)) { - onSentinelAdded(cfg, msg); - } - if ("+slave".equals(channel)) { - onSlaveAdded(addr, msg); - } - if ("+sdown".equals(channel)) { - onSlaveDown(addr, msg); - } - if ("-sdown".equals(channel)) { - onSlaveUp(addr, msg); - } - if ("+switch-master".equals(channel)) { - onMasterChange(cfg, addr, msg); + @Override + public void onMessage(String channel, String msg) { + if ("+sentinel".equals(channel)) { + onSentinelAdded(cfg, msg); + } + if ("+slave".equals(channel)) { + onSlaveAdded(addr, msg); + } + if ("+sdown".equals(channel)) { + onSlaveDown(addr, msg); + } + if ("-sdown".equals(channel)) { + onSlaveUp(addr, msg); + } + if ("+switch-master".equals(channel)) { + onMasterChange(cfg, addr, msg); + } } - } - @Override - public boolean onStatus(PubSubType type, String channel) { - if (type == PubSubType.SUBSCRIBE) { - log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == PubSubType.SUBSCRIBE) { + log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); + } + return true; } - return true; - } - }); + }); - pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel"); - log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort()); + pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel"); + log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort()); + } catch (RedisConnectionException e) { + log.warn("can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort()); + } } protected void onSentinelAdded(SentinelServersConfig cfg, String msg) { @@ -188,13 +204,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = parts[2]; String port = parts[3]; - String addr = ip + ":" + port; - - // to avoid freeze twice - if (freezeSlaves.putIfAbsent(addr, true) == null) { - slaveDown(0, ip, Integer.valueOf(port)); - log.info("slave: {} has down", addr); - } + slaveDown(ip, port); } else if ("sentinel".equals(parts[0])) { String ip = parts[2]; String port = parts[3]; @@ -215,6 +225,15 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } + private void slaveDown(String ip, String port) { + // to avoid freeze twice + String addr = ip + ":" + port; + if (freezeSlaves.putIfAbsent(addr, true) == null) { + slaveDown(0, ip, Integer.valueOf(port)); + log.info("slave: {} has down", addr); + } + } + protected void onSlaveUp(URI addr, String msg) { String[] parts = msg.split(" ");