RTopic.removeAllListeners method added. #752

pull/766/head
Nikita 8 years ago
parent 18f37017b5
commit 01fc8f5f57

@ -83,6 +83,24 @@ public class RedissonTopic<M> implements RTopic<M> {
return System.identityHashCode(pubSubListener);
}
public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
}
entry.removeAllListeners(name);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
} else {
semaphore.release();
}
}
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);

@ -71,4 +71,9 @@ public interface RTopic<M> extends RTopicAsync<M> {
*/
void removeListener(int listenerId);
/**
* Removes all listeners from this topic
*/
void removeAllListeners();
}

@ -88,6 +88,14 @@ public class PubSubConnectionEntry {
conn.addListener(listener);
}
public boolean removeAllListeners(String channelName) {
Queue<RedisPubSubListener<?>> listeners = channelListeners.get(channelName);
for (RedisPubSubListener<?> listener : listeners) {
removeListener(channelName, listener);
}
return !listeners.isEmpty();
}
// TODO optimize
public boolean removeListener(String channelName, int listenerId) {
Queue<RedisPubSubListener<?>> listeners = channelListeners.get(channelName);

@ -318,6 +318,22 @@ public class RedissonTopicTest {
redisson.shutdown();
}
@Test
public void testRemoveAllListeners() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
RTopic<Message> topic1 = redisson.getTopic("topic1");
for (int i = 0; i < 10; i++) {
topic1.addListener((channel, msg) -> {
Assert.fail();
});
}
topic1 = redisson.getTopic("topic1");
topic1.removeAllListeners();
topic1.publish(new Message("123"));
redisson.shutdown();
}
@Test
public void testLazyUnsubscribe() throws InterruptedException {

Loading…
Cancel
Save