Feature - cluster nodes state added in "node ... hasn't been discovered yet" error

pull/6370/head
Nikita Koksharov 1 month ago
parent e2e50c446a
commit cb538bb5f4

@ -489,7 +489,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (ClusterNodeInfo clusterNodeInfo : nodes) {
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
}
log.debug("cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
log.debug("Cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
serviceManager.setLastClusterNodes(nodesValue.toString());
}
CompletableFuture<Collection<ClusterPartition>> newPartitionsFuture = parsePartitions(nodes);

@ -371,15 +371,30 @@ public final class ServiceManager {
shutdownLatch.set(true);
}
private volatile String lastClusterNodes;
public void setLastClusterNodes(String lastClusterNodes) {
this.lastClusterNodes = lastClusterNodes;
}
public <T> CompletableFuture<T> createNodeNotFoundFuture(String channelName, int slot) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " slot: " + slot
+ " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. " +
"Increase value of retryAttempts and/or retryInterval settings. Last cluster nodes topology: " + lastClusterNodes);
CompletableFuture<T> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
return promise;
}
public RedisNodeNotFoundException createNodeNotFoundException(NodeSource source) {
RedisNodeNotFoundException ex;
if (cfg.isClusterConfig()
&& source.getSlot() != null
&& source.getAddr() == null
&& source.getRedisClient() == null) {
ex = new RedisNodeNotFoundException("Node for slot: " + source.getSlot() + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
ex = new RedisNodeNotFoundException("Node for slot: " + source.getSlot() + " hasn't been discovered yet. Increase value of retryAttempts and/or retryInterval settings. Last cluster nodes topology: " + lastClusterNodes);
} else {
ex = new RedisNodeNotFoundException("Node: " + source + " hasn't been discovered yet. Increase value of retryAttempts and/or retryInterval settings.");
ex = new RedisNodeNotFoundException("Node: " + source + " hasn't been discovered yet. Increase value of retryAttempts and/or retryInterval settings. Last cluster nodes topology: " + lastClusterNodes);
}
return ex;
}

@ -191,10 +191,8 @@ public class PublishSubscribeService {
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<Collection<PubSubConnectionEntry>> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
return promise;
int slot = connectionManager.calcSlot(channelName.getName());
return connectionManager.getServiceManager().createNodeNotFoundFuture(channelName.toString(), slot);
}
CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.PSUBSCRIBE, codec, channelName, entry, null, listeners);
@ -383,10 +381,8 @@ public class PublishSubscribeService {
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<List<PubSubConnectionEntry>> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
return promise;
int slot = connectionManager.calcSlot(channelName.getName());
return connectionManager.getServiceManager().createNodeNotFoundFuture(channelName.toString(), slot);
}
CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, listeners);
return f.thenApply(res -> Collections.singletonList(res));
@ -395,10 +391,8 @@ public class PublishSubscribeService {
public CompletableFuture<PubSubConnectionEntry> ssubscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
MasterSlaveEntry entry = getEntry(channelName);
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
promise.completeExceptionally(ex);
return promise;
int slot = connectionManager.calcSlot(channelName.getName());
return connectionManager.getServiceManager().createNodeNotFoundFuture(channelName.toString(), slot);
}
return subscribe(PubSubType.SSUBSCRIBE, codec, channelName, entry, null, listeners);
}
@ -432,10 +426,8 @@ public class PublishSubscribeService {
AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
MasterSlaveEntry entry = getEntry(new ChannelName(channelName));
if (entry == null) {
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node for name: " + channelName + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings.");
promise.completeExceptionally(ex);
return promise;
int slot = connectionManager.calcSlot(channelName);
return connectionManager.getServiceManager().createNodeNotFoundFuture(channelName, slot);
}
PubSubType type;

Loading…
Cancel
Save