diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index e8d661155..c0f9222ae 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -447,7 +447,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return subscribe(PubSubType.SUBSCRIBE, codec, channelName, listeners); } - protected RFuture subscribe(PubSubType type, final Codec codec, final String channelName, + private RFuture subscribe(final PubSubType type, final Codec codec, final String channelName, final RedisPubSubListener... listeners) { final AsyncSemaphore lock = getSemaphore(channelName); final RPromise result = new RedissonPromise(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index bf37aebe1..ab3db3cbf 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -268,9 +268,12 @@ public class MasterSlaveEntry { @Override public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - log.debug("resubscribed listeners of '{}' channel to {}", channelName, future.getNow().getConnection().getRedisClient()); + if (!future.isSuccess()) { + subscribe(channelName, listeners, subscribeCodec); + return; } + + log.debug("resubscribed listeners of '{}' channel to '{}'", channelName, future.getNow().getConnection().getRedisClient()); } }); } @@ -292,7 +295,7 @@ public class MasterSlaveEntry { private void psubscribe(final String channelName, final Collection> listeners, final Codec subscribeCodec) { - RFuture subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, null); + RFuture subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, listeners.toArray(new RedisPubSubListener[listeners.size()])); subscribeFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) @@ -302,11 +305,7 @@ public class MasterSlaveEntry { return; } - PubSubConnectionEntry newEntry = future.getNow(); - for (RedisPubSubListener redisPubSubListener : listeners) { - newEntry.addListener(channelName, redisPubSubListener); - } - log.debug("resubscribed listeners for '{}' channel-pattern", channelName); + log.debug("resubscribed listeners for '{}' channel-pattern to '{}'", channelName, future.getNow().getConnection().getRedisClient()); } }); }