|
|
|
@ -382,12 +382,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
AtomicReference<Throwable> lastException = new AtomicReference<>(ex);
|
|
|
|
|
if (ex != null) {
|
|
|
|
|
checkClusterState(cfg, Collections.emptyIterator(), lastException);
|
|
|
|
|
checkClusterState(cfg, Collections.emptyIterator(), lastException, nodes);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Iterator<RedisURI> nodesIterator = nodes.iterator();
|
|
|
|
|
checkClusterState(cfg, nodesIterator, lastException);
|
|
|
|
|
checkClusterState(cfg, nodesIterator, lastException, nodes);
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
AtomicReference<Throwable> lastException = new AtomicReference<>();
|
|
|
|
@ -411,15 +411,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
|
|
|
|
|
Iterator<RedisURI> nodesIterator = nodes.iterator();
|
|
|
|
|
|
|
|
|
|
checkClusterState(cfg, nodesIterator, lastException);
|
|
|
|
|
checkClusterState(cfg, nodesIterator, lastException, nodes);
|
|
|
|
|
}
|
|
|
|
|
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkClusterState(ClusterServersConfig cfg, Iterator<RedisURI> iterator, AtomicReference<Throwable> lastException) {
|
|
|
|
|
private void checkClusterState(ClusterServersConfig cfg, Iterator<RedisURI> iterator, AtomicReference<Throwable> lastException, List<RedisURI> allNodes) {
|
|
|
|
|
if (!iterator.hasNext()) {
|
|
|
|
|
if (lastException.get() != null) {
|
|
|
|
|
log.error("Can't update cluster state", lastException.get());
|
|
|
|
|
log.error("Can't update cluster state using nodes: {}", allNodes, lastException.getAndSet(null));
|
|
|
|
|
}
|
|
|
|
|
scheduleClusterChangeCheck(cfg);
|
|
|
|
|
return;
|
|
|
|
@ -431,29 +431,33 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, uri, configEndpointHostName);
|
|
|
|
|
connectionFuture.whenComplete((connection, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
lastException.set(e);
|
|
|
|
|
checkClusterState(cfg, iterator, lastException);
|
|
|
|
|
if (!lastException.compareAndSet(null, e)) {
|
|
|
|
|
lastException.get().addSuppressed(e);
|
|
|
|
|
}
|
|
|
|
|
checkClusterState(cfg, iterator, lastException, allNodes);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
updateClusterState(cfg, connection, iterator, uri, lastException);
|
|
|
|
|
updateClusterState(cfg, connection, iterator, uri, lastException, allNodes);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void updateClusterState(ClusterServersConfig cfg, RedisConnection connection,
|
|
|
|
|
Iterator<RedisURI> iterator, RedisURI uri, AtomicReference<Throwable> lastException) {
|
|
|
|
|
Iterator<RedisURI> iterator, RedisURI uri, AtomicReference<Throwable> lastException, List<RedisURI> allNodes) {
|
|
|
|
|
RFuture<List<ClusterNodeInfo>> future = connection.async(clusterNodesCommand);
|
|
|
|
|
future.whenComplete((nodes, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
log.error("Unable to execute {}", clusterNodesCommand, e);
|
|
|
|
|
lastException.set(e);
|
|
|
|
|
checkClusterState(cfg, iterator, lastException);
|
|
|
|
|
if (!lastException.compareAndSet(null, e)) {
|
|
|
|
|
lastException.get().addSuppressed(e);
|
|
|
|
|
}
|
|
|
|
|
checkClusterState(cfg, iterator, lastException, allNodes);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (nodes.isEmpty()) {
|
|
|
|
|
log.debug("cluster nodes state got from {}: doesn't contain any nodes", connection.getRedisClient().getAddr());
|
|
|
|
|
checkClusterState(cfg, iterator, lastException);
|
|
|
|
|
checkClusterState(cfg, iterator, lastException, allNodes);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -476,8 +480,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
|
|
|
|
|
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
|
|
|
|
|
}
|
|
|
|
|
log.error("Unable to parse cluster nodes state got from: {}:\n{}", connection.getRedisClient().getAddr(), nodesValue, ex);
|
|
|
|
|
lastException.set(ex);
|
|
|
|
|
checkClusterState(cfg, iterator, lastException);
|
|
|
|
|
|
|
|
|
|
if (!lastException.compareAndSet(null, ex)) {
|
|
|
|
|
lastException.get().addSuppressed(ex);
|
|
|
|
|
}
|
|
|
|
|
checkClusterState(cfg, iterator, lastException, allNodes);
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.thenCompose(newPartitions -> checkMasterNodesChange(cfg, newPartitions))
|
|
|
|
|