Fixed - pubsub channel isn't released if subscription timeout occurred. #4064

pull/4087/head
Nikita Koksharov 3 years ago
parent 36bafe4935
commit 528ca6afdd

@ -26,12 +26,10 @@ import org.redisson.cache.LRUCacheMap;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.RedisRedirectException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
@ -46,7 +44,8 @@ import java.io.IOException;
import java.security.MessageDigest;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@ -83,46 +82,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public void syncSubscription(CompletableFuture<?> future) {
MasterSlaveServersConfig config = connectionManager.getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
try {
future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (CancellationException e) {
// skip
} catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
} catch (TimeoutException e) {
future.completeExceptionally(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
try {
future.join();
} catch (CompletionException e) {
throw (RuntimeException) e.getCause();
}
get(future);
}
@Override
public void syncSubscriptionInterrupted(CompletableFuture<?> future) throws InterruptedException {
MasterSlaveServersConfig config = connectionManager.getConfig();
int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts();
try {
future.get(timeout, TimeUnit.MILLISECONDS);
} catch (CancellationException e) {
// skip
} catch (ExecutionException e) {
throw (RuntimeException) e.getCause();
} catch (TimeoutException e) {
future.completeExceptionally(new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
}
try {
future.join();
} catch (CompletionException e) {
throw (RuntimeException) e.getCause();
}
getInterrupted(future);
}
@Override

@ -45,11 +45,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
semaphore.acquire(() -> {
if (entry.release() == 0) {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (!removed) {
throw new IllegalStateException();
}
entries.remove(entryName);
service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName))
.whenComplete((r, e) -> {
semaphore.release();
@ -97,7 +93,15 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
}
RedisPubSubListener<Object> listener = createListener(channelName, value);
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
CompletableFuture<PubSubConnectionEntry> s = service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
s.whenComplete((r, e) -> {
if (e != null) {
value.getPromise().completeExceptionally(e);
return;
}
value.getPromise().complete(value);
});
});
return newPromise;
@ -118,20 +122,6 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
PublishSubscribe.this.onMessage(value, (Long) message);
}
@Override
public boolean onStatus(PubSubType type, CharSequence channel) {
if (!channelName.equals(channel.toString())) {
return false;
}
if (type == PubSubType.SUBSCRIBE) {
value.getPromise().complete(value);
return true;
}
return false;
}
};
return listener;
}

@ -236,11 +236,14 @@ public class PublishSubscribeService {
freePubSubLock.release();
CompletableFuture<RedisPubSubConnection> connectFuture = connect(codec, channelName, msEntry, promise, type, lock, listeners);
connectionManager.newTimeout(t -> {
if (attempts.get() == config.getRetryAttempts()) {
connectFuture.completeExceptionally(new RedisTimeoutException(
"Unable to acquire connection for subscription after " + attempts.get() + " attempts. " +
"Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."));
return;
}
connectionManager.newTimeout(t -> {
if (connectFuture.cancel(true)) {
subscribe(codec, channelName, entry, promise, type, lock, attempts, listeners);
attempts.incrementAndGet();
@ -269,27 +272,12 @@ public class PublishSubscribeService {
}
freePubSubLock.release();
CompletableFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);
ChannelFuture future;
addListeners(channelName, promise, type, lock, freeEntry, listeners);
if (PubSubType.PSUBSCRIBE == type) {
future = freeEntry.psubscribe(codec, channelName);
freeEntry.psubscribe(codec, channelName);
} else {
future = freeEntry.subscribe(codec, channelName);
freeEntry.subscribe(codec, channelName);
}
future.addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
}
return;
}
connectionManager.newTimeout(timeout ->
subscribeFuture.cancel(false),
config.getTimeout(), TimeUnit.MILLISECONDS);
});
});
}
@ -298,7 +286,7 @@ public class PublishSubscribeService {
return connectionManager.getEntry(slot);
}
private CompletableFuture<Void> addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,
private void addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,
PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry,
RedisPubSubListener<?>... listeners) {
for (RedisPubSubListener<?> listener : listeners) {
@ -307,7 +295,13 @@ public class PublishSubscribeService {
SubscribeListener list = connEntry.getSubscribeFuture(channelName, type);
CompletableFuture<Void> subscribeFuture = list.getSuccessFuture();
return subscribeFuture.whenComplete((res, e) -> {
subscribeFuture.whenComplete((res, e) -> {
if (e != null) {
promise.completeExceptionally(e);
lock.release();
return;
}
if (!promise.complete(connEntry)) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.removeListener(channelName, listener);
@ -324,6 +318,14 @@ public class PublishSubscribeService {
lock.release();
}
});
connectionManager.newTimeout(timeout -> {
if (subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Subscription timeout after " + config.getTimeout() + "ms. " +
"Check network and/or increase 'timeout' parameter."))) {
unsubscribe(channelName, type);
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}
private CompletableFuture<RedisPubSubConnection> nextPubSubConnection(MasterSlaveEntry entry, ChannelName channelName) {

Loading…
Cancel
Save