diff --git a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
index 585df5ce7..29dcba552 100644
--- a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
+++ b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
@@ -140,7 +140,8 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
     }
 
     public RMap<String, Object> getMap(String sessionId) {
-        return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId);
+        String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
+        return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
     }
     
     @Override
diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
index 66dd91d79..691caf322 100644
--- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
+++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
@@ -120,7 +120,8 @@ public class RedissonSessionManager extends ManagerBase {
     }
 
     public RMap<String, Object> getMap(String sessionId) {
-        return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId);
+        String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
+        return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
     }
     
     @Override
diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
index 35d4d84f9..e710753c9 100644
--- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
+++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
@@ -119,7 +119,8 @@ public class RedissonSessionManager extends ManagerBase {
     }
 
     public RMap<String, Object> getMap(String sessionId) {
-        return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId);
+        String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
+        return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
     }
     
     @Override
diff --git a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
index b370552f5..b8da6512a 100644
--- a/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
+++ b/redisson-tomcat/redisson-tomcat-9/src/main/java/org/redisson/tomcat/RedissonSessionManager.java
@@ -119,7 +119,8 @@ public class RedissonSessionManager extends ManagerBase {
     }
 
     public RMap<String, Object> getMap(String sessionId) {
-        return redisson.getMap(keyPrefix + "redisson_tomcat_session:" + sessionId);
+        String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
+        return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
     }
     
     @Override
diff --git a/redisson/src/main/java/org/redisson/RedisNodes.java b/redisson/src/main/java/org/redisson/RedisNodes.java
index d7108a683..d49c97a68 100644
--- a/redisson/src/main/java/org/redisson/RedisNodes.java
+++ b/redisson/src/main/java/org/redisson/RedisNodes.java
@@ -30,9 +30,12 @@ import org.redisson.api.NodesGroup;
 import org.redisson.api.RFuture;
 import org.redisson.client.RedisConnection;
 import org.redisson.client.protocol.RedisCommands;
+import org.redisson.connection.ClientConnectionsEntry;
 import org.redisson.connection.ConnectionListener;
 import org.redisson.connection.ConnectionManager;
+import org.redisson.connection.MasterSlaveEntry;
 import org.redisson.connection.RedisClientEntry;
+import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
 import org.redisson.misc.URIBuilder;
 
 import io.netty.util.concurrent.Future;
