From a60d3f491e4c8d5d4d74db05278024f540af983f Mon Sep 17 00:00:00 2001
From: Nikita <abracham.mitchell@gmail.com>
Date: Tue, 29 Mar 2016 15:30:53 +0300
Subject: [PATCH] refactoring

---
 .../cluster/ClusterConnectionManager.java     | 10 +++++----
 .../connection/ConnectionManager.java         |  6 ++---
 .../MasterSlaveConnectionManager.java         | 12 +++-------
 .../redisson/connection/MasterSlaveEntry.java | 14 +++++++-----
 .../balancer/LoadBalancerManager.java         |  3 +--
 .../balancer/LoadBalancerManagerImpl.java     | 22 +++++++++----------
 .../connection/pool/ConnectionPool.java       |  2 +-
 7 files changed, 32 insertions(+), 37 deletions(-)

diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java
index d0bc0cf36..7ac9a601e 100644
--- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java
+++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java
@@ -346,8 +346,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
         failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
         for (URI uri : failedSlaves) {
             currentPart.addFailedSlaveAddress(uri);
-            slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
-            log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
+            if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
+                log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
+            }
         }
     }
 
@@ -358,8 +359,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
         for (URI uri : removedSlaves) {
             currentPart.removeSlaveAddress(uri);
 
-            slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
-            log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
+            if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
+                log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
+            }
         }
 
         Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java
index 21a275ddb..4bd1a720c 100644
--- a/src/main/java/org/redisson/connection/ConnectionManager.java
+++ b/src/main/java/org/redisson/connection/ConnectionManager.java
@@ -23,11 +23,11 @@ import java.util.concurrent.TimeUnit;
 import org.redisson.MasterSlaveServersConfig;
 import org.redisson.client.RedisClient;
 import org.redisson.client.RedisConnection;
+import org.redisson.client.RedisPubSubConnection;
 import org.redisson.client.RedisPubSubListener;
 import org.redisson.client.codec.Codec;
 import org.redisson.client.protocol.RedisCommand;
 import org.redisson.cluster.ClusterSlotRange;
-import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
 import org.redisson.core.NodeType;
 import org.redisson.misc.InfinitySemaphoreLatch;
 
@@ -44,6 +44,8 @@ import io.netty.util.concurrent.Promise;
  */
 public interface ConnectionManager {
 
+    void reattachPubSub(Collection<RedisPubSubConnection> allPubSubConnections);
+    
     boolean isClusterMode();
 
     <R> Future<R> newSucceededFuture(R value);
@@ -62,8 +64,6 @@ public interface ConnectionManager {
 
     <R> Future<R> newFailedFuture(Throwable cause);
 
-    void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason);
-
     Collection<RedisClientEntry> getClients();
 
     void shutdownAsync(RedisClient client);
diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java
index 125df525d..2bf246e3f 100644
--- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java
+++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java
@@ -542,13 +542,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
         return null;
     }
 
