diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java index 8e340919b..9457a2cf1 100644 --- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java @@ -113,33 +113,14 @@ public class RedissonPatternTopic implements RPatternTopic { } } - + @Override public RFuture removeListenerAsync(int listenerId) { - RPromise result = new RedissonPromise<>(); - AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); - semaphore.acquire(() -> { - PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); - if (entry == null) { - semaphore.release(); - result.trySuccess(null); - return; - } - - entry.removeListener(channelName, listenerId); - if (!entry.hasListeners(channelName)) { - subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore) - .onComplete(new TransferListener<>(result)); - } else { - semaphore.release(); - result.trySuccess(null); - } - }); - return result; + return subscribeService.removeListenerAsync(PubSubType.PUNSUBSCRIBE, channelName, listenerId); } @Override public void removeListener(int listenerId) { - removeListenerAsync(listenerId).syncUninterruptibly(); + commandExecutor.syncSubscription(removeListenerAsync(listenerId)); } @Override @@ -162,22 +143,8 @@ public class RedissonPatternTopic implements RPatternTopic { @Override public void removeListener(PatternMessageListener listener) { - AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); - acquire(semaphore); - - PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); - if (entry == null) { - semaphore.release(); - return; - } - - entry.removeListener(channelName, listener); - if (!entry.hasListeners(channelName)) { - subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore).syncUninterruptibly(); - } else { - semaphore.release(); - } - + RFuture future = subscribeService.removeListenerAsync(PubSubType.PUNSUBSCRIBE, channelName, listener); + commandExecutor.syncSubscription(future); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 61f6c3645..7b3ac8237 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -163,93 +163,22 @@ public class RedissonTopic implements RTopic { @Override public void removeListener(MessageListener listener) { RFuture future = removeListenerAsync(listener); - MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig(); - int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); - if (!future.awaitUninterruptibly(timeout)) { - throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic"); - } + commandExecutor.syncSubscription(future); } @Override public RFuture removeListenerAsync(MessageListener listener) { - return removeListenerAsync(channelName, listener); - } - - protected RFuture removeListenerAsync(ChannelName channelName, MessageListener listener) { - RPromise promise = new RedissonPromise(); - AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); - semaphore.acquire(new Runnable() { - @Override - public void run() { - PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); - if (entry == null) { - semaphore.release(); - promise.trySuccess(null); - return; - } - - entry.removeListener(channelName, listener); - if (!entry.hasListeners(channelName)) { - subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore) - .onComplete(new TransferListener(promise)); - } else { - semaphore.release(); - promise.trySuccess(null); - } - - } - }); - return promise; + return subscribeService.removeListenerAsync(PubSubType.UNSUBSCRIBE, channelName, listener); } @Override public RFuture removeListenerAsync(Integer... listenerIds) { - RPromise promise = new RedissonPromise(); - AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); - semaphore.acquire(new Runnable() { - @Override - public void run() { - PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); - if (entry == null) { - semaphore.release(); - promise.trySuccess(null); - return; - } - - for (int id : listenerIds) { - entry.removeListener(channelName, id); - } - if (!entry.hasListeners(channelName)) { - subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore) - .onComplete(new TransferListener(promise)); - } else { - semaphore.release(); - promise.trySuccess(null); - } - } - }); - return promise; + return subscribeService.removeListenerAsync(PubSubType.UNSUBSCRIBE, channelName, listenerIds); } @Override public void removeListener(Integer... listenerIds) { - AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); - acquire(semaphore); - - PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); - if (entry == null) { - semaphore.release(); - return; - } - - for (int id : listenerIds) { - entry.removeListener(channelName, id); - } - if (!entry.hasListeners(channelName)) { - subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore).syncUninterruptibly(); - } else { - semaphore.release(); - } + commandExecutor.syncSubscription(removeListenerAsync(listenerIds)); } @Override diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index a924b9f6f..03d988211 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -16,6 +16,7 @@ package org.redisson.pubsub; import java.util.Collection; +import java.util.EventListener; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.api.RFuture; +import org.redisson.api.listener.MessageListener; import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.ChannelName; import org.redisson.client.RedisNodeNotFoundException; @@ -39,6 +41,7 @@ import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; +import org.redisson.misc.TransferListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -575,6 +578,54 @@ public class PublishSubscribeService { }); } + public RFuture removeListenerAsync(PubSubType type, ChannelName channelName, EventListener listener) { + RPromise promise = new RedissonPromise<>(); + AsyncSemaphore semaphore = getSemaphore(channelName); + semaphore.acquire(() -> { + PubSubConnectionEntry entry = getPubSubEntry(channelName); + if (entry == null) { + semaphore.release(); + promise.trySuccess(null); + return; + } + + entry.removeListener(channelName, listener); + if (!entry.hasListeners(channelName)) { + unsubscribe(type, channelName, semaphore) + .onComplete(new TransferListener<>(promise)); + } else { + semaphore.release(); + promise.trySuccess(null); + } + }); + return promise; + } + + public RFuture removeListenerAsync(PubSubType type, ChannelName channelName, Integer... listenerIds) { + RPromise promise = new RedissonPromise<>(); + AsyncSemaphore semaphore = getSemaphore(channelName); + semaphore.acquire(() -> { + PubSubConnectionEntry entry = getPubSubEntry(channelName); + if (entry == null) { + semaphore.release(); + promise.trySuccess(null); + return; + } + + for (int id : listenerIds) { + entry.removeListener(channelName, id); + } + if (!entry.hasListeners(channelName)) { + unsubscribe(type, channelName, semaphore) + .onComplete(new TransferListener<>(promise)); + } else { + semaphore.release(); + promise.trySuccess(null); + } + }); + return promise; + } + @Override public String toString() { return "PublishSubscribeService [name2PubSubConnection=" + name2PubSubConnection + ", entry2PubSubConnection=" + entry2PubSubConnection + "]";