@@ -54,11 +57,17 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
 
     @Override
     public N getNode(String address) {
-        Collection<N> clients = (Collection<N>) connectionManager.getClients();
+        Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
         URI addr = URIBuilder.create(address);
-        for (N node : clients) {
-            if (URIBuilder.compare(node.getAddr(), addr)) {
-                return node;
+        for (MasterSlaveEntry masterSlaveEntry : entries) {
+            if (URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) {
+                return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
+            }
+            for (ClientConnectionsEntry entry : masterSlaveEntry.getSlaveEntries()) {
+                if (URIBuilder.compare(entry.getClient().getAddr(), addr) ||
+                        entry.getFreezeReason() == null || entry.getFreezeReason() == FreezeReason.RECONNECT) {
+                    return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType());
+                }
             }
         }
         return null;
@@ -66,11 +75,20 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
     
     @Override
     public Collection<N> getNodes(NodeType type) {
-        Collection<N> clients = (Collection<N>) connectionManager.getClients();
+        Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
         List<N> result = new ArrayList<N>();
-        for (N node : clients) {
-            if (node.getType().equals(type)) {
-                result.add(node);
+        for (MasterSlaveEntry masterSlaveEntry : entries) {
+            if (type == NodeType.MASTER) {
+                RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
+                result.add((N) entry);
+            }
+            if (type == NodeType.SLAVE) {
+                for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) {
+                    if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) {
+                        RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
+                        result.add((N) entry);
+                    }
+                }
             }
         }
         return result;
@@ -79,12 +97,25 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
 
     @Override
     public Collection<N> getNodes() {
-        return (Collection<N>) connectionManager.getClients();
+        Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
+        List<N> result = new ArrayList<N>();
+        for (MasterSlaveEntry masterSlaveEntry : entries) {
+            RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
+            result.add((N) masterEntry);
+            
+            for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) {
+                if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) {
+                    RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
+                    result.add((N) entry);
+                }
+            }
+        }
+        return result;
     }
 
     @Override
     public boolean pingAll() {
-        List<RedisClientEntry> clients = new ArrayList<RedisClientEntry>(connectionManager.getClients());
+        List<RedisClientEntry> clients = new ArrayList<RedisClientEntry>((Collection<RedisClientEntry>)getNodes());
         final Map<RedisConnection, RFuture<String>> result = new ConcurrentHashMap<RedisConnection, RFuture<String>>(clients.size());
         final CountDownLatch latch = new CountDownLatch(clients.size());
         for (RedisClientEntry entry : clients) {
diff --git a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java
index d6eeb1e31..ddb4b759b 100644
--- a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java
+++ b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java
@@ -106,11 +106,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
     }
 
     private RFuture<RedissonCountDownLatchEntry> subscribe() {
-        return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
+        return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     private void unsubscribe(RFuture<RedissonCountDownLatchEntry> future) {
-        PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
+        PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     @Override
diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java
index fa4210b41..dd040f656 100644
--- a/redisson/src/main/java/org/redisson/RedissonFairLock.java
+++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java
@@ -59,13 +59,13 @@ public class RedissonFairLock extends RedissonLock implements RLock {
     @Override
     protected RFuture<RedissonLockEntry> subscribe(long threadId) {
         return PUBSUB.subscribe(getEntryName() + ":" + threadId, 
-                getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
+                getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     @Override
     protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
         PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId, 
-                getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
+                getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     @Override
diff --git a/redisson/src/main/java/org/redisson/RedissonLock.java b/redisson/src/main/java/org/redisson/RedissonLock.java
index 35ef6b747..023f9b267 100644
--- a/redisson/src/main/java/org/redisson/RedissonLock.java
+++ b/redisson/src/main/java/org/redisson/RedissonLock.java
@@ -348,11 +348,11 @@ public class RedissonLock extends RedissonExpirable implements RLock {
     }
 
     protected RFuture<RedissonLockEntry> subscribe(long threadId) {
-        return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
+        return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
-        PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
+        PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     @Override
diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java
index c5632b2ca..f6f15cb4e 100644
--- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java
+++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java
@@ -29,6 +29,7 @@ import org.redisson.command.CommandExecutor;
 import org.redisson.config.MasterSlaveServersConfig;
 import org.redisson.connection.PubSubConnectionEntry;
 import org.redisson.pubsub.AsyncSemaphore;
+import org.redisson.pubsub.PublishSubscribeService;
 
 /**
  * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
@@ -39,6 +40,7 @@ import org.redisson.pubsub.AsyncSemaphore;
  */
 public class RedissonPatternTopic<M> implements RPatternTopic<M> {
 
+    final PublishSubscribeService subscribeService;
     final CommandExecutor commandExecutor;
     private final String name;
     private final Codec codec;
@@ -51,6 +53,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
         this.commandExecutor = commandExecutor;
         this.name = name;
         this.codec = codec;
+        this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService();
     }
 
     @Override
@@ -65,7 +68,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
     }
 
     private int addListener(RedisPubSubListener<?> pubSubListener) {
-        RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener);
+        RFuture<PubSubConnectionEntry> future = subscribeService.psubscribe(name, codec, pubSubListener);
         commandExecutor.syncSubscription(future);
         return System.identityHashCode(pubSubListener);
     }
@@ -80,10 +83,10 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
     
     @Override
     public void removeListener(int listenerId) {
-        AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
+        AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
         acquire(semaphore);
 
-        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
+        PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
         if (entry == null) {
             semaphore.release();
             return;
@@ -91,7 +94,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
         
         entry.removeListener(name, listenerId);
         if (!entry.hasListeners(name)) {
-            commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
+            subscribeService.punsubscribe(name, semaphore);
         } else {
             semaphore.release();
         }
@@ -99,10 +102,10 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
     
     @Override
     public void removeAllListeners() {
-        AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
+        AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
         acquire(semaphore);
         
-        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
+        PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
         if (entry == null) {
             semaphore.release();
             return;
@@ -110,7 +113,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
 
         entry.removeAllListeners(name);
         if (!entry.hasListeners(name)) {
-            commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
+            subscribeService.punsubscribe(name, semaphore);
         } else {
             semaphore.release();
         }
@@ -118,10 +121,10 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
 
     @Override
     public void removeListener(PatternMessageListener<M> listener) {
-        AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
+        AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
         acquire(semaphore);
         
-        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
+        PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
         if (entry == null) {
             semaphore.release();
             return;
@@ -129,7 +132,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
 
         entry.removeListener(name, listener);
         if (!entry.hasListeners(name)) {
-            commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
+            subscribeService.punsubscribe(name, semaphore);
         } else {
             semaphore.release();
         }
diff --git a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java
index 131f1acee..8c4bd09cc 100644
--- a/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java
+++ b/redisson/src/main/java/org/redisson/RedissonPermitExpirableSemaphore.java
@@ -587,11 +587,11 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
     }
 
     private RFuture<RedissonLockEntry> subscribe() {
-        return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager());
+        return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     private void unsubscribe(RFuture<RedissonLockEntry> future) {
-        semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager());
+        semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     @Override
diff --git a/redisson/src/main/java/org/redisson/RedissonSemaphore.java b/redisson/src/main/java/org/redisson/RedissonSemaphore.java
index 0026aa224..c78bb59f4 100644
--- a/redisson/src/main/java/org/redisson/RedissonSemaphore.java
+++ b/redisson/src/main/java/org/redisson/RedissonSemaphore.java
@@ -437,11 +437,11 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
     }
 
     private RFuture<RedissonLockEntry> subscribe() {
-        return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager());
+        return semaphorePubSub.subscribe(getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     private void unsubscribe(RFuture<RedissonLockEntry> future) {
-        semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager());
+        semaphorePubSub.unsubscribe(future.getNow(), getName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
     }
 
     @Override
diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java
index 12245d58e..9d1b8083a 100644
--- a/redisson/src/main/java/org/redisson/RedissonTopic.java
+++ b/redisson/src/main/java/org/redisson/RedissonTopic.java
@@ -34,6 +34,7 @@ import org.redisson.misc.RPromise;
 import org.redisson.misc.RedissonObjectFactory;
 import org.redisson.misc.RedissonPromise;
 import org.redisson.pubsub.AsyncSemaphore;
+import org.redisson.pubsub.PublishSubscribeService;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.Future;
@@ -48,6 +49,7 @@ import io.netty.util.concurrent.FutureListener;
  */
 public class RedissonTopic<M> implements RTopic<M> {
 
+    final PublishSubscribeService subscribeService;
     final CommandAsyncExecutor commandExecutor;
     private final String name;
     private final Codec codec;
@@ -60,6 +62,7 @@ public class RedissonTopic<M> implements RTopic<M> {
         this.commandExecutor = commandExecutor;
         this.name = name;
         this.codec = codec;
+        this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService();
     }
 
     public List<String> getChannelNames() {
@@ -103,13 +106,13 @@ public class RedissonTopic<M> implements RTopic<M> {
     }
 
     private int addListener(RedisPubSubListener<?> pubSubListener) {
-        RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
+        RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener);
         commandExecutor.syncSubscription(future);
         return System.identityHashCode(pubSubListener);
     }
     
     public RFuture<Integer> addListenerAsync(final RedisPubSubListener<?> pubSubListener) {
-        RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
+        RFuture<PubSubConnectionEntry> future = subscribeService.subscribe(codec, name, pubSubListener);
         final RPromise<Integer> result = new RedissonPromise<Integer>();
         future.addListener(new FutureListener<PubSubConnectionEntry>() {
             @Override
@@ -127,10 +130,10 @@ public class RedissonTopic<M> implements RTopic<M> {
 
     @Override
     public void removeAllListeners() {
-        AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
+        AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
         acquire(semaphore);
         
-        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
+        PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
         if (entry == null) {
             semaphore.release();
             return;
@@ -138,7 +141,7 @@ public class RedissonTopic<M> implements RTopic<M> {
 
         entry.removeAllListeners(name);
         if (!entry.hasListeners(name)) {
-            commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
+            subscribeService.unsubscribe(name, semaphore);
         } else {
             semaphore.release();
         }
@@ -154,10 +157,10 @@ public class RedissonTopic<M> implements RTopic<M> {
     
     @Override
     public void removeListener(MessageListener<?> listener) {
-        AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
+        AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
         acquire(semaphore);
         
-        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
+        PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
         if (entry == null) {
             semaphore.release();
             return;
@@ -165,7 +168,7 @@ public class RedissonTopic<M> implements RTopic<M> {
 
         entry.removeListener(name, listener);
         if (!entry.hasListeners(name)) {
-            commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
+            subscribeService.unsubscribe(name, semaphore);
         } else {
             semaphore.release();
         }
@@ -174,10 +177,10 @@ public class RedissonTopic<M> implements RTopic<M> {
     
     @Override
     public void removeListener(int listenerId) {
-        AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
+        AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
         acquire(semaphore);
         
-        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
+        PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
         if (entry == null) {
             semaphore.release();
             return;
@@ -185,7 +188,7 @@ public class RedissonTopic<M> implements RTopic<M> {
 
         entry.removeListener(name, listenerId);
         if (!entry.hasListeners(name)) {
-            commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
+            subscribeService.unsubscribe(name, semaphore);
         } else {
             semaphore.release();
         }
diff --git a/redisson/src/main/java/org/redisson/api/NodeType.java b/redisson/src/main/java/org/redisson/api/NodeType.java
index 9d0cc60f2..0a7f9e5db 100644
--- a/redisson/src/main/java/org/redisson/api/NodeType.java
+++ b/redisson/src/main/java/org/redisson/api/NodeType.java
@@ -15,6 +15,11 @@
  */
 package org.redisson.api;
 
+/**
+ * 
+ * @author Nikita Koksharov
+ *
+ */
 public enum NodeType {
 
     MASTER, SLAVE, SENTINEL
diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java
index f69790536..aeaae72ce 100644
--- a/redisson/src/main/java/org/redisson/client/RedisClient.java
+++ b/redisson/src/main/java/org/redisson/client/RedisClient.java
@@ -315,6 +315,11 @@ public class RedisClient {
                     return;
                 }
                 
+                if (!hasOwnTimer && !hasOwnExecutor && !hasOwnResolver && !hasOwnGroup) {
+                    result.trySuccess(null);
+                    return;
+                }
+                
                 Thread t = new Thread() {
                     @Override
                     public void run() {
diff --git a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java
index a9393e433..b90f3f821 100644
--- a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java
+++ b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java
@@ -55,6 +55,7 @@ public class RedisClientConfig {
     private boolean keepAlive;
     private boolean tcpNoDelay;
     
+    private String sslHostname;
     private boolean sslEnableEndpointIdentification = true;
     private SslProvider sslProvider = SslProvider.JDK;
     private URI sslTruststore;
@@ -89,8 +90,17 @@ public class RedisClientConfig {
         this.sslKeystore = config.sslKeystore;
         this.sslKeystorePassword = config.sslKeystorePassword;
         this.resolverGroup = config.resolverGroup;
+        this.sslHostname = config.sslHostname;
     }
     
+    public String getSslHostname() {
+        return sslHostname;
+    }
+    public RedisClientConfig setSslHostname(String sslHostname) {
+        this.sslHostname = sslHostname;
+        return this;
+    }
+
     public RedisClientConfig setAddress(String host, int port) {
         this.address = URIBuilder.create("redis://" + host + ":" + port);
         return this;
diff --git a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java
index 39bd51c25..9645f6249 100644
--- a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java
+++ b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java
@@ -33,6 +33,7 @@ import org.redisson.client.RedisClient;
 import org.redisson.client.RedisClientConfig;
 import org.redisson.client.RedisConnection;
 import org.redisson.config.SslProvider;
+import org.redisson.misc.URIBuilder;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
@@ -162,7 +163,12 @@ public class RedisChannelInitializer extends ChannelInitializer<Channel> {
         }
 
         SslContext sslContext = sslContextBuilder.build();
-        SSLEngine sslEngine = sslContext.newEngine(ch.alloc(), config.getAddress().getHost(), config.getAddress().getPort());
+        String hostname = config.getSslHostname();
+        if (hostname == null || URIBuilder.isValidIP(hostname)) {
+            hostname = config.getAddress().getHost();
+        }
+        
+        SSLEngine sslEngine = sslContext.newEngine(ch.alloc(), hostname, config.getAddress().getPort());
         sslEngine.setSSLParameters(sslParams);
         
         SslHandler sslHandler = new SslHandler(sslEngine);
diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java
index de3fe9aa0..84f774545 100644
--- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java
+++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java
@@ -82,6 +82,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
     
     private RedisStrictCommand<List<ClusterNodeInfo>> clusterNodesCommand;
     
+    private String configEndpointHostName;
+    
     private boolean isConfigEndpoint;
 
     private AddressResolver<InetSocketAddress> resolver;
@@ -95,7 +97,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
         Throwable lastException = null;
         List<String> failedMasters = new ArrayList<String>();
         for (URI addr : cfg.getNodeAddresses()) {
-            RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
+            RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
             try {
                 RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
 
@@ -104,6 +106,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
                     Future<List<InetSocketAddress>> addrsFuture = resolver.resolveAll(InetSocketAddress.createUnresolved(addr.getHost(), addr.getPort()));
                     List<InetSocketAddress> allAddrs = addrsFuture.syncUninterruptibly().getNow();
                     if (allAddrs.size() > 1) {
+                        configEndpointHostName = addr.getHost();
                         isConfigEndpoint = true;
                     } else {
                         resolver.close();
@@ -178,8 +181,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
     }
     
     @Override
-    protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout) {
-        RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout);
+    protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) {
+        RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
         result.setReadOnly(type == NodeType.SLAVE && config.getReadMode() != ReadMode.MASTER);
         return result;
     }
@@ -198,7 +201,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
         }
 
         final RPromise<Collection<RFuture<Void>>> result = new RedissonPromise<Collection<RFuture<Void>>>();
-        RFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null);
+        RFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null, configEndpointHostName);
         connectionFuture.addListener(new FutureListener<RedisConnection>() {
             @Override
             public void operationComplete(Future<RedisConnection> future) throws Exception {
@@ -351,7 +354,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
             return;
         }
         final URI uri = iterator.next();
-        RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, null);
+        RFuture<RedisConnection> connectionFuture = connectToNode(cfg, uri, null, configEndpointHostName);
         connectionFuture.addListener(new FutureListener<RedisConnection>() {
             @Override
             public void operationComplete(Future<RedisConnection> future) throws Exception {
@@ -651,6 +654,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
             }
         }
     }
+    
+    public String getConfigEndpointHostName() {
+        return configEndpointHostName;
+    }
 
     @Override
     public int calcSlot(String key) {
diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java
index d6427cb24..4de8b004d 100644
--- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java
+++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java
@@ -26,15 +26,13 @@ import org.redisson.api.NodeType;
 import org.redisson.api.RFuture;
 import org.redisson.client.RedisClient;
 import org.redisson.client.RedisConnection;
-import org.redisson.client.RedisPubSubListener;
 import org.redisson.client.codec.Codec;
 import org.redisson.client.protocol.RedisCommand;
-import org.redisson.client.protocol.pubsub.PubSubType;
 import org.redisson.command.CommandSyncService;
 import org.redisson.config.Config;
 import org.redisson.config.MasterSlaveServersConfig;
 import org.redisson.misc.InfinitySemaphoreLatch;
-import org.redisson.pubsub.AsyncSemaphore;
+import org.redisson.pubsub.PublishSubscribeService;
 
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.Timeout;
@@ -51,6 +49,8 @@ public interface ConnectionManager {
     
     CommandSyncService getCommandExecutor();
     
+    PublishSubscribeService getSubscribeService();
+    
     ExecutorService getExecutor();
     
     URI getLastClusterNode();
@@ -59,22 +59,14 @@ public interface ConnectionManager {
 
     boolean isClusterMode();
 
-    AsyncSemaphore getSemaphore(String channelName);
-    
     ConnectionEventsHub getConnectionEventsHub();
 
     boolean isShutdown();
 
     boolean isShuttingDown();
-
-    RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners);
-
-    RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners);
     
     IdleConnectionWatcher getConnectionWatcher();
 
-    Collection<RedisClientEntry> getClients();
-
     void shutdownAsync(RedisClient client);
 
     int calcSlot(String key);
@@ -97,26 +89,14 @@ public interface ConnectionManager {
 
     RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command);
 
-    RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout);
+    RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname);
 
-    RedisClient createClient(NodeType type, InetSocketAddress address, URI uri);
+    RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname);
     
-    RedisClient createClient(NodeType type, URI address);
+    RedisClient createClient(NodeType type, URI address, String sslHostname);
 
     MasterSlaveEntry getEntry(RedisClient redisClient);
     
-    PubSubConnectionEntry getPubSubEntry(String channelName);
-
-    RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, RedisPubSubListener<?>... listeners);
-    
-    RFuture<PubSubConnectionEntry> psubscribe(String pattern, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners);
-
-    void unsubscribe(String channelName, AsyncSemaphore lock);
-    
-    RFuture<Codec> unsubscribe(String channelName, PubSubType topicType);
-
-    void punsubscribe(String channelName, AsyncSemaphore lock);
-    
     void shutdown();
 
     void shutdown(long quietPeriod, long timeout, TimeUnit unit);
diff --git a/redisson/src/main/java/org/redisson/connection/FutureConnectionListener.java b/redisson/src/main/java/org/redisson/connection/FutureConnectionListener.java
deleted file mode 100644
index 4e58cecd3..000000000
--- a/redisson/src/main/java/org/redisson/connection/FutureConnectionListener.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Copyright 2018 Nikita Koksharov
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.redisson.connection;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.redisson.api.RFuture;
-import org.redisson.client.RedisConnection;
-import org.redisson.client.protocol.RedisCommand;
-import org.redisson.misc.RPromise;
-
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-
-public class FutureConnectionListener<T extends RedisConnection> implements FutureListener<Object> {
-
-    private final AtomicInteger commandsCounter = new AtomicInteger();
-
-    private final RPromise<T> connectionPromise;
-    private final T connection;
-    private final List<Runnable> commands = new ArrayList<Runnable>(4);
-
-    public FutureConnectionListener(RPromise<T> connectionFuture, T connection) {
-        super();
-        this.connectionPromise = connectionFuture;
-        this.connection = connection;
-    }
-
-    public void addCommand(final RedisCommand<?> command, final Object ... params) {
-        commandsCounter.incrementAndGet();
-        commands.add(new Runnable() {
-            @Override
-            public void run() {
-                RFuture<Object> future = connection.async(command, params);
-                future.addListener(FutureConnectionListener.this);
-            }
-        });
-    }
-
-    public void executeCommands() {
-        if (commands.isEmpty()) {
-            connectionPromise.trySuccess(connection);
-            return;
-        }
-
-        for (Runnable command : commands) {
-            command.run();
-        }
-        commands.clear();
-    }
-
-    @Override
-    public void operationComplete(Future<Object> future) throws Exception {
-        if (!future.isSuccess()) {
-            connection.closeAsync();
-            connectionPromise.tryFailure(future.cause());
-            return;
-        }
-        if (commandsCounter.decrementAndGet() == 0) {
-            connectionPromise.trySuccess(connection);
-        }
-    }
-
-}
diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java
index 10bfdebce..3801a29f9 100644
--- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java
+++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java
@@ -21,14 +21,10 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -37,19 +33,13 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.redisson.Version;
 import org.redisson.api.NodeType;
 import org.redisson.api.RFuture;
-import org.redisson.client.BaseRedisPubSubListener;
 import org.redisson.client.RedisClient;
 import org.redisson.client.RedisClientConfig;
 import org.redisson.client.RedisConnection;
 import org.redisson.client.RedisException;
 import org.redisson.client.RedisNodeNotFoundException;
-import org.redisson.client.RedisPubSubConnection;
-import org.redisson.client.RedisPubSubListener;
-import org.redisson.client.RedisTimeoutException;
-import org.redisson.client.SubscribeListener;
 import org.redisson.client.codec.Codec;
 import org.redisson.client.protocol.RedisCommand;
-import org.redisson.client.protocol.pubsub.PubSubType;
 import org.redisson.cluster.ClusterSlotRange;
 import org.redisson.command.CommandSyncService;
 import org.redisson.config.BaseMasterSlaveServersConfig;
@@ -61,6 +51,7 @@ import org.redisson.misc.RPromise;
 import org.redisson.misc.RedissonPromise;
 import org.redisson.misc.URIBuilder;
 import org.redisson.pubsub.AsyncSemaphore;
+import org.redisson.pubsub.PublishSubscribeService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,10 +127,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
 
     protected final Class<? extends SocketChannel> socketChannelClass;
 
-    protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
-    
-    protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
-
     protected DNSMonitor dnsMonitor;
     
     protected MasterSlaveServersConfig config;
@@ -151,8 +138,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
 
     private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
 
-    private final Map<RedisClient, RedisClientEntry> clientEntries = PlatformDependent.newConcurrentHashMap();
-
     private IdleConnectionWatcher connectionWatcher;
 
     private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub();
@@ -161,14 +146,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
     
     private final ExecutorService executor; 
     
-    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
-    
     private final CommandSyncService commandExecutor;
     
     private final Config cfg;
 
     protected final DnsAddressResolverGroup resolverGroup;
     
+    private PublishSubscribeService subscribeService;
+    
     private final Map<Object, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
     
     {
@@ -179,8 +164,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
 
     public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
         this(config);
-        initTimer(cfg);
         this.config = cfg;
+        
+        initTimer(cfg);
         initSingleEntry();
     }
 
@@ -250,7 +236,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
         }
     }
     
-    protected RFuture<RedisConnection> connectToNode(BaseMasterSlaveServersConfig<?> cfg, final URI addr, RedisClient client) {
+    protected RFuture<RedisConnection> connectToNode(BaseMasterSlaveServersConfig<?> cfg, final URI addr, RedisClient client, String sslHostname) {
         final Object key;
         if (client != null) {
             key = client;
@@ -263,7 +249,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
         }
 
         if (addr != null) {
-            client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts());
+            client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getRetryInterval() * cfg.getRetryAttempts(), sslHostname);
         }
         final RPromise<RedisConnection> result = new RedissonPromise<RedisConnection>();
         RFuture<RedisConnection> future = client.connectAsync();
@@ -348,6 +334,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
         }
         
         connectionWatcher = new IdleConnectionWatcher(this, config);
+        subscribeService = new PublishSubscribeService(this, config);
     }
 
     protected void initSingleEntry() {
@@ -430,41 +417,36 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
     }
 
     @Override
-    public RedisClient createClient(NodeType type, URI address) {
-        RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts());
-        clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type));
+    public RedisClient createClient(NodeType type, URI address, String sslHostname) {
+        RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname);
         return client;
     }
     
     @Override
-    public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri) {
-        RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts());
-        clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type));
+    public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname) {
+        RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname);
         return client;
     }
 
     @Override
     public void shutdownAsync(RedisClient client) {
-        if (clientEntries.remove(client) == null) {
-            log.error("Can't find client {}", client);
-        }
         client.shutdownAsync();
     }
 
     @Override
-    public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout) {
-        RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout);
+    public RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) {
+        RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
         return RedisClient.create(redisConfig);
     }
     
-    private RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, int timeout, int commandTimeout) {
-        RedisClientConfig redisConfig = createRedisConfig(type, null, timeout, commandTimeout);
+    private RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, int timeout, int commandTimeout, String sslHostname) {
+        RedisClientConfig redisConfig = createRedisConfig(type, null, timeout, commandTimeout, sslHostname);
         redisConfig.setAddress(address, uri);
         return RedisClient.create(redisConfig);
     }
 
 
-    protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout) {
+    protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout, String sslHostname) {
         RedisClientConfig redisConfig = new RedisClientConfig();
         redisConfig.setAddress(address)
               .setTimer(timer)
@@ -474,6 +456,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
               .setSocketChannelClass(socketChannelClass)
               .setConnectTimeout(timeout)
               .setCommandTimeout(commandTimeout)
+              .setSslHostname(sslHostname)
               .setSslEnableEndpointIdentification(config.isSslEnableEndpointIdentification())
               .setSslProvider(config.getSslProvider())
               .setSslTruststore(config.getSslTruststore())
@@ -499,304 +482,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
         return singleSlotRange.getStartSlot();
     }
 
-    @Override
-    public PubSubConnectionEntry getPubSubEntry(String channelName) {
-        return name2PubSubConnection.get(channelName);
-    }
-
-    @Override
-    public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?>... listeners) {
-        return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
-    }
-    
-    @Override
-    public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
-        RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
-        subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
-        return promise;
-    }
-
-    @Override
-    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners) {
-        return subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
-    }
-
-    private RFuture<PubSubConnectionEntry> subscribe(final PubSubType type, final Codec codec, final String channelName,
-            final RPromise<PubSubConnectionEntry> promise, final RedisPubSubListener<?>... listeners) {
-        final AsyncSemaphore lock = getSemaphore(channelName);
-        lock.acquire(new Runnable() {
-            @Override
-            public void run() {
-                if (promise.isDone()) {
-                    lock.release();
-                    return;
-                }
-                
-                final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
-                result.addListener(new FutureListener<PubSubConnectionEntry>() {
-                    @Override
-                    public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
-                        if (!future.isSuccess()) {
-                            subscribe(type, codec, channelName, promise, listeners);
-                            return;
-                        }
-                        
-                        promise.trySuccess(result.getNow());
-                    }
-                });
-                subscribe(codec, channelName, result, type, lock, listeners);
-            }
-        });
-        return promise;
-    }
-    
-    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
-        RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
-        subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners);
-        return promise;
-    }
-
-    public AsyncSemaphore getSemaphore(String channelName) {
-        return locks[Math.abs(channelName.hashCode() % locks.length)];
-    }
-    
-    private void subscribe(final Codec codec, final String channelName, 
-            final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
-        final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
-        if (connEntry != null) {
-            subscribe(channelName, promise, type, lock, connEntry, listeners);
-            return;
-        }
-
-        freePubSubLock.acquire(new Runnable() {
-
-            @Override
-            public void run() {
-                if (promise.isDone()) {
-                    lock.release();
-                    freePubSubLock.release();
-                    return;
-                }
-                
-                final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
-                if (freeEntry == null) {
-                    connect(codec, channelName, promise, type, lock, listeners);
-                    return;
-                }
-                
-                int remainFreeAmount = freeEntry.tryAcquire();
-                if (remainFreeAmount == -1) {
-                    throw new IllegalStateException();
-                }
-                
-                final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
-                if (oldEntry != null) {
-                    freeEntry.release();
-                    freePubSubLock.release();
-                    
-                    subscribe(channelName, promise, type, lock, oldEntry, listeners);
-                    return;
-                }
-                
-                if (remainFreeAmount == 0) {
-                    freePubSubConnections.poll();
-                }
-                freePubSubLock.release();
-                
-                subscribe(channelName, promise, type, lock, freeEntry, listeners);
-                
-                if (PubSubType.PSUBSCRIBE == type) {
-                    freeEntry.psubscribe(codec, channelName);
-                } else {
-                    freeEntry.subscribe(codec, channelName);
-                }
-            }
-            
-        });
-    }
-
-    private void subscribe(final String channelName, final RPromise<PubSubConnectionEntry> promise,
-            final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry,
-            final RedisPubSubListener<?>... listeners) {
-        for (RedisPubSubListener<?> listener : listeners) {
-            connEntry.addListener(channelName, listener);
-        }
-        SubscribeListener listener = connEntry.getSubscribeFuture(channelName, type);
-        final Future<Void> subscribeFuture = listener.getSuccessFuture();
-        
-        subscribeFuture.addListener(new FutureListener<Void>() {
-            @Override
-            public void operationComplete(Future<Void> future) throws Exception {
-                if (!promise.trySuccess(connEntry)) {
-                    for (RedisPubSubListener<?> listener : listeners) {
-                        connEntry.removeListener(channelName, listener);
-                    }
-                    if (!connEntry.hasListeners(channelName)) {
-                        unsubscribe(channelName, lock);
-                    } else {
-                        lock.release();
-                    }
-                } else {
-                    lock.release();
-                }
-            }
-        });
-
-        newTimeout(new TimerTask() {
-            @Override
-            public void run(Timeout timeout) throws Exception {
-                if (promise.tryFailure(new RedisTimeoutException())) {
-                    subscribeFuture.cancel(false);
-                }
-            }
-        }, config.getRetryInterval(), TimeUnit.MILLISECONDS);
-    }
-
-    private void connect(final Codec codec, final String channelName,
-            final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
-        final int slot = calcSlot(channelName);
-        RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
-        connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
-
-            @Override
-            public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
-                if (!future.isSuccess()) {
-                    freePubSubLock.release();
-                    lock.release();
-                    promise.tryFailure(future.cause());
-                    return;
-                }
-
-                RedisPubSubConnection conn = future.getNow();
-                
-                final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
-                entry.tryAcquire();
-                
-                final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
-                if (oldEntry != null) {
-                    releaseSubscribeConnection(slot, entry);
-                    
-                    freePubSubLock.release();
-
-                    subscribe(channelName, promise, type, lock, oldEntry, listeners);
-                    return;
-                }
-                
-                freePubSubConnections.add(entry);
-                freePubSubLock.release();
-                
-                subscribe(channelName, promise, type, lock, entry, listeners);
-                
-                if (PubSubType.PSUBSCRIBE == type) {
-                    entry.psubscribe(codec, channelName);
-                } else {
-                    entry.subscribe(codec, channelName);
-                }
-                
-            }
-        });
-    }
-
-    public void unsubscribe(final String channelName, final AsyncSemaphore lock) {
-        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
-        if (entry == null) {
-            lock.release();
-            return;
-        }
-        
-        entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
-            
-            @Override
-            public boolean onStatus(PubSubType type, String channel) {
-                if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
-                    
-                    if (entry.release() == 1) {
-                        freePubSubConnections.add(entry);
-                    }
-                    
-                    lock.release();
-                    return true;
-                }
-                return false;
-            }
-            
-        });
-    }
-    
-    @Override
-    public RFuture<Codec> unsubscribe(final String channelName, final PubSubType topicType) {
-        final RPromise<Codec> result = new RedissonPromise<Codec>();
-        final AsyncSemaphore lock = getSemaphore(channelName);
-        lock.acquire(new Runnable() {
-            @Override
-            public void run() {
-                final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
-                if (entry == null) {
-                    lock.release();
-                    result.trySuccess(null);
-                    return;
-                }
-
-                freePubSubLock.acquire(new Runnable() {
-                    @Override
-                    public void run() {
-                        freePubSubConnections.remove(entry);
-                        freePubSubLock.release();
-                        
-                        final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
-                        RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
-
-                            @Override
-                            public boolean onStatus(PubSubType type, String channel) {
-                                if (type == topicType && channel.equals(channelName)) {
-                                    lock.release();
-                                    result.trySuccess(entryCodec);
-                                    return true;
-                                }
-                                return false;
-                            }
-
-                        };
-
-                        if (topicType == PubSubType.PUNSUBSCRIBE) {
-                            entry.punsubscribe(channelName, listener);
-                        } else {
-                            entry.unsubscribe(channelName, listener);
-                        }
-                    }
-                });
-            }
-        });
-        
-        return result;
-    }
-    
-    @Override
-    public void punsubscribe(final String channelName, final AsyncSemaphore lock) {
-        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
-        if (entry == null) {
-            lock.release();
-            return;
-        }
-        
-        entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
-            
-            @Override
-            public boolean onStatus(PubSubType type, String channel) {
-                if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
-                    
-                    if (entry.release() == 1) {
-                        freePubSubConnections.add(entry);
-                    }
-                    
-                    lock.release();
-                    return true;
-                }
-                return false;
-            }
-            
-        });
-    }
 
     @Override
     public MasterSlaveEntry getEntry(InetSocketAddress address) {
@@ -903,24 +588,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
         return entry.connectionReadOp(command);
     }
 
