|
|
|
@ -17,6 +17,7 @@ package org.redisson.pubsub;
|
|
|
|
|
|
|
|
|
|
import io.netty.channel.ChannelFuture;
|
|
|
|
|
import io.netty.channel.ChannelFutureListener;
|
|
|
|
|
import io.netty.util.Timeout;
|
|
|
|
|
import org.redisson.PubSubPatternStatusListener;
|
|
|
|
|
import org.redisson.client.*;
|
|
|
|
|
import org.redisson.client.codec.Codec;
|
|
|
|
@ -221,7 +222,14 @@ public class PublishSubscribeService {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Timeout lockTimeout = connectionManager.newTimeout(timeout -> {
|
|
|
|
|
promise.completeExceptionally(new RedisTimeoutException(
|
|
|
|
|
"Unable to acquire subscribe lock after " + config.getTimeout() + "ms. " +
|
|
|
|
|
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
|
|
|
|
|
}, config.getTimeout(), TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
freePubSubLock.acquire(() -> {
|
|
|
|
|
lockTimeout.cancel();
|
|
|
|
|
if (promise.isDone()) {
|
|
|
|
|
lock.release();
|
|
|
|
|
freePubSubLock.release();
|
|
|
|
@ -352,6 +360,7 @@ public class PublishSubscribeService {
|
|
|
|
|
|
|
|
|
|
connFuture.whenComplete((conn, ex) -> {
|
|
|
|
|
if (ex != null) {
|
|
|
|
|
// freePubSubLock.release();
|
|
|
|
|
lock.release();
|
|
|
|
|
if (!connFuture.isCancelled()) {
|
|
|
|
|
promise.completeExceptionally(ex);
|
|
|
|
|