From a99682f240dc09efb339f427800edbddc4b74a9d Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 8 Mar 2018 18:52:11 +0300 Subject: [PATCH] DNS monitoring for Sentinel nodes. #1299 --- .../MasterSlaveConnectionManager.java | 14 ++- .../connection/SentinelConnectionManager.java | 100 +++++++++++++++--- .../balancer/LoadBalancerManager.java | 4 - 3 files changed, 97 insertions(+), 21 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 304e1d2e1..ac1c73105 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -366,16 +366,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager { addEntry(slot, entry); } - if (config.getDnsMonitoringInterval() != -1) { - dnsMonitor = new DNSMonitor(this, f.getNow(), - config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup); - dnsMonitor.start(); - } + startDNSMonitoring(f.getNow()); } catch (RuntimeException e) { stopThreads(); throw e; } } + + protected void startDNSMonitoring(RedisClient masterHost) { + if (config.getDnsMonitoringInterval() != -1) { + dnsMonitor = new DNSMonitor(this, masterHost, + config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup); + dnsMonitor.start(); + } + } protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, HashSet slots) { diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index dae856f99..c35c51211 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -15,6 +15,7 @@ */ package org.redisson.connection; +import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Collections; @@ -51,6 +52,7 @@ import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.resolver.AddressResolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; @@ -70,7 +72,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final Set slaves = Collections.newSetFromMap(PlatformDependent.newConcurrentHashMap()); private final Set disconnectedSlaves = new HashSet(); + private String masterName; private ScheduledFuture monitorFuture; + private AddressResolver sentinelResolver; public SentinelConnectionManager(SentinelServersConfig cfg, Config config) { super(config); @@ -79,9 +83,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { throw new IllegalArgumentException("masterName parameter is not defined!"); } + this.masterName = cfg.getMasterName(); this.config = create(cfg); initTimer(this.config); + this.sentinelResolver = resolverGroup.getResolver(getGroup().next()); + for (URI addr : cfg.getSentinelAddresses()) { RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts()); try { @@ -133,7 +140,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String host = createAddress(ip, port); URI sentinelAddr = URIBuilder.create(host); - RFuture future = registerSentinel(cfg, sentinelAddr, this.config); + RFuture future = registerSentinel(sentinelAddr, this.config); connectionFutures.add(future); } @@ -159,6 +166,76 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { scheduleChangeCheck(cfg, null); } + @Override + protected void startDNSMonitoring(RedisClient masterHost) { + if (config.getDnsMonitoringInterval() == -1) { + return; + } + + scheduleSentinelDNSCheck(); + } + + protected void scheduleSentinelDNSCheck() { + monitorFuture = group.schedule(new Runnable() { + @Override + public void run() { + List sentinels = new ArrayList(SentinelConnectionManager.this.sentinels.values()); + + final AtomicInteger sentinelsCounter = new AtomicInteger(sentinels.size()); + FutureListener> commonListener = new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (sentinelsCounter.decrementAndGet() == 0) { + scheduleSentinelDNSCheck(); + } + } + }; + + for (final RedisClient client : sentinels) { + Future> allNodes = sentinelResolver.resolveAll(InetSocketAddress.createUnresolved(client.getAddr().getHostName(), client.getAddr().getPort())); + allNodes.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + log.error("Unable to resolve " + client.getAddr().getHostName(), future.cause()); + return; + } + + boolean clientFound = false; + for (InetSocketAddress addr : future.getNow()) { + boolean found = false; + for (RedisClient currentSentinel : SentinelConnectionManager.this.sentinels.values()) { + if (currentSentinel.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress()) + && currentSentinel.getAddr().getPort() == addr.getPort()) { + found = true; + break; + } + } + if (!found) { + URI uri = convert(addr.getAddress().getHostAddress(), "" + addr.getPort()); + registerSentinel(uri, getConfig()); + } + if (client.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress()) + && client.getAddr().getPort() == addr.getPort()) { + clientFound = true; + } + } + if (!clientFound) { + String addr = client.getAddr().getAddress().getHostAddress() + ":" + client.getAddr().getPort(); + RedisClient sentinel = SentinelConnectionManager.this.sentinels.remove(addr); + if (sentinel != null) { + sentinel.shutdownAsync(); + log.warn("sentinel: {} has down", addr); + } + } + } + }); + allNodes.addListener(commonListener); + } + } + }, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS); + } + private void scheduleChangeCheck(final SentinelServersConfig cfg, final Iterator iterator) { monitorFuture = group.schedule(new Runnable() { @Override @@ -305,7 +382,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }); slavesFuture.addListener(commonListener); } - + RFuture>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName()); sentinelsFuture.addListener(new FutureListener>>() { @Override @@ -323,9 +400,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = map.get("ip"); String port = map.get("port"); - String host = createAddress(ip, port); - URI sentinelAddr = URIBuilder.create(host); - registerSentinel(cfg, sentinelAddr, getConfig()); + URI sentinelAddr = convert(ip, port); + registerSentinel(sentinelAddr, getConfig()); } } }); @@ -350,7 +426,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return entry; } - private RFuture registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { + private RFuture registerSentinel(final URI addr, final MasterSlaveServersConfig c) { String key = addr.getHost() + ":" + addr.getPort(); RedisClient client = sentinels.get(key); if (client != null) { @@ -380,7 +456,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.debug("message {} from {}", msg, channel); if ("+sentinel".equals(channel)) { - onSentinelAdded(cfg, (String) msg, c); + onSentinelAdded((String) msg, c); } if ("+slave".equals(channel)) { onSlaveAdded(addr, (String) msg); @@ -392,7 +468,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { onNodeUp(addr, (String) msg); } if ("+switch-master".equals(channel)) { - onMasterChange(cfg, addr, (String) msg); + onMasterChange(addr, (String) msg); } } @@ -413,14 +489,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return RedissonPromise.newSucceededFuture(null); } - protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlaveServersConfig c) { + protected void onSentinelAdded(String msg, MasterSlaveServersConfig c) { String[] parts = msg.split(" "); if ("sentinel".equals(parts[0])) { String ip = parts[2]; String port = parts[3]; URI uri = convert(ip, port); - registerSentinel(cfg, uri, c); + registerSentinel(uri, c); } } @@ -589,11 +665,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } - private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) { + private void onMasterChange(URI addr, String msg) { String[] parts = msg.split(" "); if (parts.length > 3) { - if (cfg.getMasterName().equals(parts[0])) { + if (masterName.equals(parts[0])) { String ip = parts[3]; String port = parts[4]; diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 12423b080..87a8a273b 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -222,10 +222,6 @@ public class LoadBalancerManager { return client2Entry.get(redisClient); } - protected String convert(InetSocketAddress addr) { - return addr.getAddress().getHostAddress() + ":" + addr.getPort(); - } - public RFuture getConnection(RedisCommand command, URI addr) { ClientConnectionsEntry entry = getEntry(addr); if (entry != null) {