-    RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {
-        MasterSlaveEntry entry = getEntry(slot);
-        if (entry == null) {
-            RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for slot: " + slot + " hasn't been discovered yet");
-            return RedissonPromise.newFailedFuture(ex);
-        }
-        return entry.nextPubSubConnection();
-    }
-
-    protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) {
-        MasterSlaveEntry entry = getEntry(slot);
-        if (entry == null) {
-            log.error("Node for slot: " + slot + " can't be found");
-        } else {
-            entry.returnPubSubConnection(pubSubEntry);
-        }
-    }
-
     @Override
     public void releaseWrite(NodeSource source, RedisConnection connection) {
         MasterSlaveEntry entry = getEntry(source);
@@ -989,11 +656,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
         return group.isTerminated();
     }
 
-    @Override
-    public Collection<RedisClientEntry> getClients() {
-        return Collections.unmodifiableCollection(clientEntries.values());
-    }
-
     @Override
     public EventLoopGroup getGroup() {
         return group;
@@ -1035,6 +697,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
         group.shutdownGracefully().syncUninterruptibly();
     }
     
+    public PublishSubscribeService getSubscribeService() {
+        return subscribeService;
+    }
+    
     public ExecutorService getExecutor() {
         return executor;
     }
diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java
index c2d49c4e1..ba06d3397 100644
--- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java
+++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java
@@ -17,6 +17,7 @@ package org.redisson.connection;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -29,12 +30,10 @@ import org.redisson.api.RFuture;
 import org.redisson.client.RedisClient;
 import org.redisson.client.RedisConnection;
 import org.redisson.client.RedisPubSubConnection;
