Fixed - master-host of Slave node isn't resolved in Sentinel mode. #3819

pull/3826/head
Nikita Koksharov 4 years ago
parent b1cda4feab
commit 0b97fb5225

@ -40,9 +40,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -141,8 +139,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String flags = map.getOrDefault("flags", "");
String masterLinkStatus = map.getOrDefault("master-link-status", "");
RedisURI uri = toURI(host, port);
uri = resolveIP(uri).syncUninterruptibly().getNow();
RedisURI uri = resolveIP(host, port).syncUninterruptibly().getNow();
this.config.addSlaveAddress(uri.toString());
log.debug("slave {} state: {}", uri, map);
@ -164,8 +161,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = map.get("ip");
String port = map.get("port");
RedisURI uri = toURI(ip, port);
uri = resolveIP(uri).syncUninterruptibly().getNow();
RedisURI uri = resolveIP(ip, port).syncUninterruptibly().getNow();
RFuture<Void> future = registerSentinel(uri, this.config, null);
connectionFutures.add(future);
}
@ -376,90 +372,21 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
};
RFuture<RedisURI> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
masterFuture.thenCompose(u -> resolveIP(scheme, u))
.whenComplete((newMaster, e) -> {
if (e != null) {
return;
}
RedisURI current = currentMaster.get();
if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) {
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster);
changeFuture.onComplete((res, ex) -> {
if (ex != null) {
currentMaster.compareAndSet(newMaster, current);
}
});
}
});
RFuture<RedisURI> masterFuture = checkMasterChange(cfg, connection);
masterFuture.onComplete(commonListener);
if (!config.checkSkipSlavesInit()) {
RFuture<List<Map<String, String>>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
commands.incrementAndGet();
slavesFuture.onComplete((slavesMap, ex) -> {
if (ex != null) {
return;
}
Set<RedisURI> currentSlaves = new HashSet<>(slavesMap.size());
AsyncCountDownLatch latch = new AsyncCountDownLatch();
for (Map<String, String> map : slavesMap) {
if (map.isEmpty()) {
latch.countDown();
continue;
}
String host = map.get("ip");
String port = map.get("port");
String flags = map.getOrDefault("flags", "");
String masterLinkStatus = map.getOrDefault("master-link-status", "");
String masterHost = map.get("master-host");
String masterPort = map.get("master-port");
RedisURI addr = toURI(host, port);
resolveIP(addr).onComplete((slaveAddr, exc) -> {
if (exc != null) {
log.error("Unable to add slave " + addr, exc);
latch.countDown();
return;
}
if (isSlaveDown(flags, masterLinkStatus)) {
slaveDown(slaveAddr);
latch.countDown();
return;
}
if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) {
latch.countDown();
return;
}
currentSlaves.add(slaveAddr);
addSlave(slaveAddr).onComplete((r, e2) -> {
latch.countDown();
if (e2 != null) {
log.error("Unable to add slave " + slaveAddr, e2);
}
});
});
}
latch.latch(() -> {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
entry.getAllEntries().stream()
.map(e -> e.getClient().getAddr())
.map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort())))
.filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get()))
.forEach(a -> slaveDown(a));
}, slavesMap.size());
});
RFuture<List<Map<String, String>>> slavesFuture = checkSlavesChange(cfg, connection);
slavesFuture.onComplete(commonListener);
}
RFuture<List<Map<String, String>>> sentinelsFuture = checkSentinelsChange(cfg, connection);
sentinelsFuture.onComplete(commonListener);
}
private RFuture<List<Map<String, String>>> checkSentinelsChange(SentinelServersConfig cfg, RedisConnection connection) {
RFuture<List<Map<String, String>>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
sentinelsFuture.onComplete((list, e) -> {
if (e != null || list.isEmpty()) {
@ -498,7 +425,96 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
updateSentinels(uris);
}, newUris.size());
});
sentinelsFuture.onComplete(commonListener);
return sentinelsFuture;
}
private RFuture<List<Map<String, String>>> checkSlavesChange(SentinelServersConfig cfg, RedisConnection connection) {
RFuture<List<Map<String, String>>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
slavesFuture.onComplete((slavesMap, ex) -> {
if (ex != null) {
return;
}
Set<RedisURI> currentSlaves = new HashSet<>(slavesMap.size());
AsyncCountDownLatch latch = new AsyncCountDownLatch();
for (Map<String, String> map : slavesMap) {
if (map.isEmpty()) {
latch.countDown();
continue;
}
String host = map.get("ip");
String port = map.get("port");
String flags = map.getOrDefault("flags", "");
String masterLinkStatus = map.getOrDefault("master-link-status", "");
String masterHost = map.get("master-host");
String masterPort = map.get("master-port");
RFuture<RedisURI> slaveAddrFuture = resolveIP(host, port);
RFuture<RedisURI> masterAddrFuture = resolveIP(masterHost, masterPort);
CompletableFuture<Void> resolvedFuture = CompletableFuture.allOf(masterAddrFuture.toCompletableFuture(),
slaveAddrFuture.toCompletableFuture());
resolvedFuture.whenComplete((res, exc) -> {
if (exc != null) {
log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc);
latch.countDown();
return;
}
RedisURI slaveAddr = slaveAddrFuture.getNow();
RedisURI masterAddr = masterAddrFuture.getNow();
if (isSlaveDown(flags, masterLinkStatus)) {
slaveDown(slaveAddr);
latch.countDown();
return;
}
if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterAddr)) {
latch.countDown();
return;
}
currentSlaves.add(slaveAddr);
addSlave(slaveAddr).onComplete((r, e2) -> {
latch.countDown();
if (e2 != null) {
log.error("Unable to add slave " + slaveAddr, e2);
}
});
});
}
latch.latch(() -> {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
entry.getAllEntries().stream()
.map(e -> e.getClient().getAddr())
.map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort())))
.filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get()))
.forEach(a -> slaveDown(a));
}, slavesMap.size());
});
return slavesFuture;
}
private RFuture<RedisURI> checkMasterChange(SentinelServersConfig cfg, RedisConnection connection) {
RFuture<RedisURI> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
masterFuture.thenCompose(u -> resolveIP(scheme, u))
.whenComplete((newMaster, e) -> {
if (e != null) {
return;
}
RedisURI current = currentMaster.get();
if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) {
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster);
changeFuture.onComplete((res, ex) -> {
if (ex != null) {
currentMaster.compareAndSet(newMaster, current);
}
});
}
});
return masterFuture;
}
private void updateSentinels(Collection<RedisURI> newUris) {
@ -590,6 +606,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private RFuture<RedisURI> resolveIP(String host, String port) {
RedisURI uri = toURI(host, port);
return resolveIP(uri);
}
private RedisURI toURI(InetSocketAddress addr) {
return toURI(addr.getAddress().getHostAddress(), "" + addr.getPort());
}
@ -640,10 +661,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return baseStatus;
}
private boolean isUseSameMaster(RedisURI slaveAddr, String slaveMasterHost, String slaveMasterPort) {
private boolean isUseSameMaster(RedisURI slaveAddr, RedisURI slaveMasterAddr) {
RedisURI master = currentMaster.get();
RedisURI slaveMaster = toURI(slaveMasterHost, slaveMasterPort);
if (!master.equals(slaveMaster)) {
if (!master.equals(slaveMasterAddr)) {
log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMasterAddr, master);
return false;
}
return true;

Loading…
Cancel
Save