From 897f4cc524c922ef4992a35381986b45d22d0d9e Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 30 Jan 2016 12:15:42 +0300 Subject: [PATCH] Sentinel offline slaves handling during Redisson start. #391 --- .../MasterSlaveConnectionManager.java | 26 ++++++++++------ .../redisson/connection/MasterSlaveEntry.java | 8 +++-- .../connection/SentinelConnectionManager.java | 31 ++++++++++++++----- 3 files changed, 44 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index ebfe85ca8..7f4298045 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -199,21 +199,27 @@ public class MasterSlaveConnectionManager implements ConnectionManager { HashSet slots = new HashSet(); slots.add(singleSlotRange); + MasterSlaveEntry entry; if (config.getReadMode() == ReadMode.MASTER) { - SingleEntry entry = new SingleEntry(slots, this, config); + entry = new SingleEntry(slots, this, config); Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); f.syncUninterruptibly(); - addEntry(singleSlotRange, entry); } else { - MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); - List> fs = entry.initSlaveBalancer(); - for (Future future : fs) { - future.syncUninterruptibly(); - } - Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); - f.syncUninterruptibly(); - addEntry(singleSlotRange, entry); + entry = createMasterSlaveEntry(config, slots); + } + addEntry(singleSlotRange, entry); + } + + protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, + HashSet slots) { + MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); + List> fs = entry.initSlaveBalancer(java.util.Collections.emptyList()); + for (Future future : fs) { + future.syncUninterruptibly(); } + Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + f.syncUninterruptibly(); + return entry; } protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig cfg) { diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 8992113f2..f19d5122a 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -68,14 +68,16 @@ public class MasterSlaveEntry { writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this); } - public List> initSlaveBalancer() { - boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() && config.getReadMode() == ReadMode.SLAVE; + public List> initSlaveBalancer(Collection disconnectedNodes) { + boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() + && config.getReadMode() == ReadMode.SLAVE + && disconnectedNodes.size() < config.getSlaveAddresses().size(); List> result = new LinkedList>(); Future f = addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER); result.add(f); for (URI address : config.getSlaveAddresses()) { - f = addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE); + f = addSlave(address.getHost(), address.getPort(), disconnectedNodes.contains(address), NodeType.SLAVE); result.add(f); } return result; diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index c39ac8c8d..e99e0bd52 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -18,8 +18,10 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; @@ -36,6 +38,7 @@ import org.redisson.client.RedisPubSubConnection; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.pubsub.PubSubType; +import org.redisson.cluster.ClusterSlotRange; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.NodeType; import org.redisson.misc.URIBuilder; @@ -55,13 +58,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final AtomicReference currentMaster = new AtomicReference(); private final ConcurrentMap slaves = PlatformDependent.newConcurrentHashMap(); + private final Set disconnectedSlaves = new HashSet(); public SentinelConnectionManager(SentinelServersConfig cfg, Config config) { super(config); final MasterSlaveServersConfig c = create(cfg); - List disconnectedSlaves = new ArrayList(); for (URI addr : cfg.getSentinelAddresses()) { RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout()); try { @@ -95,10 +98,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { c.addSlaveAddress(host); slaves.put(host, true); - log.info("slave: {} added, params: {}", host, map); + log.debug("slave {} state: {}", host, map); if (flags.contains("s_down") || flags.contains("disconnected")) { - disconnectedSlaves.add(host); + URI url = URIBuilder.create(host); + disconnectedSlaves.add(url); + log.info("slave: {} down", host); + } else { + log.info("slave: {} added", host); } } break; @@ -114,11 +121,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } init(c); - for (String host : disconnectedSlaves) { - String[] parts = host.split(":"); - slaveDown(parts[0], parts[1]); - } - List> connectionFutures = new ArrayList>(cfg.getSentinelAddresses().size()); for (URI addr : cfg.getSentinelAddresses()) { Future future = registerSentinel(cfg, addr, c); @@ -130,6 +132,19 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } + @Override + protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, + HashSet slots) { + MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); + List> fs = entry.initSlaveBalancer(disconnectedSlaves); + for (Future future : fs) { + future.syncUninterruptibly(); + } + Future f = entry.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); + f.syncUninterruptibly(); + return entry; + } + private Future registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) { RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout()); RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);