-import org.redisson.client.RedisPubSubListener;
-import org.redisson.client.codec.Codec;
 import org.redisson.client.protocol.CommandData;
 import org.redisson.client.protocol.RedisCommand;
 import org.redisson.client.protocol.RedisCommands;
-import org.redisson.client.protocol.pubsub.PubSubType;
+import org.redisson.cluster.ClusterConnectionManager;
 import org.redisson.cluster.ClusterSlotRange;
 import org.redisson.config.MasterSlaveServersConfig;
 import org.redisson.config.ReadMode;
@@ -77,6 +76,8 @@ public class MasterSlaveEntry {
 
     final AtomicBoolean active = new AtomicBoolean(true);
     
+    String sslHostname;
+    
     public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
         for (ClusterSlotRange clusterSlotRange : slotRanges) {
             for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
@@ -89,6 +90,10 @@ public class MasterSlaveEntry {
         slaveBalancer = new LoadBalancerManager(config, connectionManager, this);
         writeConnectionPool = new MasterConnectionPool(config, connectionManager, this);
         pubSubConnectionPool = new MasterPubSubConnectionPool(config, connectionManager, this);
+        
+        if (connectionManager instanceof ClusterConnectionManager) {
+            sslHostname = ((ClusterConnectionManager) connectionManager).getConfigEndpointHostName();
+        }
     }
 
     public MasterSlaveServersConfig getConfig() {
@@ -111,13 +116,13 @@ public class MasterSlaveEntry {
     }
     
     public RFuture<RedisClient> setupMasterEntry(InetSocketAddress address, URI uri) {
-        RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri);
+        RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri, sslHostname);
         return setupMasterEntry(client);
     }
     
 
     public RFuture<RedisClient> setupMasterEntry(URI address) {
-        RedisClient client = connectionManager.createClient(NodeType.MASTER, address);
+        RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
         return setupMasterEntry(client);
     }
 
@@ -164,7 +169,7 @@ public class MasterSlaveEntry {
             return false;
         }
         
-        return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
+        return slaveDown(entry);
     }
     
     public boolean slaveDown(InetSocketAddress address, FreezeReason freezeReason) {
@@ -173,7 +178,7 @@ public class MasterSlaveEntry {
             return false;
         }
         
-        return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
+        return slaveDown(entry);
     }
     
     public boolean slaveDown(URI address, FreezeReason freezeReason) {
@@ -182,10 +187,10 @@ public class MasterSlaveEntry {
             return false;
         }
         
-        return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
+        return slaveDown(entry);
     }
     
