From f72e6a3cf2159a996ba655abcfbfb01da9f5072a Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 16 Dec 2022 10:32:02 +0300 Subject: [PATCH] Fixed - new Redis node isn't discovered between PubSub subscription attempts #4745 --- .../pubsub/PublishSubscribeService.java | 72 ++++++++++--------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index c76b6b743..177bab2d3 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -269,6 +269,37 @@ public class PublishSubscribeService { }); } + private void trySubscribe(Codec codec, ChannelName channelName, + CompletableFuture promise, PubSubType type, + AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener... listeners) { + if (attempts.get() == config.getRetryAttempts()) { + lock.release(); + MasterSlaveEntry entry = getEntry(channelName); + if (entry == null) { + RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); + promise.completeExceptionally(ex); + return; + } + + promise.completeExceptionally(new RedisTimeoutException( + "Unable to acquire connection for subscription after " + attempts.get() + " attempts. " + + "Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); + return; + } + + attempts.incrementAndGet(); + + MasterSlaveEntry entry = getEntry(channelName); + if (entry == null) { + connectionManager.newTimeout(tt -> { + trySubscribe(codec, channelName, promise, type, lock, attempts, listeners); + }, config.getRetryInterval(), TimeUnit.MILLISECONDS); + return; + } + + subscribeNoTimeout(codec, channelName, entry, promise, type, lock, attempts, listeners); + } + private void subscribeNoTimeout(Codec codec, ChannelName channelName, MasterSlaveEntry entry, CompletableFuture promise, PubSubType type, AsyncSemaphore lock, AtomicInteger attempts, RedisPubSubListener... listeners) { @@ -285,26 +316,19 @@ public class PublishSubscribeService { return; } - MasterSlaveEntry msEntry = Optional.ofNullable(connectionManager.getEntry(entry.getClient())).orElse(entry); - PubSubEntry freePubSubConnections = entry2PubSubConnection.getOrDefault(msEntry, new PubSubEntry()); + PubSubEntry freePubSubConnections = entry2PubSubConnection.getOrDefault(entry, new PubSubEntry()); PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek(); if (freeEntry == null) { freePubSubLock.release(); - CompletableFuture connectFuture = connect(codec, channelName, msEntry, promise, type, lock, listeners); + CompletableFuture connectFuture = connect(codec, channelName, entry, promise, type, lock, listeners); connectionManager.newTimeout(t -> { - if (attempts.get() == config.getRetryAttempts()) { - connectFuture.completeExceptionally(new RedisTimeoutException( - "Unable to acquire connection for subscription after " + attempts.get() + " attempts. " + - "Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.")); + if (!connectFuture.cancel(false)) { return; } - if (connectFuture.cancel(true)) { - subscribe(codec, channelName, entry, promise, type, lock, attempts, listeners); - attempts.incrementAndGet(); - } + trySubscribe(codec, channelName, promise, type, lock, attempts, listeners); }, config.getRetryInterval(), TimeUnit.MILLISECONDS); return; } @@ -314,7 +338,7 @@ public class PublishSubscribeService { throw new IllegalStateException(); } - PubSubKey key = new PubSubKey(channelName, msEntry); + PubSubKey key = new PubSubKey(channelName, entry); PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, freeEntry); if (oldEntry != null) { freeEntry.release(); @@ -379,38 +403,18 @@ public class PublishSubscribeService { return subscribeFuture; } - private CompletableFuture nextPubSubConnection(MasterSlaveEntry entry, ChannelName channelName) { - if (entry == null) { - int slot = connectionManager.calcSlot(channelName.getName()); - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for slot: " + slot + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); - CompletableFuture result = new CompletableFuture<>(); - result.completeExceptionally(ex); - return result; - } - return entry.nextPubSubConnection(); - } - private CompletableFuture connect(Codec codec, ChannelName channelName, MasterSlaveEntry msEntry, CompletableFuture promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener... listeners) { - CompletableFuture connFuture = nextPubSubConnection(msEntry, channelName); + CompletableFuture connFuture = msEntry.nextPubSubConnection(); promise.whenComplete((res, e) -> { if (e != null) { connFuture.completeExceptionally(e); } }); - connFuture.whenComplete((conn, ex) -> { - if (ex != null) { -// freePubSubLock.release(); - lock.release(); - if (!connFuture.isCancelled()) { - promise.completeExceptionally(ex); - } - return; - } - + connFuture.thenAccept(conn -> { freePubSubLock.acquire().thenAccept(c -> { PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager); int remainFreeAmount = entry.tryAcquire();