|
|
@ -304,6 +304,10 @@ public class PublishSubscribeService {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public RFuture<Void> unsubscribe(final ChannelName channelName, final AsyncSemaphore lock) {
|
|
|
|
public RFuture<Void> unsubscribe(final ChannelName channelName, final AsyncSemaphore lock) {
|
|
|
|
|
|
|
|
if (connectionManager.isShuttingDown()) {
|
|
|
|
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
|
|
|
|
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
|
|
|
|
if (entry == null) {
|
|
|
|
if (entry == null) {
|
|
|
|
lock.release();
|
|
|
|
lock.release();
|
|
|
@ -333,6 +337,10 @@ public class PublishSubscribeService {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public RFuture<Codec> unsubscribe(final ChannelName channelName, final PubSubType topicType) {
|
|
|
|
public RFuture<Codec> unsubscribe(final ChannelName channelName, final PubSubType topicType) {
|
|
|
|
|
|
|
|
if (connectionManager.isShuttingDown()) {
|
|
|
|
|
|
|
|
return RedissonPromise.newSucceededFuture(null);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
final RPromise<Codec> result = new RedissonPromise<Codec>();
|
|
|
|
final RPromise<Codec> result = new RedissonPromise<Codec>();
|
|
|
|
final AsyncSemaphore lock = getSemaphore(channelName);
|
|
|
|
final AsyncSemaphore lock = getSemaphore(channelName);
|
|
|
|
lock.acquire(new Runnable() {
|
|
|
|
lock.acquire(new Runnable() {
|
|
|
|