-    private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
+    private boolean slaveDown(ClientConnectionsEntry entry) {
         // add master as slave if no more slaves available
         if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
             if (slaveBalancer.unfreeze(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM)) {
@@ -198,7 +203,7 @@ public class MasterSlaveEntry {
         closeConnections(entry);
         
         for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
-            reattachPubSub(connection, temporaryDown);
+            connectionManager.getSubscribeService().reattachPubSub(connection);
         }
         entry.getAllSubscribeConnections().clear();
         
@@ -231,79 +236,6 @@ public class MasterSlaveEntry {
         }
     }
     
-    private void reattachPubSub(RedisPubSubConnection redisPubSubConnection, boolean temporaryDown) {
-        for (String channelName : redisPubSubConnection.getChannels().keySet()) {
-            PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
-            Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
-            reattachPubSubListeners(channelName, listeners, PubSubType.UNSUBSCRIBE);
-        }
-
-        for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
-            PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
-            Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
-            reattachPubSubListeners(channelName, listeners, PubSubType.PUNSUBSCRIBE);
-        }
-    }
-
-    private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners, final PubSubType topicType) {
-        RFuture<Codec> subscribeCodec = connectionManager.unsubscribe(channelName, topicType);
-        if (listeners.isEmpty()) {
-            return;
-        }
-        
-        subscribeCodec.addListener(new FutureListener<Codec>() {
-            @Override
-            public void operationComplete(Future<Codec> future) throws Exception {
-                if (future.get() == null) {
-                    return;
-                }
-                
-                Codec subscribeCodec = future.get();
-                if (topicType == PubSubType.PUNSUBSCRIBE) {
-                    psubscribe(channelName, listeners, subscribeCodec);
-                } else {
-                    subscribe(channelName, listeners, subscribeCodec);
-                }
-            }
-
-        });
-    }
-
-    private void subscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
-            final Codec subscribeCodec) {
-        RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
-        subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
-            
-            @Override
-            public void operationComplete(Future<PubSubConnectionEntry> future)
-                    throws Exception {
-                if (!future.isSuccess()) {
-                    subscribe(channelName, listeners, subscribeCodec);
-                    return;
-                }
-                
-                log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, future.getNow().getConnection().getRedisClient());
-            }
-        });
-    }
-
-    private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
-            final Codec subscribeCodec) {
-        RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
-        subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
-            @Override
-            public void operationComplete(Future<PubSubConnectionEntry> future)
-                    throws Exception {
-                if (!future.isSuccess()) {
-                    psubscribe(channelName, listeners, subscribeCodec);
-                    return;
-                }
-                
-                log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient());
-            }
-        });
-    }
-
     private void reattachBlockingQueue(RedisConnection connection) {
         final CommandData<?, ?> commandData = connection.getCurrentCommand();
 
@@ -404,15 +336,25 @@ public class MasterSlaveEntry {
     }
     
     private RFuture<Void> addSlave(InetSocketAddress address, URI uri, final boolean freezed, final NodeType nodeType) {
-        RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri);
+        RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri, sslHostname);
         return addSlave(client, freezed, nodeType);
     }
     
     private RFuture<Void> addSlave(URI address, final boolean freezed, final NodeType nodeType) {
-        RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
+        RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, sslHostname);
         return addSlave(client, freezed, nodeType);
     }
 
