From 59b6a02463f160ca0290b53e0dd4e1a08e237354 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 5 Nov 2018 14:03:24 +0300 Subject: [PATCH] Feature - async methods for listener removal added to RTopic object --- .../main/java/org/redisson/RedissonTopic.java | 57 ++++++++++++++++++- .../java/org/redisson/api/RTopicAsync.java | 16 ++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 407484100..6bdd9e5ab 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -28,12 +28,12 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandAsyncExecutor; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; import org.redisson.misc.RedissonPromise; +import org.redisson.misc.TransferListener; import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PublishSubscribeService; @@ -190,6 +190,61 @@ public class RedissonTopic implements RTopic { } + @Override + public RFuture removeListenerAsync(MessageListener listener) { + RPromise promise = new RedissonPromise(); + AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); + semaphore.acquire(new Runnable() { + @Override + public void run() { + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); + if (entry == null) { + semaphore.release(); + promise.trySuccess(null); + return; + } + + entry.removeListener(channelName, listener); + if (!entry.hasListeners(channelName)) { + subscribeService.unsubscribe(channelName, semaphore) + .addListener(new TransferListener(promise)); + } else { + semaphore.release(); + promise.trySuccess(null); + } + + } + }); + return promise; + } + + @Override + public RFuture removeListenerAsync(int listenerId) { + RPromise promise = new RedissonPromise(); + AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); + semaphore.acquire(new Runnable() { + @Override + public void run() { + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); + if (entry == null) { + semaphore.release(); + promise.trySuccess(null); + return; + } + + entry.removeListener(channelName, listenerId); + if (!entry.hasListeners(channelName)) { + subscribeService.unsubscribe(channelName, semaphore) + .addListener(new TransferListener(promise)); + } else { + semaphore.release(); + promise.trySuccess(null); + } + } + }); + return promise; + } + @Override public void removeListener(int listenerId) { AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); diff --git a/redisson/src/main/java/org/redisson/api/RTopicAsync.java b/redisson/src/main/java/org/redisson/api/RTopicAsync.java index 1aaef85bf..380d8851c 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicAsync.java +++ b/redisson/src/main/java/org/redisson/api/RTopicAsync.java @@ -55,5 +55,21 @@ public interface RTopicAsync { * @see org.redisson.api.listener.MessageListener */ RFuture addListenerAsync(Class type, MessageListener listener); + + /** + * Removes the listener by id for listening this topic + * + * @param listenerId - listener id + * @return void + */ + RFuture removeListenerAsync(int listenerId); + /** + * Removes the listener by its instance + * + * @param listener - listener instance + * @return void + */ + RFuture removeListenerAsync(MessageListener listener); + }