Feature - async methods for listener removal added to RTopic object

pull/1721/head
Nikita Koksharov 6 years ago
parent acf2a82f4a
commit 59b6a02463

@ -28,12 +28,12 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
@ -190,6 +190,61 @@ public class RedissonTopic implements RTopic {
}
@Override
public RFuture<Void> removeListenerAsync(MessageListener<?> listener) {
RPromise<Void> promise = new RedissonPromise<Void>();
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
promise.trySuccess(null);
return;
}
entry.removeListener(channelName, listener);
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore)
.addListener(new TransferListener<Void>(promise));
} else {
semaphore.release();
promise.trySuccess(null);
}
}
});
return promise;
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPromise<Void> promise = new RedissonPromise<Void>();
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {
PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName);
if (entry == null) {
semaphore.release();
promise.trySuccess(null);
return;
}
entry.removeListener(channelName, listenerId);
if (!entry.hasListeners(channelName)) {
subscribeService.unsubscribe(channelName, semaphore)
.addListener(new TransferListener<Void>(promise));
} else {
semaphore.release();
promise.trySuccess(null);
}
}
});
return promise;
}
@Override
public void removeListener(int listenerId) {
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);

@ -55,5 +55,21 @@ public interface RTopicAsync {
* @see org.redisson.api.listener.MessageListener
*/
<M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
* @param listenerId - listener id
* @return void
*/
RFuture<Void> removeListenerAsync(int listenerId);
/**
* Removes the listener by its instance
*
* @param listener - listener instance
* @return void
*/
RFuture<Void> removeListenerAsync(MessageListener<?> listener);
}

Loading…
Cancel
Save