From 96c9e5c118d24e315880e4a308b04ebefe678a0f Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 14 Aug 2015 16:54:27 +0300 Subject: [PATCH] sentinel up/down discovery. #221 --- .../connection/SentinelConnectionManager.java | 151 +++++++++++------- 1 file changed, 89 insertions(+), 62 deletions(-) diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 1b68b7401..dd0ac61ff 100644 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -16,12 +16,9 @@ package org.redisson.connection; import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import org.redisson.Config; @@ -31,19 +28,24 @@ import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.RedisPubSubListener; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.pubsub.PubSubType; -import org.redisson.core.Node; +import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.internal.PlatformDependent; + public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final Logger log = LoggerFactory.getLogger(getClass()); - private final List sentinels = new ArrayList(); + private final ConcurrentMap sentinels = PlatformDependent.newConcurrentHashMap(); + private final AtomicReference currentMaster = new AtomicReference(); + private final ConcurrentMap freezeSlaves = PlatformDependent.newConcurrentHashMap(); + private final ConcurrentMap slaves = PlatformDependent.newConcurrentHashMap(); + public SentinelConnectionManager(final SentinelServersConfig cfg, Config config) { init(config); @@ -61,7 +63,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); - final Set addedSlaves = Collections.newSetFromMap(new ConcurrentHashMap()); for (URI addr : cfg.getSentinelAddresses()) { RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getTimeout()); try { @@ -71,12 +72,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { List master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); String masterHost = master.get(0) + ":" + master.get(1); c.setMasterAddress(masterHost); + currentMaster.set(masterHost); log.info("master: {} added", masterHost); // c.addSlaveAddress(masterHost); // TODO async - List> slaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); - for (Map map : slaves) { + List> sentinelSlaves = connection.sync(RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); + for (Map map : sentinelSlaves) { String ip = map.get("ip"); String port = map.get("port"); String flags = map.get("flags"); @@ -89,7 +91,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.info("slave: {}:{} added, params: {}", ip, port, map); c.addSlaveAddress(ip + ":" + port); String host = ip + ":" + port; - addedSlaves.add(host); + slaves.put(host, true); } break; } finally { @@ -99,50 +101,66 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { init(c); - monitorMasterChange(cfg, addedSlaves); + for (URI addr : cfg.getSentinelAddresses()) { + registerSentinel(cfg, addr); + } } - private void monitorMasterChange(final SentinelServersConfig cfg, final Set addedSlaves) { - final AtomicReference master = new AtomicReference(); - final Set freezeSlaves = Collections.newSetFromMap(new ConcurrentHashMap()); - - for (final URI addr : cfg.getSentinelAddresses()) { - RedisClient client = createClient(addr.getHost(), addr.getPort()); - sentinels.add(client); + private void registerSentinel(final SentinelServersConfig cfg, final URI addr) { + RedisClient client = createClient(addr.getHost(), addr.getPort()); + RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client); + if (oldClient != null) { + return; + } - RedisPubSubConnection pubsub = client.connectPubSub(); - pubsub.addListener(new BaseRedisPubSubListener() { + RedisPubSubConnection pubsub = client.connectPubSub(); + pubsub.addListener(new BaseRedisPubSubListener() { - @Override - public void onMessage(String channel, String msg) { - if ("+slave".equals(channel)) { - onSlaveAdded(addedSlaves, addr, msg); - } - if ("+sdown".equals(channel)) { - onSlaveDown(freezeSlaves, addr, msg); - } - if ("-sdown".equals(channel)) { - onSlaveUp(freezeSlaves, addr, msg); - } - if ("+switch-master".equals(channel)) { - onMasterChange(cfg, master, addr, msg); - } + @Override + public void onMessage(String channel, String msg) { + if ("+sentinel".equals(channel)) { + onSentinelAdded(cfg, msg); + } + if ("+slave".equals(channel)) { + onSlaveAdded(addr, msg); + } + if ("+sdown".equals(channel)) { + onSlaveDown(addr, msg); + } + if ("-sdown".equals(channel)) { + onSlaveUp(addr, msg); + } + if ("+switch-master".equals(channel)) { + onMasterChange(cfg, addr, msg); } + } - @Override - public boolean onStatus(PubSubType type, String channel) { - if (type == PubSubType.SUBSCRIBE) { - log.info("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); - } - return true; + @Override + public boolean onStatus(PubSubType type, String channel) { + if (type == PubSubType.SUBSCRIBE) { + log.debug("subscribed to channel: {} from Sentinel {}:{}", channel, addr.getHost(), addr.getPort()); } - }); + return true; + } + }); - pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave"); + pubsub.subscribe(StringCodec.INSTANCE, "+switch-master", "+sdown", "-sdown", "+slave", "+sentinel"); + log.info("sentinel: {}:{} added", addr.getHost(), addr.getPort()); + } + + protected void onSentinelAdded(SentinelServersConfig cfg, String msg) { + String[] parts = msg.split(" "); + if ("sentinel".equals(parts[0])) { + String ip = parts[2]; + String port = parts[3]; + + String addr = ip + ":" + port; + URI uri = URIBuilder.create(addr); + registerSentinel(cfg, uri); } } - protected void onSlaveAdded(Set addedSlaves, URI addr, String msg) { + protected void onSlaveAdded(URI addr, String msg) { String[] parts = msg.split(" "); if (parts.length > 4 @@ -153,16 +171,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String slaveAddr = ip + ":" + port; // to avoid addition twice - if (addedSlaves.add(slaveAddr)) { - log.debug("Slave has been added - {}", slaveAddr); + if (slaves.putIfAbsent(slaveAddr, true) == null) { addSlave(ip, Integer.valueOf(port)); + log.info("slave: {} added", slaveAddr); } } else { log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); } } - private void onSlaveDown(final Set freezeSlaves, final URI addr, String msg) { + private void onSlaveDown(URI sentinelAddr, String msg) { String[] parts = msg.split(" "); if (parts.length > 3) { @@ -170,24 +188,34 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = parts[2]; String port = parts[3]; - String slaveAddr = ip + ":" + port; + String addr = ip + ":" + port; // to avoid freeze twice - if (freezeSlaves.add(slaveAddr)) { - log.debug("Slave has down - {}", slaveAddr); + if (freezeSlaves.putIfAbsent(addr, true) == null) { slaveDown(0, ip, Integer.valueOf(port)); + log.info("slave: {} has down", addr); + } + } else if ("sentinel".equals(parts[0])) { + String ip = parts[2]; + String port = parts[3]; + + String addr = ip + ":" + port; + RedisClient sentinel = sentinels.remove(addr); + if (sentinel != null) { + sentinel.shutdownAsync(); + log.info("sentinel: {} has down", addr); } - } else if ("sentinel".equals(parts[0]) || "master".equals(parts[0])) { + } else if ("master".equals(parts[0])) { // skip } else { - log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); + log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort()); } } else { - log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); + log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort()); } } - protected void onSlaveUp(Set freezeSlaves, URI addr, String msg) { + protected void onSlaveUp(URI addr, String msg) { String[] parts = msg.split(" "); if (parts.length > 4 @@ -196,17 +224,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String port = parts[3]; String slaveAddr = ip + ":" + port; - if (freezeSlaves.remove(slaveAddr)) { - log.debug("Slave has up - {}", slaveAddr); + if (freezeSlaves.remove(slaveAddr) != null) { slaveUp(ip, Integer.valueOf(port)); + log.info("slave: {} has up", slaveAddr); } } else { log.warn("onSlaveUp. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); } } - private void onMasterChange(final SentinelServersConfig cfg, - final AtomicReference master, final URI addr, String msg) { + private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) { String[] parts = msg.split(" "); if (parts.length > 3) { @@ -214,12 +241,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = parts[3]; String port = parts[4]; - String current = master.get(); + String current = currentMaster.get(); String newMaster = ip + ":" + port; if (!newMaster.equals(current) - && master.compareAndSet(current, newMaster)) { - log.debug("changing master from {} to {}", current, newMaster); + && currentMaster.compareAndSet(current, newMaster)) { changeMaster(0, ip, Integer.valueOf(port)); + log.info("master has changed from {} to {}", current, newMaster); } } } else { @@ -239,7 +266,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { public void shutdown() { super.shutdown(); - for (RedisClient sentinel : sentinels) { + for (RedisClient sentinel : sentinels.values()) { sentinel.shutdown(); } }