+    public Collection<ClientConnectionsEntry> getSlaveEntries() {
+        List<ClientConnectionsEntry> result = new ArrayList<ClientConnectionsEntry>();
+        for (ClientConnectionsEntry slaveEntry : slaveBalancer.getEntries()) {
+            if (slaveEntry.getNodeType() == NodeType.SLAVE) {
+                result.add(slaveEntry);
+            }
+        }
+        return result;
+    }
+    
     public RedisClient getClient() {
         return masterEntry.getClient();
     }
@@ -503,7 +445,7 @@ public class MasterSlaveEntry {
                 pubSubConnectionPool.remove(oldMaster);
                 
                 oldMaster.freezeMaster(FreezeReason.MANAGER);
-                slaveDown(oldMaster, false);
+                slaveDown(oldMaster);
 
                 slaveBalancer.changeType(oldMaster.getClient(), NodeType.SLAVE);
                 slaveBalancer.changeType(newMasterClient, NodeType.MASTER);
@@ -566,7 +508,7 @@ public class MasterSlaveEntry {
         return slaveBalancer.getConnection(command, addr);
     }
 
-    RFuture<RedisPubSubConnection> nextPubSubConnection() {
+    public RFuture<RedisPubSubConnection> nextPubSubConnection() {
         if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
             return pubSubConnectionPool.get();
         }
diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java
index 9675a1683..bd0d6148f 100644
--- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java
+++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java
@@ -67,7 +67,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
         initTimer(this.config);
 
         for (URI addr : cfg.getNodeAddresses()) {
-            RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
+            RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
             connectionFuture.awaitUninterruptibly();
             RedisConnection connection = connectionFuture.getNow();
             if (connection == null) {
@@ -119,7 +119,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
                         return;
                     }
 
-                    RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
+                    RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null, addr.getHost());
                     connectionFuture.addListener(new FutureListener<RedisConnection>() {
                         @Override
                         public void operationComplete(Future<RedisConnection> future) throws Exception {
diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java
index e9ca00f51..8a607da2e 100755
--- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java
+++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java
@@ -90,7 +90,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
         this.sentinelResolver = resolverGroup.getResolver(getGroup().next());
         
         for (URI addr : cfg.getSentinelAddresses()) {
-            RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts());
+            RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts(), null);
             try {
                 RedisConnection connection = client.connect();
                 if (!connection.isActive()) {
@@ -261,7 +261,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
         }
 
         RedisClient client = iterator.next();
-        RFuture<RedisConnection> connectionFuture = connectToNode(null, null, client);
+        RFuture<RedisConnection> connectionFuture = connectToNode(null, null, client, null);
         connectionFuture.addListener(new FutureListener<RedisConnection>() {
             @Override
             public void operationComplete(Future<RedisConnection> future) throws Exception {
@@ -433,7 +433,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
             return RedissonPromise.newSucceededFuture(null);
         }
         
-        client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
+        client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts(), null);
         RedisClient oldClient = sentinels.putIfAbsent(key, client);
         if (oldClient != null) {
             return RedissonPromise.newSucceededFuture(null);
diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java
index c36197e94..35d1c5d9f 100644
--- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java
+++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java
@@ -17,6 +17,8 @@ package org.redisson.connection.balancer;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 import org.redisson.api.NodeType;
@@ -98,6 +100,10 @@ public class LoadBalancerManager {
         return result;
     }
 
+    public Collection<ClientConnectionsEntry> getEntries() {
+        return Collections.unmodifiableCollection(client2Entry.values());
+    }
+    
     public int getAvailableClients() {
         int count = 0;
         for (ClientConnectionsEntry connectionEntry : client2Entry.values()) {
@@ -154,12 +160,6 @@ public class LoadBalancerManager {
         return freeze(connectionEntry, freezeReason);
     }
 
-    
-    public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) {
-        ClientConnectionsEntry connectionEntry = getEntry(redisClient);
-        return freeze(connectionEntry, freezeReason);
-    }
-
     public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
         if (connectionEntry == null) {
             return null;
diff --git a/redisson/src/main/java/org/redisson/misc/URIBuilder.java b/redisson/src/main/java/org/redisson/misc/URIBuilder.java
index f25511e7d..3a9476d2f 100644
--- a/redisson/src/main/java/org/redisson/misc/URIBuilder.java
+++ b/redisson/src/main/java/org/redisson/misc/URIBuilder.java
@@ -17,34 +17,44 @@ package org.redisson.misc;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.regex.Pattern;
 
 /**
  *
  * @author Rui Gu (https://github.com/jackygurui)
  */
 public class URIBuilder {
-    
+
+    private static final Pattern ipv4Pattern = Pattern.compile("(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])", Pattern.CASE_INSENSITIVE);
+    private static final Pattern ipv6Pattern = Pattern.compile("([0-9a-f]{1,4}:){7}([0-9a-f]){1,4}", Pattern.CASE_INSENSITIVE);
+
     public static URI create(String uri) {
         URI u = URI.create(uri);
-        //Let's assuming most of the time it is OK.
+        // Let's assuming most of the time it is OK.
         if (u.getHost() != null) {
             return u;
         }
-        String s = uri.substring(0, uri.lastIndexOf(":"))
-                .replaceFirst("redis://", "")
-                .replaceFirst("rediss://", "");
-        //Assuming this is an IPv6 format, other situations will be handled by
-        //Netty at a later stage.
+        String s = uri.substring(0, uri.lastIndexOf(":")).replaceFirst("redis://", "").replaceFirst("rediss://", "");
+        // Assuming this is an IPv6 format, other situations will be handled by
+        // Netty at a later stage.
         return URI.create(uri.replace(s, "[" + s + "]"));
     }
     
+    public static boolean isValidIP(String host) {
+        if (ipv4Pattern.matcher(host).matches()) {
+            return true;
+        }
+        
+        return ipv6Pattern.matcher(host).matches();
+    }
+
     public static boolean compare(InetSocketAddress entryAddr, URI addr) {
         if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(addr.getHost()))
                 || entryAddr.getAddress().getHostAddress().equals(addr.getHost()))
-                    && entryAddr.getPort() == addr.getPort()) {
+                && entryAddr.getPort() == addr.getPort()) {
             return true;
         }
         return false;
     }
-    
+
 }
diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java
index 2ff1ef5b0..bb5b2255c 100644
--- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java
+++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribe.java
@@ -40,8 +40,8 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
 
     private final ConcurrentMap<String, E> entries = PlatformDependent.newConcurrentHashMap();
 
-    public void unsubscribe(final E entry, final String entryName, final String channelName, final ConnectionManager connectionManager) {
-        final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
+    public void unsubscribe(final E entry, final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
+        final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
         semaphore.acquire(new Runnable() {
             @Override
             public void run() {
@@ -51,7 +51,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
                     if (!removed) {
                         throw new IllegalStateException();
                     }
-                    connectionManager.unsubscribe(channelName, semaphore);
+                    subscribeService.unsubscribe(channelName, semaphore);
                 } else {
                     semaphore.release();
                 }
@@ -64,9 +64,9 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
         return entries.get(entryName);
     }
 
-    public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
+    public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
         final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
-        final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
+        final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
         final RPromise<E> newPromise = new RedissonPromise<E>() {
             @Override
             public boolean cancel(boolean mayInterruptIfRunning) {
@@ -98,7 +98,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
                 }
                 
                 RedisPubSubListener<Object> listener = createListener(channelName, value);
-                connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
+                subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
             }
         };
         semaphore.acquire(listener);
diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java
new file mode 100644
index 000000000..985247e99
--- /dev/null
+++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java
@@ -0,0 +1,463 @@
+/**
+ * Copyright 2018 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson.pubsub;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.redisson.api.RFuture;
+import org.redisson.client.BaseRedisPubSubListener;
+import org.redisson.client.RedisNodeNotFoundException;
+import org.redisson.client.RedisPubSubConnection;
+import org.redisson.client.RedisPubSubListener;
+import org.redisson.client.RedisTimeoutException;
+import org.redisson.client.SubscribeListener;
+import org.redisson.client.codec.Codec;
+import org.redisson.client.protocol.pubsub.PubSubType;
+import org.redisson.config.MasterSlaveServersConfig;
+import org.redisson.connection.ConnectionManager;
+import org.redisson.connection.MasterSlaveEntry;
+import org.redisson.connection.PubSubConnectionEntry;
+import org.redisson.misc.RPromise;
+import org.redisson.misc.RedissonPromise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * 
+ * @author Nikita Koksharov
+ *
+ */
+public class PublishSubscribeService {
+
+    private static final Logger log = LoggerFactory.getLogger(PublishSubscribeService.class);
+    
+    private final ConnectionManager connectionManager;
+    
+    private final MasterSlaveServersConfig config;
+    
+    private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
+    
+    private final AsyncSemaphore freePubSubLock = new AsyncSemaphore(1);
+    
+    protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();
+    
+    protected final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
+
+    public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
+        super();
+        this.connectionManager = connectionManager;
+        this.config = config;
+        for (int i = 0; i < locks.length; i++) {
+            locks[i] = new AsyncSemaphore(1);
+        }
+    }
+
+    public PubSubConnectionEntry getPubSubEntry(String channelName) {
+        return name2PubSubConnection.get(channelName);
+    }
+
+    public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, RedisPubSubListener<?>... listeners) {
+        return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
+    }
+    
+    public RFuture<PubSubConnectionEntry> psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
+        RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
+        subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners);
+        return promise;
+    }
+
+    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?>... listeners) {
+        return subscribe(PubSubType.SUBSCRIBE, codec, channelName, new RedissonPromise<PubSubConnectionEntry>(), listeners);
+    }
+
+    private RFuture<PubSubConnectionEntry> subscribe(final PubSubType type, final Codec codec, final String channelName,
+            final RPromise<PubSubConnectionEntry> promise, final RedisPubSubListener<?>... listeners) {
+        final AsyncSemaphore lock = getSemaphore(channelName);
+        lock.acquire(new Runnable() {
+            @Override
+            public void run() {
+                if (promise.isDone()) {
+                    lock.release();
+                    return;
+                }
+                
+                final RPromise<PubSubConnectionEntry> result = new RedissonPromise<PubSubConnectionEntry>();
+                result.addListener(new FutureListener<PubSubConnectionEntry>() {
+                    @Override
+                    public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
+                        if (!future.isSuccess()) {
+                            subscribe(type, codec, channelName, promise, listeners);
+                            return;
+                        }
+                        
+                        promise.trySuccess(result.getNow());
+                    }
+                });
+                subscribe(codec, channelName, result, type, lock, listeners);
+            }
+        });
+        return promise;
+    }
+    
+    public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
+        RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
+        subscribe(codec, channelName, promise, PubSubType.SUBSCRIBE, semaphore, listeners);
+        return promise;
+    }
+
+    public AsyncSemaphore getSemaphore(String channelName) {
+        return locks[Math.abs(channelName.hashCode() % locks.length)];
+    }
+    
+    private void subscribe(final Codec codec, final String channelName, 
+            final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
+        final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
+        if (connEntry != null) {
+            subscribe(channelName, promise, type, lock, connEntry, listeners);
+            return;
+        }
+
+        freePubSubLock.acquire(new Runnable() {
+
+            @Override
+            public void run() {
+                if (promise.isDone()) {
+                    lock.release();
+                    freePubSubLock.release();
+                    return;
+                }
+                
+                final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
+                if (freeEntry == null) {
+                    connect(codec, channelName, promise, type, lock, listeners);
+                    return;
+                }
+                
+                int remainFreeAmount = freeEntry.tryAcquire();
+                if (remainFreeAmount == -1) {
+                    throw new IllegalStateException();
+                }
+                
+                final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
+                if (oldEntry != null) {
+                    freeEntry.release();
+                    freePubSubLock.release();
+                    
+                    subscribe(channelName, promise, type, lock, oldEntry, listeners);
+                    return;
+                }
+                
+                if (remainFreeAmount == 0) {
+                    freePubSubConnections.poll();
+                }
+                freePubSubLock.release();
+                
+                subscribe(channelName, promise, type, lock, freeEntry, listeners);
+                
+                if (PubSubType.PSUBSCRIBE == type) {
+                    freeEntry.psubscribe(codec, channelName);
+                } else {
+                    freeEntry.subscribe(codec, channelName);
+                }
+            }
+            
+        });
+    }
+
+    private void subscribe(final String channelName, final RPromise<PubSubConnectionEntry> promise,
+            final PubSubType type, final AsyncSemaphore lock, final PubSubConnectionEntry connEntry,
+            final RedisPubSubListener<?>... listeners) {
+        for (RedisPubSubListener<?> listener : listeners) {
+            connEntry.addListener(channelName, listener);
+        }
+        SubscribeListener listener = connEntry.getSubscribeFuture(channelName, type);
+        final Future<Void> subscribeFuture = listener.getSuccessFuture();
+        
+        subscribeFuture.addListener(new FutureListener<Void>() {
+            @Override
+            public void operationComplete(Future<Void> future) throws Exception {
+                if (!promise.trySuccess(connEntry)) {
+                    for (RedisPubSubListener<?> listener : listeners) {
+                        connEntry.removeListener(channelName, listener);
+                    }
+                    if (!connEntry.hasListeners(channelName)) {
+                        unsubscribe(channelName, lock);
+                    } else {
+                        lock.release();
+                    }
+                } else {
+                    lock.release();
+                }
+            }
+        });
+
+        connectionManager.newTimeout(new TimerTask() {
+            @Override
+            public void run(Timeout timeout) throws Exception {
+                if (promise.tryFailure(new RedisTimeoutException())) {
+                    subscribeFuture.cancel(false);
+                }
+            }
+        }, config.getRetryInterval(), TimeUnit.MILLISECONDS);
+    }
+
+    private void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) {
+        MasterSlaveEntry entry = connectionManager.getEntry(slot);
+        if (entry == null) {
+            log.error("Node for slot: " + slot + " can't be found");
+        } else {
+            entry.returnPubSubConnection(pubSubEntry);
+        }
+    }
+    
+    private RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {
+        MasterSlaveEntry entry = connectionManager.getEntry(slot);
+        if (entry == null) {
+            RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for slot: " + slot + " hasn't been discovered yet");
+            return RedissonPromise.newFailedFuture(ex);
+        }
+        return entry.nextPubSubConnection();
+    }
+    
+    private void connect(final Codec codec, final String channelName,
+            final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
+        final int slot = connectionManager.calcSlot(channelName);
+        RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
+        connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
+
+            @Override
+            public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
+                if (!future.isSuccess()) {
+                    freePubSubLock.release();
+                    lock.release();
+                    promise.tryFailure(future.cause());
+                    return;
+                }
+
+                RedisPubSubConnection conn = future.getNow();
+                
+                final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
+                entry.tryAcquire();
+                
+                final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
+                if (oldEntry != null) {
+                    releaseSubscribeConnection(slot, entry);
+                    
+                    freePubSubLock.release();
+
+                    subscribe(channelName, promise, type, lock, oldEntry, listeners);
+                    return;
+                }
+                
+                freePubSubConnections.add(entry);
+                freePubSubLock.release();
+                
+                subscribe(channelName, promise, type, lock, entry, listeners);
+                
+                if (PubSubType.PSUBSCRIBE == type) {
+                    entry.psubscribe(codec, channelName);
+                } else {
+                    entry.subscribe(codec, channelName);
+                }
+                
+            }
+        });
+    }
+
+    public void unsubscribe(final String channelName, final AsyncSemaphore lock) {
+        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
+        if (entry == null) {
+            lock.release();
+            return;
+        }
+        
+        entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
+            
+            @Override
+            public boolean onStatus(PubSubType type, String channel) {
+                if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
+                    
+                    if (entry.release() == 1) {
+                        freePubSubConnections.add(entry);
+                    }
+                    
+                    lock.release();
+                    return true;
+                }
+                return false;
+            }
+            
+        });
+    }
+    
+    public RFuture<Codec> unsubscribe(final String channelName, final PubSubType topicType) {
+        final RPromise<Codec> result = new RedissonPromise<Codec>();
+        final AsyncSemaphore lock = getSemaphore(channelName);
+        lock.acquire(new Runnable() {
+            @Override
+            public void run() {
+                final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
+                if (entry == null) {
+                    lock.release();
+                    result.trySuccess(null);
+                    return;
+                }
+
+                freePubSubLock.acquire(new Runnable() {
+                    @Override
+                    public void run() {
+                        freePubSubConnections.remove(entry);
+                        freePubSubLock.release();
+                        
+                        final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
+                        RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {
+
+                            @Override
+                            public boolean onStatus(PubSubType type, String channel) {
+                                if (type == topicType && channel.equals(channelName)) {
+                                    lock.release();
+                                    result.trySuccess(entryCodec);
+                                    return true;
+                                }
+                                return false;
+                            }
+
+                        };
+
+                        if (topicType == PubSubType.PUNSUBSCRIBE) {
+                            entry.punsubscribe(channelName, listener);
+                        } else {
+                            entry.unsubscribe(channelName, listener);
+                        }
+                    }
+                });
+            }
+        });
+        
+        return result;
+    }
+    
+    public void punsubscribe(final String channelName, final AsyncSemaphore lock) {
+        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
+        if (entry == null) {
+            lock.release();
+            return;
+        }
+        
+        entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
+            
+            @Override
+            public boolean onStatus(PubSubType type, String channel) {
+                if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
+                    
+                    if (entry.release() == 1) {
+                        freePubSubConnections.add(entry);
+                    }
+                    
+                    lock.release();
+                    return true;
+                }
+                return false;
+            }
+            
+        });
+    }
+    
+    public void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
+        for (String channelName : redisPubSubConnection.getChannels().keySet()) {
+            PubSubConnectionEntry pubSubEntry = getPubSubEntry(channelName);
+            Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
+            reattachPubSubListeners(channelName, listeners, PubSubType.UNSUBSCRIBE);
+        }
+
+        for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
+            PubSubConnectionEntry pubSubEntry = getPubSubEntry(channelName);
+            Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
+            reattachPubSubListeners(channelName, listeners, PubSubType.PUNSUBSCRIBE);
+        }
+    }
+
+    private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners, final PubSubType topicType) {
+        RFuture<Codec> subscribeCodec = unsubscribe(channelName, topicType);
+        if (listeners.isEmpty()) {
+            return;
+        }
+        
+        subscribeCodec.addListener(new FutureListener<Codec>() {
+            @Override
+            public void operationComplete(Future<Codec> future) throws Exception {
+                if (future.get() == null) {
+                    return;
+                }
+                
+                Codec subscribeCodec = future.get();
+                if (topicType == PubSubType.PUNSUBSCRIBE) {
+                    psubscribe(channelName, listeners, subscribeCodec);
+                } else {
+                    subscribe(channelName, listeners, subscribeCodec);
+                }
+            }
+
+        });
+    }
+
+    private void subscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
+            final Codec subscribeCodec) {
+        RFuture<PubSubConnectionEntry> subscribeFuture = subscribe(subscribeCodec, channelName, listeners.toArray(new RedisPubSubListener[listeners.size()]));
+        subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
+            
+            @Override
+            public void operationComplete(Future<PubSubConnectionEntry> future)
+                    throws Exception {
+                if (!future.isSuccess()) {
+                    subscribe(channelName, listeners, subscribeCodec);
+                    return;
+                }
+                
+                log.info("listeners of '{}' channel to '{}' have been resubscribed", channelName, future.getNow().getConnection().getRedisClient());
+            }
+        });
+    }
+
+    private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
+            final Codec subscribeCodec) {
+        RFuture<PubSubConnectionEntry> subscribeFuture = psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()]));
+        subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
+            @Override
+            public void operationComplete(Future<PubSubConnectionEntry> future)
+                    throws Exception {
+                if (!future.isSuccess()) {
+                    psubscribe(channelName, listeners, subscribeCodec);
+                    return;
+                }
+                
+                log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient());
+            }
+        });
+    }
+    
+    
+}
diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java
index bb3cb16a2..ba2d8a117 100644
--- a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java
+++ b/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java
@@ -35,6 +35,7 @@ import org.redisson.connection.PubSubConnectionEntry;
 import org.redisson.misc.RPromise;
 import org.redisson.misc.RedissonPromise;
 import org.redisson.pubsub.AsyncSemaphore;
