diff --git a/redisson/src/main/java/org/redisson/PubSubMessageListener.java b/redisson/src/main/java/org/redisson/PubSubMessageListener.java index 412be84bf..9b1b0ff61 100644 --- a/redisson/src/main/java/org/redisson/PubSubMessageListener.java +++ b/redisson/src/main/java/org/redisson/PubSubMessageListener.java @@ -64,6 +64,10 @@ public class PubSubMessageListener implements RedisPubSubListener { return false; return true; } + + public MessageListener getListener() { + return listener; + } @Override public void onMessage(String channel, Object message) { diff --git a/redisson/src/main/java/org/redisson/PubSubPatternMessageListener.java b/redisson/src/main/java/org/redisson/PubSubPatternMessageListener.java index 56997aaed..1ce1372b9 100644 --- a/redisson/src/main/java/org/redisson/PubSubPatternMessageListener.java +++ b/redisson/src/main/java/org/redisson/PubSubPatternMessageListener.java @@ -65,6 +65,10 @@ public class PubSubPatternMessageListener implements RedisPubSubListener { return true; } + public PatternMessageListener getListener() { + return listener; + } + @Override public void onMessage(String channel, V message) { } diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java index 8c8c2a19f..cc965006b 100644 --- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java @@ -86,7 +86,46 @@ public class RedissonPatternTopic implements RPatternTopic { semaphore.release(); } } + + @Override + public void removeAllListeners() { + AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + semaphore.acquireUninterruptibly(); + + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + if (entry == null) { + semaphore.release(); + return; + } + entry.removeAllListeners(name); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().punsubscribe(name, semaphore); + } else { + semaphore.release(); + } + } + + @Override + public void removeListener(PatternMessageListener listener) { + AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + semaphore.acquireUninterruptibly(); + + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + if (entry == null) { + semaphore.release(); + return; + } + + entry.removeListener(name, listener); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().punsubscribe(name, semaphore); + } else { + semaphore.release(); + } + + } + @Override public List getPatternNames() { return Collections.singletonList(name); diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 6faf4c193..b5c0242c0 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -83,6 +83,7 @@ public class RedissonTopic implements RTopic { return System.identityHashCode(pubSubListener); } + @Override public void removeAllListeners() { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); semaphore.acquireUninterruptibly(); @@ -101,6 +102,26 @@ public class RedissonTopic implements RTopic { } } + @Override + public void removeListener(MessageListener listener) { + AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + semaphore.acquireUninterruptibly(); + + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + if (entry == null) { + semaphore.release(); + return; + } + + entry.removeListener(name, listener); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().unsubscribe(name, semaphore); + } else { + semaphore.release(); + } + + } + @Override public void removeListener(int listenerId) { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); diff --git a/redisson/src/main/java/org/redisson/api/RPatternTopic.java b/redisson/src/main/java/org/redisson/api/RPatternTopic.java index eab373961..661b773b0 100644 --- a/redisson/src/main/java/org/redisson/api/RPatternTopic.java +++ b/redisson/src/main/java/org/redisson/api/RPatternTopic.java @@ -62,5 +62,18 @@ public interface RPatternTopic { * @param listenerId - id of message listener */ void removeListener(int listenerId); + + /** + * Removes the listener by its instance + * + * @param listener - listener instance + */ + void removeListener(PatternMessageListener listener); + + /** + * Removes all listeners from this topic + */ + void removeAllListeners(); + } diff --git a/redisson/src/main/java/org/redisson/api/RTopic.java b/redisson/src/main/java/org/redisson/api/RTopic.java index 868ba406f..1c722e80d 100644 --- a/redisson/src/main/java/org/redisson/api/RTopic.java +++ b/redisson/src/main/java/org/redisson/api/RTopic.java @@ -64,6 +64,13 @@ public interface RTopic extends RTopicAsync { */ int addListener(StatusListener listener); + /** + * Removes the listener by its instance + * + * @param listener - listener instance + */ + void removeListener(MessageListener listener); + /** * Removes the listener by id for listening this topic * diff --git a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 9fe9da637..f953f3ac4 100644 --- a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -17,12 +17,16 @@ package org.redisson.connection; import java.util.Collection; import java.util.Collections; +import java.util.EventListener; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import org.redisson.PubSubMessageListener; +import org.redisson.PubSubPatternMessageListener; +import org.redisson.api.listener.MessageListener; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; @@ -97,6 +101,25 @@ public class PubSubConnectionEntry { } // TODO optimize + public boolean removeListener(String channelName, EventListener msgListener) { + Queue> listeners = channelListeners.get(channelName); + for (RedisPubSubListener listener : listeners) { + if (listener instanceof PubSubMessageListener) { + if (((PubSubMessageListener)listener).getListener() == msgListener) { + removeListener(channelName, listener); + return true; + } + } + if (listener instanceof PubSubPatternMessageListener) { + if (((PubSubPatternMessageListener)listener).getListener() == msgListener) { + removeListener(channelName, listener); + return true; + } + } + } + return false; + } + public boolean removeListener(String channelName, int listenerId) { Queue> listeners = channelListeners.get(channelName); for (RedisPubSubListener listener : listeners) { diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 3d9939602..3270fa6bf 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -334,6 +334,27 @@ public class RedissonTopicTest { redisson.shutdown(); } + + @Test + public void testRemoveByInstance() throws InterruptedException { + RedissonClient redisson = BaseTest.createInstance(); + RTopic topic1 = redisson.getTopic("topic1"); + MessageListener listener = new MessageListener() { + @Override + public void onMessage(String channel, Object msg) { + Assert.fail(); + } + }; + + topic1.addListener(listener); + + topic1 = redisson.getTopic("topic1"); + topic1.removeListener(listener); + topic1.publish(new Message("123")); + + redisson.shutdown(); + } + @Test public void testLazyUnsubscribe() throws InterruptedException {