RTopic.removeListener by instance added. #752

pull/777/head
Nikita 8 years ago
parent 61eda2b8dc
commit 9be8f5a4b0

@ -64,6 +64,10 @@ public class PubSubMessageListener<V> implements RedisPubSubListener<Object> {
return false;
return true;
}
public MessageListener<V> getListener() {
return listener;
}
@Override
public void onMessage(String channel, Object message) {

@ -65,6 +65,10 @@ public class PubSubPatternMessageListener<V> implements RedisPubSubListener<V> {
return true;
}
public PatternMessageListener<V> getListener() {
return listener;
}
@Override
public void onMessage(String channel, V message) {
}

@ -86,7 +86,46 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
semaphore.release();
}
}
@Override
public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
}
entry.removeAllListeners(name);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
} else {
semaphore.release();
}
}
@Override
public void removeListener(PatternMessageListener<M> listener) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
}
entry.removeListener(name, listener);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().punsubscribe(name, semaphore);
} else {
semaphore.release();
}
}
@Override
public List<String> getPatternNames() {
return Collections.singletonList(name);

@ -83,6 +83,7 @@ public class RedissonTopic<M> implements RTopic<M> {
return System.identityHashCode(pubSubListener);
}
@Override
public void removeAllListeners() {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
@ -101,6 +102,26 @@ public class RedissonTopic<M> implements RTopic<M> {
}
}
@Override
public void removeListener(MessageListener<?> listener) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
semaphore.acquireUninterruptibly();
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
semaphore.release();
return;
}
entry.removeListener(name, listener);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
} else {
semaphore.release();
}
}
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);

@ -62,5 +62,18 @@ public interface RPatternTopic<M> {
* @param listenerId - id of message listener
*/
void removeListener(int listenerId);
/**
* Removes the listener by its instance
*
* @param listener - listener instance
*/
void removeListener(PatternMessageListener<M> listener);
/**
* Removes all listeners from this topic
*/
void removeAllListeners();
}

@ -64,6 +64,13 @@ public interface RTopic<M> extends RTopicAsync<M> {
*/
int addListener(StatusListener listener);
/**
* Removes the listener by its instance
*
* @param listener - listener instance
*/
void removeListener(MessageListener<?> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*

@ -17,12 +17,16 @@ package org.redisson.connection;
import java.util.Collection;
import java.util.Collections;
import java.util.EventListener;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.PubSubMessageListener;
import org.redisson.PubSubPatternMessageListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
@ -97,6 +101,25 @@ public class PubSubConnectionEntry {
}
// TODO optimize
public boolean removeListener(String channelName, EventListener msgListener) {
Queue<RedisPubSubListener<?>> listeners = channelListeners.get(channelName);
for (RedisPubSubListener<?> listener : listeners) {
if (listener instanceof PubSubMessageListener) {
if (((PubSubMessageListener)listener).getListener() == msgListener) {
removeListener(channelName, listener);
return true;
}
}
if (listener instanceof PubSubPatternMessageListener) {
if (((PubSubPatternMessageListener)listener).getListener() == msgListener) {
removeListener(channelName, listener);
return true;
}
}
}
return false;
}
public boolean removeListener(String channelName, int listenerId) {
Queue<RedisPubSubListener<?>> listeners = channelListeners.get(channelName);
for (RedisPubSubListener<?> listener : listeners) {

@ -334,6 +334,27 @@ public class RedissonTopicTest {
redisson.shutdown();
}
@Test
public void testRemoveByInstance() throws InterruptedException {
RedissonClient redisson = BaseTest.createInstance();
RTopic<Message> topic1 = redisson.getTopic("topic1");
MessageListener listener = new MessageListener() {
@Override
public void onMessage(String channel, Object msg) {
Assert.fail();
}
};
topic1.addListener(listener);
topic1 = redisson.getTopic("topic1");
topic1.removeListener(listener);
topic1.publish(new Message("123"));
redisson.shutdown();
}
@Test
public void testLazyUnsubscribe() throws InterruptedException {

Loading…
Cancel
Save