From 76eebdd45ab8b9e28c64893b460d029fb3086c90 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 25 Dec 2015 13:23:26 +0300 Subject: [PATCH] Connections restore with Sentinel servers fixed. --- .../MasterSlaveConnectionManager.java | 1 - .../redisson/connection/MasterSlaveEntry.java | 11 ++++--- .../connection/SentinelConnectionManager.java | 33 ++++++++----------- .../balancer/LoadBalancerManagerImpl.java | 8 +++-- 4 files changed, 25 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index df866385e..b77ec7dc2 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -545,7 +545,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) { MasterSlaveEntry entry = getEntry(slotRange); slaveDown(entry, host, port, freezeReason); - log.info("slave: {}:{} has down", host, port); } protected void changeMaster(ClusterSlotRange slotRange, String host, int port) { diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index e17e0e04f..e34037391 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -92,8 +92,9 @@ public class MasterSlaveEntry { // add master as slave if no more slaves available if (slaveBalancer.getAvailableClients() == 0) { InetSocketAddress addr = masterEntry.getClient().getAddr(); - slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); - log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); + if (slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM)) { + log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); + } } return conns; } @@ -120,15 +121,17 @@ public class MasterSlaveEntry { return masterEntry.getClient(); } - public void slaveUp(String host, int port, FreezeReason freezeReason) { + public boolean slaveUp(String host, int port, FreezeReason freezeReason) { if (!slaveBalancer.unfreeze(host, port, freezeReason)) { - return; + return false; } + InetSocketAddress addr = masterEntry.getClient().getAddr(); // exclude master from slaves if (!addr.getHostName().equals(host) || port != addr.getPort()) { connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); } + return true; } /** diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index b226b9e3a..7d6055c5b 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -49,7 +49,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final ConcurrentMap sentinels = PlatformDependent.newConcurrentHashMap(); private final AtomicReference currentMaster = new AtomicReference(); - private final ConcurrentMap freezeSlaves = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap slaves = PlatformDependent.newConcurrentHashMap(); @@ -222,8 +221,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { // to avoid addition twice if (slaves.putIfAbsent(slaveAddr, true) == null) { - addSlave(ip, Integer.valueOf(port)); + getEntry(singleSlotRange).addSlave(ip, Integer.valueOf(port)); log.info("slave: {} added", slaveAddr); + } else { + slaveUp(ip, port); } } else { log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); @@ -266,11 +267,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private void slaveDown(String ip, String port) { - // to avoid freeze twice - String addr = ip + ":" + port; - if (freezeSlaves.putIfAbsent(addr, true) == null) { - slaveDown(singleSlotRange, ip, Integer.valueOf(port), FreezeReason.MANAGER); - } + slaveDown(singleSlotRange, ip, Integer.valueOf(port), FreezeReason.MANAGER); + log.info("slave: {}:{} has down", ip, port); } private void onNodeUp(URI addr, String msg) { @@ -281,11 +279,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = parts[2]; String port = parts[3]; - String slaveAddr = ip + ":" + port; - if (freezeSlaves.remove(slaveAddr) != null) { - slaveUp(ip, Integer.valueOf(port)); - log.info("slave: {} has up", slaveAddr); - } + slaveUp(ip, port); } else if ("master".equals(parts[0])) { String ip = parts[2]; String port = parts[3]; @@ -303,6 +297,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } + private void slaveUp(String ip, String port) { + if (getEntry(singleSlotRange).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + String slaveAddr = ip + ":" + port; + log.info("slave: {} has up", slaveAddr); + } + } + private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) { String[] parts = msg.split(" "); @@ -324,14 +325,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } - private void addSlave(String host, int port) { - getEntry(0).addSlave(host, port); - } - - private void slaveUp(String host, int port) { - getEntry(0).slaveUp(host, port, FreezeReason.MANAGER); - } - @Override public void shutdown() { super.shutdown(); diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java index 366aedf9f..d5dee0b94 100644 --- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java +++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java @@ -134,9 +134,11 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { connection.closeAsync(); } - List list = new ArrayList(connectionEntry.getAllSubscribeConnections()); - connectionEntry.getAllSubscribeConnections().clear(); - return list; + synchronized (connectionEntry) { + List list = new ArrayList(connectionEntry.getAllSubscribeConnections()); + connectionEntry.getAllSubscribeConnections().clear(); + return list; + } } public Future nextPubSubConnection() {