From 48920c232bb986c270653d3882e93d1aaea8ebe9 Mon Sep 17 00:00:00 2001 From: Nicola Dardanis Date: Wed, 8 Feb 2023 08:51:35 +0100 Subject: [PATCH] feat: add auto-detection of unavailable master in replicated mode Signed-off-by: Nicola Dardanis --- .../config/ReplicatedServersConfig.java | 26 ++++++++++ .../ReplicatedConnectionManager.java | 50 ++++++++++++++----- 2 files changed, 64 insertions(+), 12 deletions(-) diff --git a/redisson/src/main/java/org/redisson/config/ReplicatedServersConfig.java b/redisson/src/main/java/org/redisson/config/ReplicatedServersConfig.java index 4c098161a..0d38837c4 100644 --- a/redisson/src/main/java/org/redisson/config/ReplicatedServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/ReplicatedServersConfig.java @@ -38,6 +38,12 @@ public class ReplicatedServersConfig extends BaseMasterSlaveServersConfig + * Default is 0, indicating that the client will never throw, + * but only log the error. + * + * @param masterUnreachableTimeout + * @return config + */ + public ReplicatedServersConfig setMasterUnreachableTimeout(int masterUnreachableTimeout) { + this.masterUnreachableTimeout = masterUnreachableTimeout; + return this; + } + + public int getMasterUnreachableTimeout() { + return masterUnreachableTimeout; + } } diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index e4538a3e3..abde41b25 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -16,6 +16,8 @@ package org.redisson.connection; import io.netty.util.concurrent.ScheduledFuture; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.redisson.api.NodeType; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; @@ -58,6 +60,9 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { private ScheduledFuture monitorFuture; + private static final long MASTER_AVAILABLE_TIMESTAMP = -1; + private final AtomicLong lastTimestampMasterAvailable = new AtomicLong(MASTER_AVAILABLE_TIMESTAMP); + private enum Role { master, slave @@ -138,10 +143,30 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { scheduleMasterChangeCheck(cfg); }, cfg.getNodeAddresses().size()); - for (String address : cfg.getNodeAddresses()) { - RedisURI uri = new RedisURI(address); - checkNode(latch, uri, cfg, slaveIPs); + List> roles = cfg.getNodeAddresses().stream() + .map(address -> { + RedisURI uri = new RedisURI(address); + return checkNode(latch, uri, cfg, slaveIPs); + }) + .collect(Collectors.toList()); + CompletableFuture[] completableFutures = new CompletableFuture[roles.size()]; + CompletableFuture.allOf(roles.toArray(completableFutures)); + long currentTimeMillis = System.currentTimeMillis(); + if (roles.stream().noneMatch(role -> Role.master.equals(role.getNow(Role.slave)))) { + log.error("No master available among the configured addresses, " + + "please check your configuration."); + if (lastTimestampMasterAvailable.get() == MASTER_AVAILABLE_TIMESTAMP) { + lastTimestampMasterAvailable.set(currentTimeMillis); + } else if (cfg.getMasterUnreachableTimeout() > 0 + && currentTimeMillis - lastTimestampMasterAvailable.get() > cfg.getMasterUnreachableTimeout()) { + currentMaster.set(null); + stopThreads(); + throw new RedisConnectionException("Can't connect to servers!"); + } + } else { + lastTimestampMasterAvailable.set(MASTER_AVAILABLE_TIMESTAMP); } + }, cfg.getScanInterval(), TimeUnit.MILLISECONDS); } @@ -163,15 +188,16 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { } } - private void checkNode(AsyncCountDownLatch latch, RedisURI uri, ReplicatedServersConfig cfg, Set slaveIPs) { + private CompletableFuture checkNode(AsyncCountDownLatch latch, RedisURI uri, ReplicatedServersConfig cfg, Set slaveIPs) { CompletionStage connectionFuture = connectToNode(cfg, uri, uri.getHost()); - connectionFuture + return connectionFuture .thenCompose(c -> { if (cfg.isMonitorIPChanges()) { return resolveIP(uri); } return CompletableFuture.completedFuture(uri); }) + .thenCompose(c -> resolveIP(uri)) .thenCompose(ip -> { if (isShuttingDown()) { return CompletableFuture.completedFuture(null); @@ -192,33 +218,33 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { } RedisConnection connection = connectionFuture.toCompletableFuture().join(); InetSocketAddress addr = connection.getRedisClient().getAddr(); - Role role = Role.valueOf(r.get(ROLE_KEY)); + final Role role = Role.valueOf(r.get(ROLE_KEY)); if (Role.master.equals(role)) { InetSocketAddress master = currentMaster.get(); if (master.equals(addr)) { log.debug("Current master {} unchanged", master); - return CompletableFuture.completedFuture(null); } else if (currentMaster.compareAndSet(master, addr)) { CompletableFuture changeFuture = changeMaster(singleSlotRange.getStartSlot(), uri); - return changeFuture.exceptionally(e -> { + return changeFuture.handle((ignored, e) -> { log.error("Unable to change master to {}", addr, e); currentMaster.compareAndSet(addr, master); - return null; + return role; }); } } else if (!config.checkSkipSlavesInit()) { CompletableFuture f = slaveUp(addr, uri); slaveIPs.add(addr); - return f.thenApply(re -> null); + return f.thenApply(re -> role); } - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(role); }) .whenComplete((r, ex) -> { if (ex != null) { log.error(ex.getMessage(), ex); } latch.countDown(); - }); + }) + .toCompletableFuture(); } private CompletableFuture slaveUp(InetSocketAddress address, RedisURI uri) {