Fixed - DNS change isn't detected in replicated mode #4437

pull/4550/merge
Nikita Koksharov 3 years ago
parent df4809d1ab
commit e6e961f127

@ -17,7 +17,6 @@ package org.redisson.connection;
import io.netty.util.concurrent.ScheduledFuture;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
@ -163,49 +162,56 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
private void checkNode(AsyncCountDownLatch latch, RedisURI uri, ReplicatedServersConfig cfg, Set<InetSocketAddress> slaveIPs) {
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, uri, uri.getHost());
connectionFuture.whenComplete((connection, exc) -> {
if (exc != null) {
log.error(exc.getMessage(), exc);
latch.countDown();
return;
}
if (isShuttingDown()) {
return;
}
connectionFuture
.thenCompose(c -> resolveIP(uri))
.thenCompose(ip -> {
if (isShuttingDown()) {
return CompletableFuture.completedFuture(null);
}
RFuture<Map<String, String>> result = connection.async(RedisCommands.INFO_REPLICATION);
result.whenComplete((r, ex) -> {
if (ex != null) {
log.error(ex.getMessage(), ex);
latch.countDown();
return;
}
RedisConnection connection = connectionFuture.toCompletableFuture().join();
if (!RedisURI.compare(connection.getRedisClient().getAddr(), ip)) {
closeNodeConnection(connection);
log.info("Hostname: " + uri + " has changed IP from: "
+ connection.getRedisClient().getAddr() + " to " + ip);
return CompletableFuture.<Map<String, String>>completedFuture(null);
}
InetSocketAddress addr = connection.getRedisClient().getAddr();
Role role = Role.valueOf(r.get(ROLE_KEY));
if (Role.master.equals(role)) {
InetSocketAddress master = currentMaster.get();
if (master.equals(addr)) {
log.debug("Current master {} unchanged", master);
} else if (currentMaster.compareAndSet(master, addr)) {
CompletableFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), uri);
changeFuture.exceptionally(e -> {
log.error("Unable to change master to " + addr, e);
currentMaster.compareAndSet(addr, master);
return null;
});
return connection.async(RedisCommands.INFO_REPLICATION);
})
.thenCompose(r -> {
if (r == null) {
return CompletableFuture.completedFuture(null);
}
RedisConnection connection = connectionFuture.toCompletableFuture().join();
InetSocketAddress addr = connection.getRedisClient().getAddr();
Role role = Role.valueOf(r.get(ROLE_KEY));
if (Role.master.equals(role)) {
InetSocketAddress master = currentMaster.get();
if (master.equals(addr)) {
log.debug("Current master {} unchanged", master);
return CompletableFuture.completedFuture(null);
} else if (currentMaster.compareAndSet(master, addr)) {
CompletableFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), uri);
return changeFuture.exceptionally(e -> {
log.error("Unable to change master to " + addr, e);
currentMaster.compareAndSet(addr, master);
return null;
});
}
} else if (!config.checkSkipSlavesInit()) {
CompletableFuture<Void> f = slaveUp(addr, uri);
slaveIPs.add(addr);
return f.thenApply(re -> null);
}
return CompletableFuture.completedFuture(null);
})
.whenComplete((r, ex) -> {
if (ex != null) {
log.error(ex.getMessage(), ex);
}
latch.countDown();
} else if (!config.checkSkipSlavesInit()) {
CompletableFuture<Void> f = slaveUp(addr, uri);
slaveIPs.add(addr);
f.whenComplete((res, e) -> {
latch.countDown();
});
}
});
});
});
}
private CompletableFuture<Void> slaveUp(InetSocketAddress address, RedisURI uri) {

Loading…
Cancel
Save