diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 9f80bbc2e..e8d661155 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -54,7 +54,6 @@ import org.redisson.config.TransportMode; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; -import org.redisson.misc.TransferListener; import org.redisson.misc.URIBuilder; import org.redisson.pubsub.AsyncSemaphore; import org.slf4j.Logger; @@ -432,33 +431,35 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } @Override - public RFuture psubscribe(final String channelName, final Codec codec, final RedisPubSubListener... listeners) { - final AsyncSemaphore lock = getSemaphore(channelName); - final RPromise result = new RedissonPromise(); - lock.acquire(new Runnable() { - @Override - public void run() { - RFuture future = psubscribe(channelName, codec, lock, listeners); - future.addListener(new TransferListener(result)); - } - }); - return result; + public RFuture psubscribe(String channelName, Codec codec, RedisPubSubListener... listeners) { + return subscribe(PubSubType.PSUBSCRIBE, codec, channelName, listeners); } + @Override public RFuture psubscribe(String channelName, Codec codec, AsyncSemaphore semaphore, RedisPubSubListener... listeners) { RPromise promise = new RedissonPromise(); subscribe(codec, channelName, promise, PubSubType.PSUBSCRIBE, semaphore, listeners); return promise; } - public RFuture subscribe(final Codec codec, final String channelName, final RedisPubSubListener... listeners) { + @Override + public RFuture subscribe(Codec codec, String channelName, RedisPubSubListener... listeners) { + return subscribe(PubSubType.SUBSCRIBE, codec, channelName, listeners); + } + + protected RFuture subscribe(PubSubType type, final Codec codec, final String channelName, + final RedisPubSubListener... listeners) { final AsyncSemaphore lock = getSemaphore(channelName); final RPromise result = new RedissonPromise(); lock.acquire(new Runnable() { @Override public void run() { - RFuture future = subscribe(codec, channelName, lock, listeners); - future.addListener(new TransferListener(result)); + if (result.isDone()) { + lock.release(); + return; + } + + subscribe(codec, channelName, result, type, lock, listeners); } }); return result; @@ -487,6 +488,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void run() { if (promise.isDone()) { + lock.release(); + freePubSubLock.release(); return; } @@ -536,8 +539,18 @@ public class MasterSlaveConnectionManager implements ConnectionManager { connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - lock.release(); - promise.trySuccess(connEntry); + if (!promise.trySuccess(connEntry)) { + for (RedisPubSubListener listener : listeners) { + connEntry.removeListener(channelName, listener); + } + if (!connEntry.hasListeners(channelName)) { + unsubscribe(channelName, lock); + } else { + lock.release(); + } + } else { + lock.release(); + } } }); }