feat: add auto-detection of unavailable master in replicated mode

Signed-off-by: Nicola Dardanis <nicdard@gmail.com>
pull/4853/head
Nicola Dardanis 2 years ago committed by Nicola Dardanis
parent 8a3cdd21fe
commit 48920c232b

@ -38,6 +38,12 @@ public class ReplicatedServersConfig extends BaseMasterSlaveServersConfig<Replic
*/
private int scanInterval = 1000;
/**
* Master node unreachable maximum time before failure in milliseconds
* A value of 0 indicates that the client will never fail
*/
private int masterUnreachableTimeout = 0;
/**
* Database index used for Redis connection
*/
@ -54,6 +60,7 @@ public class ReplicatedServersConfig extends BaseMasterSlaveServersConfig<Replic
setScanInterval(config.getScanInterval());
setDatabase(config.getDatabase());
setMonitorIPChanges(config.isMonitorIPChanges());
setMasterUnreachableTimeout(config.getMasterUnreachableTimeout());
}
/**
@ -122,4 +129,23 @@ public class ReplicatedServersConfig extends BaseMasterSlaveServersConfig<Replic
public boolean isMonitorIPChanges() {
return monitorIPChanges;
}
/**
* Master unreachable timeout in milliseconds. After the configured amount of time
* the client will throw {@link org.redisson.client.RedisConnectionException}.
* <p>
* Default is <code>0</code>, indicating that the client will never throw,
* but only log the error.
*
* @param masterUnreachableTimeout
* @return config
*/
public ReplicatedServersConfig setMasterUnreachableTimeout(int masterUnreachableTimeout) {
this.masterUnreachableTimeout = masterUnreachableTimeout;
return this;
}
public int getMasterUnreachableTimeout() {
return masterUnreachableTimeout;
}
}

@ -16,6 +16,8 @@
package org.redisson.connection;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.redisson.api.NodeType;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
@ -58,6 +60,9 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
private ScheduledFuture<?> monitorFuture;
private static final long MASTER_AVAILABLE_TIMESTAMP = -1;
private final AtomicLong lastTimestampMasterAvailable = new AtomicLong(MASTER_AVAILABLE_TIMESTAMP);
private enum Role {
master,
slave
@ -138,10 +143,30 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
scheduleMasterChangeCheck(cfg);
}, cfg.getNodeAddresses().size());
for (String address : cfg.getNodeAddresses()) {
List<CompletableFuture<Role>> roles = cfg.getNodeAddresses().stream()
.map(address -> {
RedisURI uri = new RedisURI(address);
checkNode(latch, uri, cfg, slaveIPs);
return checkNode(latch, uri, cfg, slaveIPs);
})
.collect(Collectors.toList());
CompletableFuture[] completableFutures = new CompletableFuture[roles.size()];
CompletableFuture.allOf(roles.toArray(completableFutures));
long currentTimeMillis = System.currentTimeMillis();
if (roles.stream().noneMatch(role -> Role.master.equals(role.getNow(Role.slave)))) {
log.error("No master available among the configured addresses, "
+ "please check your configuration.");
if (lastTimestampMasterAvailable.get() == MASTER_AVAILABLE_TIMESTAMP) {
lastTimestampMasterAvailable.set(currentTimeMillis);
} else if (cfg.getMasterUnreachableTimeout() > 0
&& currentTimeMillis - lastTimestampMasterAvailable.get() > cfg.getMasterUnreachableTimeout()) {
currentMaster.set(null);
stopThreads();
throw new RedisConnectionException("Can't connect to servers!");
}
} else {
lastTimestampMasterAvailable.set(MASTER_AVAILABLE_TIMESTAMP);
}
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
@ -163,15 +188,16 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
}
}
private void checkNode(AsyncCountDownLatch latch, RedisURI uri, ReplicatedServersConfig cfg, Set<InetSocketAddress> slaveIPs) {
private CompletableFuture<Role> checkNode(AsyncCountDownLatch latch, RedisURI uri, ReplicatedServersConfig cfg, Set<InetSocketAddress> slaveIPs) {
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, uri, uri.getHost());
connectionFuture
return connectionFuture
.thenCompose(c -> {
if (cfg.isMonitorIPChanges()) {
return resolveIP(uri);
}
return CompletableFuture.completedFuture(uri);
})
.thenCompose(c -> resolveIP(uri))
.thenCompose(ip -> {
if (isShuttingDown()) {
return CompletableFuture.completedFuture(null);
@ -192,33 +218,33 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
}
RedisConnection connection = connectionFuture.toCompletableFuture().join();
InetSocketAddress addr = connection.getRedisClient().getAddr();
Role role = Role.valueOf(r.get(ROLE_KEY));
final Role role = Role.valueOf(r.get(ROLE_KEY));
if (Role.master.equals(role)) {
InetSocketAddress master = currentMaster.get();
if (master.equals(addr)) {
log.debug("Current master {} unchanged", master);
return CompletableFuture.completedFuture(null);
} else if (currentMaster.compareAndSet(master, addr)) {
CompletableFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), uri);
return changeFuture.exceptionally(e -> {
return changeFuture.handle((ignored, e) -> {
log.error("Unable to change master to {}", addr, e);
currentMaster.compareAndSet(addr, master);
return null;
return role;
});
}
} else if (!config.checkSkipSlavesInit()) {
CompletableFuture<Void> f = slaveUp(addr, uri);
slaveIPs.add(addr);
return f.thenApply(re -> null);
return f.thenApply(re -> role);
}
return CompletableFuture.completedFuture(null);
return CompletableFuture.completedFuture(role);
})
.whenComplete((r, ex) -> {
if (ex != null) {
log.error(ex.getMessage(), ex);
}
latch.countDown();
});
})
.toCompletableFuture();
}
private CompletableFuture<Void> slaveUp(InetSocketAddress address, RedisURI uri) {

Loading…
Cancel
Save