From cb538bb5f4a23063e986f79bb8bbe179ac76788f Mon Sep 17 00:00:00 2001 From: Nikita Koksharov <nkoksharov@redisson.pro> Date: Mon, 30 Dec 2024 14:36:53 +0300 Subject: [PATCH] Feature - cluster nodes state added in "node ... hasn't been discovered yet" error --- .../connection/ClusterConnectionManager.java | 4 +++- .../redisson/connection/ServiceManager.java | 19 +++++++++++++-- .../pubsub/PublishSubscribeService.java | 24 +++++++------------ 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ClusterConnectionManager.java index da19da001..a229a13f4 100644 --- a/redisson/src/main/java/org/redisson/connection/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ClusterConnectionManager.java @@ -489,7 +489,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (ClusterNodeInfo clusterNodeInfo : nodes) { nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n"); } - log.debug("cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); + + log.debug("Cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); + serviceManager.setLastClusterNodes(nodesValue.toString()); } CompletableFuture<Collection<ClusterPartition>> newPartitionsFuture = parsePartitions(nodes); diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 37ea7a804..ce3508d48 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -371,15 +371,30 @@ public final class ServiceManager { shutdownLatch.set(true); } + private volatile String lastClusterNodes; + + public void setLastClusterNodes(String lastClusterNodes) { + this.lastClusterNodes = lastClusterNodes; + } + + public <T> CompletableFuture<T> createNodeNotFoundFuture(String channelName, int slot) { + RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " slot: " + slot + + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. " + + "Increase value of retryAttempts and/or retryInterval settings. Last cluster nodes topology: " + lastClusterNodes); + CompletableFuture<T> promise = new CompletableFuture<>(); + promise.completeExceptionally(ex); + return promise; + } + public RedisNodeNotFoundException createNodeNotFoundException(NodeSource source) { RedisNodeNotFoundException ex; if (cfg.isClusterConfig() && source.getSlot() != null && source.getAddr() == null && source.getRedisClient() == null) { - ex = new RedisNodeNotFoundException("Node for slot: " + source.getSlot() + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); + ex = new RedisNodeNotFoundException("Node for slot: " + source.getSlot() + " hasn't been discovered yet. Increase value of retryAttempts and/or retryInterval settings. Last cluster nodes topology: " + lastClusterNodes); } else { - ex = new RedisNodeNotFoundException("Node: " + source + " hasn't been discovered yet. Increase value of retryAttempts and/or retryInterval settings."); + ex = new RedisNodeNotFoundException("Node: " + source + " hasn't been discovered yet. Increase value of retryAttempts and/or retryInterval settings. Last cluster nodes topology: " + lastClusterNodes); } return ex; } diff --git a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java index 74d679100..c4f40b623 100644 --- a/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java +++ b/redisson/src/main/java/org/redisson/pubsub/PublishSubscribeService.java @@ -191,10 +191,8 @@ public class PublishSubscribeService { MasterSlaveEntry entry = getEntry(channelName); if (entry == null) { - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); - CompletableFuture<Collection<PubSubConnectionEntry>> promise = new CompletableFuture<>(); - promise.completeExceptionally(ex); - return promise; + int slot = connectionManager.calcSlot(channelName.getName()); + return connectionManager.getServiceManager().createNodeNotFoundFuture(channelName.toString(), slot); } CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, null, listeners); @@ -383,10 +381,8 @@ public class PublishSubscribeService { MasterSlaveEntry entry = getEntry(channelName); if (entry == null) { - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); - CompletableFuture<List<PubSubConnectionEntry>> promise = new CompletableFuture<>(); - promise.completeExceptionally(ex); - return promise; + int slot = connectionManager.calcSlot(channelName.getName()); + return connectionManager.getServiceManager().createNodeNotFoundFuture(channelName.toString(), slot); } CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, listeners); return f.thenApply(res -> Collections.singletonList(res)); @@ -395,10 +391,8 @@ public class PublishSubscribeService { public CompletableFuture<PubSubConnectionEntry> ssubscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) { MasterSlaveEntry entry = getEntry(channelName); if (entry == null) { - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); - CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>(); - promise.completeExceptionally(ex); - return promise; + int slot = connectionManager.calcSlot(channelName.getName()); + return connectionManager.getServiceManager().createNodeNotFoundFuture(channelName.toString(), slot); } return subscribe(PubSubType.SSUBSCRIBE, codec, channelName, entry, null, listeners); } @@ -432,10 +426,8 @@ public class PublishSubscribeService { AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) { MasterSlaveEntry entry = getEntry(new ChannelName(channelName)); if (entry == null) { - CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>(); - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings."); - promise.completeExceptionally(ex); - return promise; + int slot = connectionManager.calcSlot(channelName); + return connectionManager.getServiceManager().createNodeNotFoundFuture(channelName, slot); } PubSubType type;