From 0b97fb52253c69403ca9ca8bc0e9890b228eb549 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 8 Sep 2021 20:09:23 +0300 Subject: [PATCH] Fixed - master-host of Slave node isn't resolved in Sentinel mode. #3819 --- .../connection/SentinelConnectionManager.java | 199 ++++++++++-------- 1 file changed, 110 insertions(+), 89 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index fe614bffc..cc5e61853 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -40,9 +40,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -141,8 +139,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String flags = map.getOrDefault("flags", ""); String masterLinkStatus = map.getOrDefault("master-link-status", ""); - RedisURI uri = toURI(host, port); - uri = resolveIP(uri).syncUninterruptibly().getNow(); + RedisURI uri = resolveIP(host, port).syncUninterruptibly().getNow(); this.config.addSlaveAddress(uri.toString()); log.debug("slave {} state: {}", uri, map); @@ -164,8 +161,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = map.get("ip"); String port = map.get("port"); - RedisURI uri = toURI(ip, port); - uri = resolveIP(uri).syncUninterruptibly().getNow(); + RedisURI uri = resolveIP(ip, port).syncUninterruptibly().getNow(); RFuture future = registerSentinel(uri, this.config, null); connectionFutures.add(future); } @@ -376,90 +372,21 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } }; - - RFuture masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); - masterFuture.thenCompose(u -> resolveIP(scheme, u)) - .whenComplete((newMaster, e) -> { - if (e != null) { - return; - } - RedisURI current = currentMaster.get(); - if (!newMaster.equals(current) - && currentMaster.compareAndSet(current, newMaster)) { - RFuture changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster); - changeFuture.onComplete((res, ex) -> { - if (ex != null) { - currentMaster.compareAndSet(newMaster, current); - } - }); - } - }); + RFuture masterFuture = checkMasterChange(cfg, connection); masterFuture.onComplete(commonListener); - + if (!config.checkSkipSlavesInit()) { - RFuture>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); commands.incrementAndGet(); - slavesFuture.onComplete((slavesMap, ex) -> { - if (ex != null) { - return; - } - - Set currentSlaves = new HashSet<>(slavesMap.size()); - AsyncCountDownLatch latch = new AsyncCountDownLatch(); - for (Map map : slavesMap) { - if (map.isEmpty()) { - latch.countDown(); - continue; - } - - String host = map.get("ip"); - String port = map.get("port"); - String flags = map.getOrDefault("flags", ""); - String masterLinkStatus = map.getOrDefault("master-link-status", ""); - String masterHost = map.get("master-host"); - String masterPort = map.get("master-port"); - - RedisURI addr = toURI(host, port); - resolveIP(addr).onComplete((slaveAddr, exc) -> { - if (exc != null) { - log.error("Unable to add slave " + addr, exc); - latch.countDown(); - return; - } - - if (isSlaveDown(flags, masterLinkStatus)) { - slaveDown(slaveAddr); - latch.countDown(); - return; - } - if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) { - latch.countDown(); - return; - } - - currentSlaves.add(slaveAddr); - addSlave(slaveAddr).onComplete((r, e2) -> { - latch.countDown(); - if (e2 != null) { - log.error("Unable to add slave " + slaveAddr, e2); - } - }); - }); - } - - latch.latch(() -> { - MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - entry.getAllEntries().stream() - .map(e -> e.getClient().getAddr()) - .map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort()))) - .filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get())) - .forEach(a -> slaveDown(a)); - }, slavesMap.size()); - }); + RFuture>> slavesFuture = checkSlavesChange(cfg, connection); slavesFuture.onComplete(commonListener); } - + + RFuture>> sentinelsFuture = checkSentinelsChange(cfg, connection); + sentinelsFuture.onComplete(commonListener); + } + + private RFuture>> checkSentinelsChange(SentinelServersConfig cfg, RedisConnection connection) { RFuture>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName()); sentinelsFuture.onComplete((list, e) -> { if (e != null || list.isEmpty()) { @@ -498,7 +425,96 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { updateSentinels(uris); }, newUris.size()); }); - sentinelsFuture.onComplete(commonListener); + return sentinelsFuture; + } + + private RFuture>> checkSlavesChange(SentinelServersConfig cfg, RedisConnection connection) { + RFuture>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); + slavesFuture.onComplete((slavesMap, ex) -> { + if (ex != null) { + return; + } + + Set currentSlaves = new HashSet<>(slavesMap.size()); + AsyncCountDownLatch latch = new AsyncCountDownLatch(); + for (Map map : slavesMap) { + if (map.isEmpty()) { + latch.countDown(); + continue; + } + + String host = map.get("ip"); + String port = map.get("port"); + String flags = map.getOrDefault("flags", ""); + String masterLinkStatus = map.getOrDefault("master-link-status", ""); + String masterHost = map.get("master-host"); + String masterPort = map.get("master-port"); + + RFuture slaveAddrFuture = resolveIP(host, port); + RFuture masterAddrFuture = resolveIP(masterHost, masterPort); + CompletableFuture resolvedFuture = CompletableFuture.allOf(masterAddrFuture.toCompletableFuture(), + slaveAddrFuture.toCompletableFuture()); + resolvedFuture.whenComplete((res, exc) -> { + if (exc != null) { + log.error("Unable to resolve addresses " + host + " and/or " + masterHost, exc); + latch.countDown(); + return; + } + + RedisURI slaveAddr = slaveAddrFuture.getNow(); + RedisURI masterAddr = masterAddrFuture.getNow(); + if (isSlaveDown(flags, masterLinkStatus)) { + slaveDown(slaveAddr); + latch.countDown(); + return; + } + if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterAddr)) { + latch.countDown(); + return; + } + + currentSlaves.add(slaveAddr); + addSlave(slaveAddr).onComplete((r, e2) -> { + latch.countDown(); + if (e2 != null) { + log.error("Unable to add slave " + slaveAddr, e2); + } + }); + }); + } + + latch.latch(() -> { + MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); + entry.getAllEntries().stream() + .map(e -> e.getClient().getAddr()) + .map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort()))) + .filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get())) + .forEach(a -> slaveDown(a)); + }, slavesMap.size()); + }); + return slavesFuture; + } + + private RFuture checkMasterChange(SentinelServersConfig cfg, RedisConnection connection) { + RFuture masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); + masterFuture.thenCompose(u -> resolveIP(scheme, u)) + .whenComplete((newMaster, e) -> { + if (e != null) { + return; + } + + RedisURI current = currentMaster.get(); + if (!newMaster.equals(current) + && currentMaster.compareAndSet(current, newMaster)) { + RFuture changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster); + changeFuture.onComplete((res, ex) -> { + if (ex != null) { + currentMaster.compareAndSet(newMaster, current); + } + }); + } + }); + return masterFuture; } private void updateSentinels(Collection newUris) { @@ -590,6 +606,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return result; } + private RFuture resolveIP(String host, String port) { + RedisURI uri = toURI(host, port); + return resolveIP(uri); + } + private RedisURI toURI(InetSocketAddress addr) { return toURI(addr.getAddress().getHostAddress(), "" + addr.getPort()); } @@ -640,10 +661,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return baseStatus; } - private boolean isUseSameMaster(RedisURI slaveAddr, String slaveMasterHost, String slaveMasterPort) { + private boolean isUseSameMaster(RedisURI slaveAddr, RedisURI slaveMasterAddr) { RedisURI master = currentMaster.get(); - RedisURI slaveMaster = toURI(slaveMasterHost, slaveMasterPort); - if (!master.equals(slaveMaster)) { + if (!master.equals(slaveMasterAddr)) { + log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMasterAddr, master); return false; } return true;