diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 95d891aed..59515c435 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -103,11 +103,11 @@ public class PubSubConnectionEntry { } } - public void removeListener(String channelName, RedisPubSubListener listener) { + private void removeListener(String channelName, RedisPubSubListener listener) { Queue queue = channelListeners.get(channelName); synchronized (queue) { - if (queue.remove(listener)) { - channelListeners.remove(channelName, new ConcurrentLinkedQueue()); + if (queue.remove(listener) && queue.isEmpty()) { + channelListeners.remove(channelName); } } conn.removeListener(listener); @@ -139,34 +139,36 @@ public class PubSubConnectionEntry { @Override public boolean onStatus(PubSubType type, String ch) { if (type == PubSubType.UNSUBSCRIBE && channel.equals(ch)) { - Queue listeners = channelListeners.get(channel); - if (listeners != null) { - for (RedisPubSubListener listener : listeners) { - removeListener(channel, listener); - } - } - subscribedChannelsAmount.release(); + removeListeners(channel); return true; } return false; } + }); conn.addOneShotListener(listener); conn.unsubscribe(channel); } + private void removeListeners(String channel) { + Queue queue = channelListeners.get(channel); + if (queue != null) { + synchronized (queue) { + channelListeners.remove(channel); + } + for (RedisPubSubListener listener : queue) { + conn.removeListener(listener); + } + } + subscribedChannelsAmount.release(); + } + public void punsubscribe(final String channel, RedisPubSubListener listener) { conn.addOneShotListener(new BaseRedisPubSubListener() { @Override public boolean onStatus(PubSubType type, String ch) { if (type == PubSubType.PUNSUBSCRIBE && channel.equals(ch)) { - Queue listeners = channelListeners.get(channel); - if (listeners != null) { - for (RedisPubSubListener listener : listeners) { - removeListener(channel, listener); - } - } - subscribedChannelsAmount.release(); + removeListeners(channel); return true; } return false;