-    public void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason) {
-        Collection<RedisPubSubConnection> allPubSubConnections = entry.slaveDown(host, port, freezeReason);
-        if (allPubSubConnections.isEmpty()) {
-            return;
-        }
-
-        // reattach listeners to other channels
+    @Override
+    public void reattachPubSub(Collection<RedisPubSubConnection> allPubSubConnections) {
         for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {
             for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) {
                 PubSubConnectionEntry pubSubEntry = mapEntry.getValue();
@@ -620,8 +615,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
     }
 
     protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) {
-        MasterSlaveEntry entry = getEntry(slotRange);
-        slaveDown(entry, host, port, freezeReason);
+        getEntry(slotRange).slaveDown(host, port, freezeReason);
     }
 
     protected void changeMaster(ClusterSlotRange slotRange, String host, int port) {
diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java
index 7582e98fc..139030165 100644
--- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java
+++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java
@@ -90,9 +90,11 @@ public class MasterSlaveEntry {
         return writeConnectionHolder.add(masterEntry);
     }
 
-    public Collection<RedisPubSubConnection> slaveDown(String host, int port, FreezeReason freezeReason) {
-        Collection<RedisPubSubConnection> conns = slaveBalancer.freeze(host, port, freezeReason);
-
+    public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
+        if (!slaveBalancer.freeze(host, port, freezeReason)) {
+            return false;
+        }
+        
         // add master as slave if no more slaves available
         if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) {
             InetSocketAddress addr = masterEntry.getClient().getAddr();
@@ -100,7 +102,7 @@ public class MasterSlaveEntry {
                 log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort());
             }
         }
-        return conns;
+        return true;
     }
 
     public Future<Void> addSlave(String host, int port) {
@@ -134,7 +136,7 @@ public class MasterSlaveEntry {
         // exclude master from slaves
         if (config.getReadMode() == ReadMode.SLAVE
                 && (!addr.getHostName().equals(host) || port != addr.getPort())) {
-            connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
+            slaveDown(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
             log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort());
         }
         return true;
@@ -155,7 +157,7 @@ public class MasterSlaveEntry {
         // more than one slave available, so master can be removed from slaves
         if (config.getReadMode() == ReadMode.SLAVE
                 && slaveBalancer.getAvailableClients() > 1) {
-            connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM);
+            slaveDown(host, port, FreezeReason.SYSTEM);
         }
         connectionManager.shutdownAsync(oldMaster.getClient());
     }
diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java
index 56a372b98..2b28c343f 100644
--- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java
+++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java
@@ -16,7 +16,6 @@
 package org.redisson.connection.balancer;
 
 import java.net.InetSocketAddress;
-import java.util.Collection;
 
 import org.redisson.client.RedisConnection;
 import org.redisson.client.RedisPubSubConnection;
@@ -37,7 +36,7 @@ public interface LoadBalancerManager {
 
     boolean unfreeze(String host, int port, FreezeReason freezeReason);
 
-    Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason);
+    boolean freeze(String host, int port, FreezeReason freezeReason);
 
     Future<Void> add(ClientConnectionsEntry entry);
 
diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java
index 45f55ecc0..8c73c4d8d 100644
--- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java
+++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java
@@ -16,10 +16,6 @@
 package org.redisson.connection.balancer;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
 import org.redisson.MasterSlaveServersConfig;
@@ -99,16 +95,20 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
         return false;
     }
 
-    public Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason) {
+    public boolean freeze(String host, int port, FreezeReason freezeReason) {
         InetSocketAddress addr = new InetSocketAddress(host, port);
         ClientConnectionsEntry connectionEntry = addr2Entry.get(addr);
         if (connectionEntry == null) {
-            return Collections.emptyList();
+            return false;
         }
 
         synchronized (connectionEntry) {
-            log.debug("{} freezed", addr);
+            if (connectionEntry.isFreezed()) {
+                return false;
+            }
+            
             connectionEntry.setFreezed(true);
+
             // only RECONNECT freeze reason could be replaced
             if (connectionEntry.getFreezeReason() == null
                     || connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) {
@@ -134,11 +134,9 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
             connection.closeAsync();
         }
 
-        synchronized (connectionEntry) {
-            List<RedisPubSubConnection> list = new ArrayList<RedisPubSubConnection>(connectionEntry.getAllSubscribeConnections());
-            connectionEntry.getAllSubscribeConnections().clear();
-            return list;
-        }
+        connectionManager.reattachPubSub(connectionEntry.getAllSubscribeConnections());
+        connectionEntry.getAllSubscribeConnections().clear();
+        return true;
     }
 
     public Future<RedisPubSubConnection> nextPubSubConnection() {
diff --git a/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/src/main/java/org/redisson/connection/pool/ConnectionPool.java
index 58998a2dd..00c1c6886 100644
--- a/src/main/java/org/redisson/connection/pool/ConnectionPool.java
+++ b/src/main/java/org/redisson/connection/pool/ConnectionPool.java
@@ -277,7 +277,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
 
     private void checkForReconnect(ClientConnectionsEntry entry) {
         if (entry.getNodeType() == NodeType.SLAVE) {
-            connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),
+            masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(),
                     entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
             log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
             scheduleCheck(entry);