+import org.redisson.pubsub.PublishSubscribeService;
 
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -48,6 +49,7 @@ import io.netty.util.concurrent.FutureListener;
  */
 public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M> {
 
+    final PublishSubscribeService subscribeService;
     final CommandReactiveExecutor commandExecutor;
     private final String name;
     private final Codec codec;
@@ -60,6 +62,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
         this.commandExecutor = commandExecutor;
         this.name = name;
         this.codec = codec;
+        this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService();
     }
 
     @Override
@@ -88,7 +91,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
     }
 
     private void addListener(final RedisPubSubListener<M> pubSubListener, final RPromise<Integer> promise) {
-        RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener);
+        RFuture<PubSubConnectionEntry> future = subscribeService.psubscribe(name, codec, pubSubListener);
         future.addListener(new FutureListener<PubSubConnectionEntry>() {
             @Override
             public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
@@ -112,10 +115,10 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
 
     @Override
     public void removeListener(int listenerId) {
-        AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
+        AsyncSemaphore semaphore = subscribeService.getSemaphore(name);
         acquire(semaphore);
 
-        PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
+        PubSubConnectionEntry entry = subscribeService.getPubSubEntry(name);
         if (entry == null) {
             semaphore.release();
             return;
@@ -123,7 +126,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
         
         entry.removeListener(name, listenerId);
         if (!entry.hasListeners(name)) {
-            commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
+            subscribeService.punsubscribe(name, semaphore);
         } else {
             semaphore.release();
         }
diff --git a/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java
index 940d30b47..3a1bf0519 100644
--- a/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java
+++ b/redisson/src/test/java/org/redisson/connection/balancer/WeightedRoundRobinBalancerTest.java
@@ -11,12 +11,13 @@ import org.redisson.RedisRunner;
 import org.redisson.RedisRunner.RedisProcess;
 import org.redisson.Redisson;
 import org.redisson.api.RedissonClient;
+import org.redisson.client.WriteRedisConnectionException;
 import org.redisson.config.Config;
 import org.redisson.config.ReadMode;
 
 public class WeightedRoundRobinBalancerTest {
 
-    @Test
+    @Test(expected = WriteRedisConnectionException.class)
     public void testUseMasterForReadsIfNoConnectionsToSlaves() throws IOException, InterruptedException {
         RedisProcess master = null;
         RedisProcess slave = null;