diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index bce6b1297..6faf4c193 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -83,6 +83,24 @@ public class RedissonTopic implements RTopic { return System.identityHashCode(pubSubListener); } + public void removeAllListeners() { + AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); + semaphore.acquireUninterruptibly(); + + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + if (entry == null) { + semaphore.release(); + return; + } + + entry.removeAllListeners(name); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().unsubscribe(name, semaphore); + } else { + semaphore.release(); + } + } + @Override public void removeListener(int listenerId) { AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name); diff --git a/redisson/src/main/java/org/redisson/api/RTopic.java b/redisson/src/main/java/org/redisson/api/RTopic.java index 5cc2e2cec..868ba406f 100644 --- a/redisson/src/main/java/org/redisson/api/RTopic.java +++ b/redisson/src/main/java/org/redisson/api/RTopic.java @@ -71,4 +71,9 @@ public interface RTopic extends RTopicAsync { */ void removeListener(int listenerId); + /** + * Removes all listeners from this topic + */ + void removeAllListeners(); + } diff --git a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index 88ad18529..9fe9da637 100644 --- a/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/redisson/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -88,6 +88,14 @@ public class PubSubConnectionEntry { conn.addListener(listener); } + public boolean removeAllListeners(String channelName) { + Queue> listeners = channelListeners.get(channelName); + for (RedisPubSubListener listener : listeners) { + removeListener(channelName, listener); + } + return !listeners.isEmpty(); + } + // TODO optimize public boolean removeListener(String channelName, int listenerId) { Queue> listeners = channelListeners.get(channelName); diff --git a/redisson/src/test/java/org/redisson/RedissonTopicTest.java b/redisson/src/test/java/org/redisson/RedissonTopicTest.java index 6417786f6..3d9939602 100644 --- a/redisson/src/test/java/org/redisson/RedissonTopicTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTopicTest.java @@ -318,6 +318,22 @@ public class RedissonTopicTest { redisson.shutdown(); } + @Test + public void testRemoveAllListeners() throws InterruptedException { + RedissonClient redisson = BaseTest.createInstance(); + RTopic topic1 = redisson.getTopic("topic1"); + for (int i = 0; i < 10; i++) { + topic1.addListener((channel, msg) -> { + Assert.fail(); + }); + } + + topic1 = redisson.getTopic("topic1"); + topic1.removeAllListeners(); + topic1.publish(new Message("123")); + + redisson.shutdown(); + } @Test public void testLazyUnsubscribe() throws InterruptedException {