Fixed - retryAttempts and retryInterval settings don't apply to INFO_REPLICATION, SENTINEL_GET_MASTER_ADDR_BY_NAME, SENTINEL_SENTINELS, SENTINEL_SLAVES and CLUSTER_NODES commands. #4884, #5973, #5696

pull/6120/head
Nikita Koksharov 5 months ago
parent be4fb3d3f3
commit 95af622db2

@ -430,7 +430,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private void checkClusterState(ClusterServersConfig cfg, Iterator<RedisURI> iterator, AtomicReference<Throwable> lastException, List<RedisURI> allNodes) {
if (!iterator.hasNext()) {
if (lastException.get() != null) {
log.error("Can't update cluster state using nodes: {}", allNodes, lastException.getAndSet(null));
log.error("Can't update cluster state using nodes: {}. A new attempt will be made.", allNodes, lastException.getAndSet(null));
}
scheduleClusterChangeCheck(cfg);
return;
@ -455,11 +455,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private void updateClusterState(ClusterServersConfig cfg, RedisConnection connection,
Iterator<RedisURI> iterator, RedisURI uri, AtomicReference<Throwable> lastException, List<RedisURI> allNodes) {
RFuture<List<ClusterNodeInfo>> future = connection.async(cfg.getRetryAttempts(), cfg.getRetryInterval(), cfg.getTimeout(),
StringCodec.INSTANCE, clusterNodesCommand);
RFuture<List<ClusterNodeInfo>> future = connection.async(StringCodec.INSTANCE, clusterNodesCommand);
future.whenComplete((nodes, e) -> {
if (e != null) {
log.error("Unable to execute {}", clusterNodesCommand, e);
if (!lastException.compareAndSet(null, e)) {
lastException.get().addSuppressed(e);
}

@ -237,7 +237,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
})
.whenComplete((r, ex) -> {
if (ex != null) {
log.error(ex.getMessage(), ex);
log.error("Unable to update node {} status. A new attempt will be made.", uri, ex);
}
})
.toCompletableFuture();

@ -206,7 +206,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
super.doConnect(uri2hostname::get);
scheduleChangeCheck(cfg, null);
scheduleChangeCheck(cfg, null, null);
}
private static boolean isHostname(String host) {
@ -310,28 +310,30 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator<RedisClient> iterator) {
private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator<RedisClient> iterator, AtomicReference<Throwable> lastException) {
AtomicReference<Throwable> exceptionReference = Optional.ofNullable(lastException)
.orElseGet(() -> new AtomicReference<>());
monitorFuture = serviceManager.newTimeout(t -> {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
Iterator<RedisClient> iter = iterator;
if (iter == null) {
// Shuffle the list so all clients don't prefer the same sentinel
List<RedisClient> clients = new ArrayList<>(sentinels.values());
Collections.shuffle(clients);
iter = clients.iterator();
}
checkState(cfg, iter, lastException);
Iterator<RedisClient> iter = Optional.ofNullable(iterator)
.orElseGet(() -> {
// Shuffle the list so all clients don't prefer the same sentinel
List<RedisClient> clients = new ArrayList<>(sentinels.values());
Collections.shuffle(clients);
return clients.iterator();
});
checkState(cfg, iter, exceptionReference);
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
private void checkState(SentinelServersConfig cfg, Iterator<RedisClient> iterator, AtomicReference<Throwable> lastException) {
if (!iterator.hasNext()) {
if (lastException.get() != null) {
log.error("Can't update cluster state", lastException.get());
log.error("Can't update cluster state. A new attempt will be made.", lastException.getAndSet(null));
}
disconnectedSentinels.clear();
CompletableFuture<Void> f = performSentinelDNSCheck();
f.whenComplete((r, e) -> scheduleChangeCheck(cfg, null));
f.whenComplete((r, e) -> scheduleChangeCheck(cfg, null, null));
return;
}
if (serviceManager.isShuttingDown()) {
@ -347,17 +349,20 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
CompletionStage<RedisConnection> connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, hostname);
connectionFuture.whenComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
if (!lastException.compareAndSet(null, e)) {
lastException.get().addSuppressed(e);
}
checkState(cfg, iterator, lastException);
return;
}
updateState(cfg, connection, iterator);
updateState(cfg, connection, iterator, lastException);
});
}
private void updateState(SentinelServersConfig cfg, RedisConnection connection, Iterator<RedisClient> iterator) {
private void updateState(SentinelServersConfig cfg, RedisConnection connection, Iterator<RedisClient> iterator,
AtomicReference<Throwable> lastException) {
List<CompletableFuture<?>> futures = new ArrayList<>();
CompletionStage<RedisClient> masterFuture = checkMasterChange(cfg, connection);
futures.add(masterFuture.toCompletableFuture());
@ -373,14 +378,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
future.whenComplete((r, e) -> {
if (e != null) {
log.error("Can't execute SENTINEL commands on {}", connection.getRedisClient().getAddr(), e);
if (!lastException.compareAndSet(null, e)) {
lastException.get().addSuppressed(e);
}
scheduleChangeCheck(cfg, iterator, lastException);
return;
}
if (e != null) {
scheduleChangeCheck(cfg, iterator);
} else {
scheduleChangeCheck(cfg, null);
}
scheduleChangeCheck(cfg, null, null);
});
}

Loading…
Cancel
Save