diff --git a/redisson/src/main/java/org/redisson/api/redisnode/RedisSentinel.java b/redisson/src/main/java/org/redisson/api/redisnode/RedisSentinel.java index 4aa330041..c953b16c4 100644 --- a/redisson/src/main/java/org/redisson/api/redisnode/RedisSentinel.java +++ b/redisson/src/main/java/org/redisson/api/redisnode/RedisSentinel.java @@ -15,7 +15,8 @@ */ package org.redisson.api.redisnode; -import java.net.InetSocketAddress; +import org.redisson.misc.RedisURI; + import java.util.List; import java.util.Map; @@ -33,7 +34,7 @@ public interface RedisSentinel extends RedisNode, RedisSentinelAsync { * @param masterName - name of master * @return network address */ - InetSocketAddress getMasterAddr(String masterName); + RedisURI getMasterAddr(String masterName); /** * Returns list of map containing info regarding Redis Sentinel server diff --git a/redisson/src/main/java/org/redisson/api/redisnode/RedisSentinelAsync.java b/redisson/src/main/java/org/redisson/api/redisnode/RedisSentinelAsync.java index 365b6cea6..c0cddcd74 100644 --- a/redisson/src/main/java/org/redisson/api/redisnode/RedisSentinelAsync.java +++ b/redisson/src/main/java/org/redisson/api/redisnode/RedisSentinelAsync.java @@ -16,8 +16,8 @@ package org.redisson.api.redisnode; import org.redisson.api.RFuture; +import org.redisson.misc.RedisURI; -import java.net.InetSocketAddress; import java.util.List; import java.util.Map; @@ -35,7 +35,7 @@ public interface RedisSentinelAsync extends RedisNodeAsync { * @param masterName - name of master * @return network address */ - RFuture getMasterAddrAsync(String masterName); + RFuture getMasterAddrAsync(String masterName); /** * Returns list of map containing info regarding Redis Sentinel server diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index ca785ee4f..89006465f 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -24,8 +24,8 @@ import org.redisson.client.protocol.convertor.*; import org.redisson.client.protocol.decoder.*; import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; import org.redisson.cluster.ClusterNodeInfo; +import org.redisson.misc.RedisURI; -import java.net.InetSocketAddress; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -459,8 +459,8 @@ public interface RedisCommands { RedisStrictCommand SENTINEL_REMOVE = new RedisStrictCommand("SENTINEL", "REMOVE", new VoidReplayConvertor()); RedisStrictCommand SENTINEL_MONITOR = new RedisStrictCommand("SENTINEL", "MONITOR", new VoidReplayConvertor()); - RedisStrictCommand SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", - new InetSocketAddressDecoder()); + RedisStrictCommand SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", + new RedisURIDecoder()); RedisCommand>> SENTINEL_MASTERS = new RedisCommand>>("SENTINEL", "MASTERS", new ListMultiDecoder2(new ListResultReplayDecoder(), new ObjectMapReplayDecoder())); RedisCommand> SENTINEL_MASTER = new RedisCommand("SENTINEL", "MASTER", new ObjectMapReplayDecoder()); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/InetSocketAddressDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/RedisURIDecoder.java similarity index 69% rename from redisson/src/main/java/org/redisson/client/protocol/decoder/InetSocketAddressDecoder.java rename to redisson/src/main/java/org/redisson/client/protocol/decoder/RedisURIDecoder.java index ac2ecdbe5..6ed32ec36 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/InetSocketAddressDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/RedisURIDecoder.java @@ -19,10 +19,8 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; +import org.redisson.misc.RedisURI; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; import java.util.List; /** @@ -30,7 +28,7 @@ import java.util.List; * @author Nikita Koksharov * */ -public class InetSocketAddressDecoder implements MultiDecoder { +public class RedisURIDecoder implements MultiDecoder { @Override public Decoder getDecoder(Codec codec, int paramNum, State state) { @@ -38,15 +36,11 @@ public class InetSocketAddressDecoder implements MultiDecoder } @Override - public InetSocketAddress decode(List parts, State state) { + public RedisURI decode(List parts, State state) { if (parts.isEmpty()) { return null; } - try { - return new InetSocketAddress(InetAddress.getByName((String) parts.get(0)), Integer.valueOf((String) parts.get(1))); - } catch (UnknownHostException e) { - throw new IllegalStateException(e); - } + return new RedisURI("redis", (String) parts.get(0), Integer.valueOf((String) parts.get(1))); } } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 5ccc518cc..5a919dae8 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -691,6 +691,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } protected RFuture resolveIP(RedisURI address) { + return resolveIP(address.getScheme(), address); + } + + protected RFuture resolveIP(String scheme, RedisURI address) { if (address.isIP()) { return RedissonPromise.newSucceededFuture(address); } @@ -707,7 +711,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } InetSocketAddress s = f.getNow(); - RedisURI uri = new RedisURI(address.getScheme() + "://" + s.getAddress().getHostAddress() + ":" + address.getPort()); + RedisURI uri = new RedisURI(scheme + "://" + s.getAddress().getHostAddress() + ":" + address.getPort()); result.trySuccess(uri); }); return result; diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index da13e2a71..fe614bffc 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -29,7 +29,7 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.config.*; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; -import org.redisson.misc.CountableListener; +import org.redisson.misc.AsyncCountDownLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedisURI; import org.redisson.misc.RedissonPromise; @@ -120,12 +120,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { continue; } - InetSocketAddress master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); + RedisURI master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); if (master == null) { throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!"); } - RedisURI masterHost = toURI(master.getHostString(), String.valueOf(master.getPort())); + RedisURI masterHost = resolveIP(scheme, master).syncUninterruptibly().getNow(); this.config.setMasterAddress(masterHost.toString()); currentMaster.set(masterHost); log.info("master: {} added", masterHost); @@ -136,20 +136,21 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { continue; } - String ip = map.get("ip"); + String host = map.get("ip"); String port = map.get("port"); String flags = map.getOrDefault("flags", ""); String masterLinkStatus = map.getOrDefault("master-link-status", ""); - RedisURI host = toURI(ip, port); + RedisURI uri = toURI(host, port); + uri = resolveIP(uri).syncUninterruptibly().getNow(); - this.config.addSlaveAddress(host.toString()); - log.debug("slave {} state: {}", host, map); - log.info("slave: {} added", host); + this.config.addSlaveAddress(uri.toString()); + log.debug("slave {} state: {}", uri, map); + log.info("slave: {} added", uri); if (isSlaveDown(flags, masterLinkStatus)) { - disconnectedSlaves.add(host); - log.warn("slave: {} is down", host); + disconnectedSlaves.add(uri); + log.warn("slave: {} is down", uri); } } @@ -163,8 +164,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = map.get("ip"); String port = map.get("port"); - RedisURI sentinelAddr = toURI(ip, port); - RFuture future = registerSentinel(sentinelAddr, this.config, null); + RedisURI uri = toURI(ip, port); + uri = resolveIP(uri).syncUninterruptibly().getNow(); + RFuture future = registerSentinel(uri, this.config, null); connectionFutures.add(future); } @@ -375,14 +377,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } }; - RFuture masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); - masterFuture.onComplete((master, e) -> { + RFuture masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); + masterFuture.thenCompose(u -> resolveIP(scheme, u)) + .whenComplete((newMaster, e) -> { if (e != null) { return; } RedisURI current = currentMaster.get(); - RedisURI newMaster = toURI(master.getHostString(), String.valueOf(master.getPort())); if (!newMaster.equals(current) && currentMaster.compareAndSet(current, newMaster)) { RFuture changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster); @@ -398,55 +400,62 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { if (!config.checkSkipSlavesInit()) { RFuture>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName()); commands.incrementAndGet(); - slavesFuture.onComplete((slavesMap, e) -> { - if (e != null) { + slavesFuture.onComplete((slavesMap, ex) -> { + if (ex != null) { return; } - + Set currentSlaves = new HashSet<>(slavesMap.size()); - List> futures = new ArrayList<>(); + AsyncCountDownLatch latch = new AsyncCountDownLatch(); for (Map map : slavesMap) { if (map.isEmpty()) { + latch.countDown(); continue; } - - String ip = map.get("ip"); + + String host = map.get("ip"); String port = map.get("port"); String flags = map.getOrDefault("flags", ""); String masterLinkStatus = map.getOrDefault("master-link-status", ""); String masterHost = map.get("master-host"); String masterPort = map.get("master-port"); - RedisURI slaveAddr = toURI(ip, port); - if (isSlaveDown(flags, masterLinkStatus)) { - slaveDown(slaveAddr); - continue; - } - if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) { - continue; - } - - currentSlaves.add(slaveAddr); - RFuture slaveFuture = addSlave(slaveAddr); - futures.add(slaveFuture); - } - - CountableListener listener = new CountableListener() { - @Override - protected void onSuccess(Void value) { - MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - entry.getAllEntries().stream() - .map(e -> e.getClient().getAddr()) - .map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort()))) - .filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get())) - .forEach(a -> slaveDown(a)); - }; - }; - - listener.setCounter(futures.size()); - for (RFuture f : futures) { - f.onComplete(listener); + RedisURI addr = toURI(host, port); + resolveIP(addr).onComplete((slaveAddr, exc) -> { + if (exc != null) { + log.error("Unable to add slave " + addr, exc); + latch.countDown(); + return; + } + + if (isSlaveDown(flags, masterLinkStatus)) { + slaveDown(slaveAddr); + latch.countDown(); + return; + } + if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) { + latch.countDown(); + return; + } + + currentSlaves.add(slaveAddr); + addSlave(slaveAddr).onComplete((r, e2) -> { + latch.countDown(); + if (e2 != null) { + log.error("Unable to add slave " + slaveAddr, e2); + } + }); + }); } + + latch.latch(() -> { + MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); + entry.getAllEntries().stream() + .map(e -> e.getClient().getAddr()) + .map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort()))) + .filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get())) + .forEach(a -> slaveDown(a)); + }, slavesMap.size()); }); slavesFuture.onComplete(commonListener); } @@ -456,8 +465,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { if (e != null || list.isEmpty()) { return; } - - Set newUris = list.stream().filter(m -> { + + AsyncCountDownLatch latch = new AsyncCountDownLatch(); + List> newUris = list.stream().filter(m -> { String flags = m.getOrDefault("flags", ""); String masterLinkStatus = m.getOrDefault("master-link-status", ""); if (!m.isEmpty() && !isSlaveDown(flags, masterLinkStatus)) { @@ -468,18 +478,30 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = m.get("ip"); String port = m.get("port"); return toURI(ip, port); - }).collect(Collectors.toSet()); - - InetSocketAddress addr = connection.getRedisClient().getAddr(); - RedisURI currentAddr = toURI(addr); - newUris.add(currentAddr); - - updateSentinels(newUris); + }).map(addr -> { + RFuture f = resolveIP(addr); + f.onComplete((res, ex) -> { + if (ex != null) { + log.error("unable to resolve hostname", ex); + } + latch.countDown(); + }); + return f; + }).collect(Collectors.toList()); + + latch.latch(() -> { + List uris = newUris.stream().map(u -> u.getNow()).filter(u -> u != null).collect(Collectors.toList()); + InetSocketAddress addr = connection.getRedisClient().getAddr(); + RedisURI currentAddr = toURI(addr); + uris.add(currentAddr); + + updateSentinels(uris); + }, newUris.size()); }); sentinelsFuture.onComplete(commonListener); } - private void updateSentinels(Set newUris) { + private void updateSentinels(Collection newUris) { newUris.stream() .filter(uri -> !sentinels.containsKey(uri)) .forEach(uri -> registerSentinel(uri, getConfig(), null)); diff --git a/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java b/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java index 81452e102..8e09c34fa 100644 --- a/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java +++ b/redisson/src/main/java/org/redisson/redisnode/SentinelRedisNode.java @@ -28,6 +28,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.Time; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.RPromise; +import org.redisson.misc.RedisURI; import org.redisson.misc.RedissonPromise; import java.net.InetSocketAddress; @@ -199,7 +200,7 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync { } @Override - public InetSocketAddress getMasterAddr(String masterName) { + public RedisURI getMasterAddr(String masterName) { return commandAsyncService.get(getMasterAddrAsync(masterName)); } @@ -229,7 +230,7 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync { } @Override - public RFuture getMasterAddrAsync(String masterName) { + public RFuture getMasterAddrAsync(String masterName) { return executeAsync(null, StringCodec.INSTANCE, -1, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, masterName); } diff --git a/redisson/src/test/java/org/redisson/RedissonRedisNodesTest.java b/redisson/src/test/java/org/redisson/RedissonRedisNodesTest.java index 300e56f18..3ff4e5d79 100644 --- a/redisson/src/test/java/org/redisson/RedissonRedisNodesTest.java +++ b/redisson/src/test/java/org/redisson/RedissonRedisNodesTest.java @@ -9,6 +9,7 @@ import org.redisson.client.protocol.Time; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.Config; import org.redisson.connection.balancer.RandomLoadBalancer; +import org.redisson.misc.RedisURI; import java.io.IOException; import java.net.InetSocketAddress; @@ -275,8 +276,8 @@ public class RedissonRedisNodesTest extends BaseTest { for (RedisSentinel sentinel : nodes.getSentinels()) { Assertions.assertTrue(sentinel.ping()); - InetSocketAddress addr = sentinel.getMasterAddr("myMaster"); - assertThat(addr.getAddress().getHostAddress()).isEqualTo("127.0.0.1"); + RedisURI addr = sentinel.getMasterAddr("myMaster"); + assertThat(addr.getHost()).isEqualTo("127.0.0.1"); assertThat(addr.getPort()).isEqualTo(master.getRedisServerPort()); Map masterMap = sentinel.getMaster("myMaster");