diff --git a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java index 18f123805..bfa0c2253 100644 --- a/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java +++ b/redisson/src/main/java/org/redisson/client/handler/RedisChannelInitializer.java @@ -179,7 +179,7 @@ public class RedisChannelInitializer extends ChannelInitializer { SslContext sslContext = sslContextBuilder.build(); String hostname = config.getSslHostname(); if (hostname == null || NetUtil.createByteArrayFromIpAddressString(hostname) != null) { - hostname = config.getAddress().getHost(); + hostname = redisClient.getAddr().getHostName(); } SSLEngine sslEngine = sslContext.newEngine(ch.alloc(), hostname, config.getAddress().getPort()); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 184a13181..12e22922b 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -552,6 +552,11 @@ public class MasterSlaveEntry { return slaveBalancer.unfreezeAsync(address, freezeReason); } + public CompletableFuture slaveUpNoMasterExclusionAsync(InetSocketAddress address, FreezeReason freezeReason) { + noPubSubSlaves.set(false); + return slaveBalancer.unfreezeAsync(address, freezeReason); + } + public CompletableFuture slaveUpAsync(InetSocketAddress address, FreezeReason freezeReason) { noPubSubSlaves.set(false); CompletableFuture f = slaveBalancer.unfreezeAsync(address, freezeReason); diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 0282ef3da..6a7225ac7 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -111,9 +111,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!"); } - RedisURI masterHost = serviceManager.resolveIP(scheme, master).join(); - this.config.setMasterAddress(masterHost.toString()); - currentMaster.set(masterHost); + this.config.setMasterAddress(master.toString()); + InetSocketAddress masterHost = serviceManager.resolve(master).join(); + currentMaster.set(toURI(masterHost)); log.info("master: {} added", masterHost); List> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); @@ -127,7 +127,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String flags = map.getOrDefault("flags", ""); String masterLinkStatus = map.getOrDefault("master-link-status", ""); - RedisURI uri = resolveIP(host, port).join(); + RedisURI uri = serviceManager.toURI(scheme, host, port); this.config.addSlaveAddress(uri.toString()); log.debug("slave {} state: {}", uri, map); @@ -149,13 +149,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = map.get("ip"); String port = map.get("port"); - RedisURI uri = resolveIP(ip, port).join(); - CompletionStage future = registerSentinel(uri, this.config, null); + InetSocketAddress sentinelAddr = resolveIP(ip, port).join(); + CompletionStage future = registerSentinel(sentinelAddr); connectionFutures.add(future.toCompletableFuture()); } - RedisURI sentinelIp = toURI(connection.getRedisClient().getAddr()); - CompletionStage f = registerSentinel(sentinelIp, this.config, null); + CompletionStage f = registerSentinel(connection.getRedisClient().getAddr()); connectionFutures.add(f.toCompletableFuture()); CompletableFuture future = CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[0])); @@ -203,6 +202,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { scheduleChangeCheck(cfg, null); } + private static boolean isHostname(String host) { + return NetUtil.createByteArrayFromIpAddressString(host) == null; + } + private void checkAuth(SentinelServersConfig cfg) { if (cfg.getPassword() == null) { return; @@ -288,9 +291,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } future.getNow().stream() - .map(addr -> toURI(addr)) - .filter(uri -> !sentinels.containsKey(uri) && !disconnectedSentinels.contains(uri)) - .forEach(uri -> registerSentinel(uri, serviceManager.getConfig(), host.getHost())); + .filter(addr -> { + RedisURI uri = toURI(addr); + return !sentinels.containsKey(uri) && !disconnectedSentinels.contains(uri); + }) + .forEach(addr -> registerSentinel(addr)); }); if (commonListener != null) { allNodes.addListener(commonListener); @@ -331,7 +336,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { RedisClient client = iterator.next(); RedisURI addr = toURI(client.getAddr()); - CompletionStage connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, null); + String hostname = null; + if (isHostname(client.getAddr().getHostName())) { + hostname = client.getAddr().getHostName(); + } + CompletionStage connectionFuture = connectToNode(NodeType.SENTINEL, cfg, addr, hostname); connectionFuture.whenComplete((connection, e) -> { if (e != null) { lastException.set(e); @@ -384,7 +393,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return CompletableFuture.completedFuture(null); } - List> newUris = list.stream().filter(m -> { + List> newUris = list.stream().filter(m -> { String flags = m.getOrDefault("flags", ""); String masterLinkStatus = m.getOrDefault("master-link-status", ""); if (!m.isEmpty() && !isSlaveDown(flags, masterLinkStatus)) { @@ -394,9 +403,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }).map(m -> { String ip = m.get("ip"); String port = m.get("port"); - return serviceManager.toURI(scheme, ip, port); - }).map(addr -> { - CompletionStage f = serviceManager.resolveIP(addr); + CompletionStage f = resolveIP(ip, port); return f.exceptionally(ex -> { log.error("unable to resolve hostname", ex); return null; @@ -405,7 +412,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { CompletableFuture futures = CompletableFuture.allOf(newUris.toArray(new CompletableFuture[0])); return futures.whenComplete((r, ex) -> { - List uris = newUris.stream().map(u -> { + List uris = newUris.stream().map(u -> { try { return u.getNow(null); } catch (Exception exc) { @@ -414,8 +421,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }).filter(u -> u != null).collect(Collectors.toList()); InetSocketAddress addr = connection.getRedisClient().getAddr(); - RedisURI currentAddr = toURI(addr); - uris.add(currentAddr); + uris.add(addr); updateSentinels(uris); }); @@ -439,8 +445,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String masterHost = map.get("master-host"); String masterPort = map.get("master-port"); - CompletableFuture slaveAddrFuture = resolveIP(host, port); - CompletableFuture masterAddrFuture; + CompletableFuture slaveAddrFuture = resolveIP(host, port); + CompletableFuture masterAddrFuture; if ("?".equals(masterHost)) { masterAddrFuture = CompletableFuture.completedFuture(null); } else { @@ -456,8 +462,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } }) .thenCompose(res -> { - RedisURI slaveAddr = slaveAddrFuture.getNow(null); - RedisURI masterAddr = masterAddrFuture.getNow(null); + InetSocketAddress slaveAddr = slaveAddrFuture.getNow(null); + InetSocketAddress masterAddr = masterAddrFuture.getNow(null); if (isSlaveDown(flags, masterLinkStatus)) { slaveDown(slaveAddr); return CompletableFuture.completedFuture(res); @@ -466,7 +472,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return CompletableFuture.completedFuture(res); } - currentSlaves.add(slaveAddr); + RedisURI uri = toURI(slaveAddr); + currentSlaves.add(uri); return addSlave(slaveAddr).whenComplete((r, e) -> { if (e != null) { log.error("Unable to add slave {}", slaveAddr, e); @@ -480,8 +487,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); entry.getAllEntries().stream() .map(e -> e.getClient().getAddr()) - .map(a -> serviceManager.toURI(scheme, a.getAddress().getHostAddress(), String.valueOf(a.getPort()))) - .filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get())) + .filter(a -> { + RedisURI uri = toURI(a); + return !currentSlaves.contains(uri) && !uri.equals(currentMaster.get()); + }) .forEach(a -> slaveDown(a)); }); }); @@ -495,7 +504,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { RedisURI current = currentMaster.get(); if (!newMaster.equals(current) && currentMaster.compareAndSet(current, newMaster)) { - CompletableFuture changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster); + RedisURI host = newMaster; + if (newMaster.isSsl()) { + RedisURI h = masterFuture.toCompletableFuture().join(); + if (!h.isIP()) { + host = new RedisURI(scheme, h.getHost(), h.getPort()); + } + } + CompletableFuture changeFuture = changeMaster(singleSlotRange.getStartSlot(), host); return changeFuture.exceptionally(ex -> { currentMaster.compareAndSet(newMaster, current); return null; @@ -505,16 +521,27 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }); } - private void updateSentinels(Collection newUris) { - newUris.stream() - .filter(uri -> !sentinels.containsKey(uri)) - .forEach(uri -> { + private void updateSentinels(Collection newAddrs) { + newAddrs.stream() + .filter(addr -> { + RedisURI uri = toURI(addr); + return !sentinels.containsKey(uri); + }) + .forEach(addr -> { + RedisURI uri = toURI(addr); disconnectedSentinels.remove(uri); - registerSentinel(uri, serviceManager.getConfig(), null); + registerSentinel(addr); }); sentinels.keySet().stream() - .filter(uri -> !newUris.contains(uri)) + .filter(uri -> { + for (InetSocketAddress addr : newAddrs) { + if (uri.equals(addr)) { + return false; + } + } + return true; + }) .forEach(uri -> { RedisClient sentinel = sentinels.remove(uri); if (sentinel != null) { @@ -531,24 +558,21 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return disconnectedSlaves; } - private CompletionStage registerSentinel(RedisURI addr, MasterSlaveServersConfig c, String sslHostname) { - boolean isHostname = NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null; - if (!isHostname) { - RedisClient sentinel = sentinels.get(addr); - if (sentinel != null) { - return CompletableFuture.completedFuture(null); - } + private CompletionStage registerSentinel(InetSocketAddress addr) { + RedisURI uri = toURI(addr); + RedisClient sentinel = sentinels.get(uri); + if (sentinel != null) { + return CompletableFuture.completedFuture(null); } - RedisClient client = createClient(NodeType.SENTINEL, addr, c.getConnectTimeout(), c.getTimeout(), sslHostname); + RedisURI hostname = serviceManager.toURI(scheme, addr.getAddress().getHostName(), "" + addr.getPort()); + RedisClient client = createClient(NodeType.SENTINEL, addr, hostname, null); CompletableFuture future = client.resolveAddr(); return future.thenCompose(res -> { - RedisURI ipAddr = toURI(client.getAddr()); - if (isHostname) { - RedisClient sentinel = sentinels.get(ipAddr); - if (sentinel != null) { - return CompletableFuture.completedFuture(null); - } + RedisURI ipAddr = toURI(res); + RedisClient s = sentinels.get(ipAddr); + if (s != null) { + return CompletableFuture.completedFuture(null); } CompletionStage f = client.connectAsync(); @@ -559,55 +583,58 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } if (sentinels.putIfAbsent(ipAddr, client) == null) { log.info("sentinel: {} added", ipAddr); + } else { + client.shutdownAsync(); } return null; }); }); } - private CompletableFuture resolveIP(String host, String port) { + private CompletableFuture resolveIP(String host, String port) { RedisURI uri = serviceManager.toURI(scheme, host, port); - return serviceManager.resolveIP(uri); + return serviceManager.resolve(uri); } private RedisURI toURI(InetSocketAddress addr) { return serviceManager.toURI(scheme, addr.getAddress().getHostAddress(), "" + addr.getPort()); } - private CompletableFuture addSlave(RedisURI uri) { + private CompletableFuture addSlave(InetSocketAddress addr) { if (config.isSlaveNotUsed()) { - log.info("slave: {} is up", uri); + log.info("slave: {} is up", addr); return CompletableFuture.completedFuture(null); } // to avoid addition twice MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - if (!entry.hasSlave(uri)) { - CompletableFuture future = entry.addSlave(uri); + if (!entry.hasSlave(addr)) { + RedisURI uri = serviceManager.toURI(scheme, addr.getHostName(), "" + addr.getPort()); + CompletableFuture future = entry.addSlave(addr, uri); return future.thenApply(res -> { - log.info("slave: {} added", uri); + log.info("slave: {} added", addr); return null; }); } - CompletableFuture f = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER); + CompletableFuture f = entry.slaveUpNoMasterExclusionAsync(addr, FreezeReason.MANAGER); return f.thenCompose(e -> { if (e) { - log.info("slave: {} is up", uri); - return entry.excludeMasterFromSlaves(uri); + log.info("slave: {} is up", addr); + return entry.excludeMasterFromSlaves(addr); } return CompletableFuture.completedFuture(e); }).thenApply(e -> null); } - private void slaveDown(RedisURI uri) { + private void slaveDown(InetSocketAddress addr) { if (config.isSlaveNotUsed()) { - log.warn("slave: {} is down", uri); + log.warn("slave: {} is down", addr); } else { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - entry.slaveDownAsync(uri, FreezeReason.MANAGER).thenAccept(r -> { + entry.slaveDownAsync(addr, FreezeReason.MANAGER).thenAccept(r -> { if (r) { - log.warn("slave: {} is down", uri); + log.warn("slave: {} is down", addr); } }); } @@ -621,9 +648,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return baseStatus; } - private boolean isUseSameMaster(RedisURI slaveAddr, RedisURI slaveMasterAddr) { + private boolean isUseSameMaster(InetSocketAddress slaveAddr, InetSocketAddress slaveMasterAddr) { RedisURI master = currentMaster.get(); - if (!master.equals(slaveMasterAddr) && !slaveAddr.equals(master)) { + if (!master.equals(slaveMasterAddr) && !master.equals(slaveAddr)) { log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMasterAddr, master); return false; } diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 4a237d312..d70c0a9f9 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -356,6 +356,34 @@ public class ServiceManager { return result; } + public CompletableFuture resolve(RedisURI address) { + if (address.isIP()) { + try { + InetAddress ip = InetAddress.getByName(address.getHost()); + InetSocketAddress addr = new InetSocketAddress(InetAddress.getByAddress(address.getHost(), ip.getAddress()), address.getPort()); + return CompletableFuture.completedFuture(addr); + } catch (UnknownHostException e) { + throw new IllegalArgumentException(e); + } + } + + CompletableFuture result = new CompletableFuture<>(); + AddressResolver resolver = resolverGroup.getResolver(group.next()); + InetSocketAddress addr = InetSocketAddress.createUnresolved(address.getHost(), address.getPort()); + Future future = resolver.resolve(addr); + future.addListener((FutureListener) f -> { + if (!f.isSuccess()) { + log.error("Unable to resolve {}", address, f.cause()); + result.completeExceptionally(f.cause()); + return; + } + + InetSocketAddress s = f.getNow(); + result.complete(s); + }); + return result; + } + public RedisURI toURI(String scheme, String host, String port) { // convert IPv6 address to unified compressed format if (NetUtil.isValidIpV6Address(host)) {