diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 24d39b79e..1fc880831 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -16,7 +16,6 @@ package org.redisson.cluster; import io.netty.resolver.AddressResolver; -import io.netty.util.NetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; @@ -32,8 +31,12 @@ import org.redisson.config.ClusterServersConfig; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; -import org.redisson.connection.*; +import org.redisson.connection.CRC16; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; +import org.redisson.connection.MasterSlaveConnectionManager; +import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.SingleEntry; +import org.redisson.misc.AsyncCountDownLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedisURI; import org.redisson.misc.RedissonPromise; @@ -94,7 +97,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { try { RedisConnection connection = connectionFuture.syncUninterruptibly().getNow(); - if (cfg.getNodeAddresses().size() == 1 && NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null) { + if (cfg.getNodeAddresses().size() == 1 && !addr.isIP()) { configEndpointHostName = addr.getHost(); } @@ -113,7 +116,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { lastClusterNode = addr; - Collection partitions = parsePartitions(nodes); + RFuture> partitionsFuture = parsePartitions(nodes); + Collection partitions = partitionsFuture.syncUninterruptibly().getNow(); List> masterFutures = new ArrayList<>(); for (ClusterPartition partition : partitions) { if (partition.isMasterFail()) { @@ -466,14 +470,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { log.debug("cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); } - Collection newPartitions = parsePartitions(nodes); - RFuture masterFuture = checkMasterNodesChange(cfg, newPartitions); - checkSlaveNodesChange(newPartitions); - masterFuture.onComplete((res, ex) -> { - checkSlotsMigration(newPartitions); - checkSlotsChange(newPartitions); - getShutdownLatch().release(); - scheduleClusterChangeCheck(cfg); + RFuture> newPartitionsFuture = parsePartitions(nodes); + newPartitionsFuture.onComplete((newPartitions, ex) -> { + RFuture masterFuture = checkMasterNodesChange(cfg, newPartitions); + checkSlaveNodesChange(newPartitions); + masterFuture.onComplete((res, exc) -> { + checkSlotsMigration(newPartitions); + checkSlotsChange(newPartitions); + getShutdownLatch().release(); + scheduleClusterChangeCheck(cfg); + }); }); }); } @@ -764,12 +770,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return natMapper.map(address); } - private Collection parsePartitions(List nodes) { + private RFuture> parsePartitions(List nodes) { Map partitions = new HashMap<>(); + AsyncCountDownLatch latch = new AsyncCountDownLatch(); + int counter = 0; for (ClusterNodeInfo clusterNodeInfo : nodes) { if (clusterNodeInfo.containsFlag(Flag.NOADDR) || clusterNodeInfo.containsFlag(Flag.HANDSHAKE) - || clusterNodeInfo.getAddress() == null) { + || clusterNodeInfo.getAddress() == null + || (clusterNodeInfo.getSlotRanges().isEmpty() && clusterNodeInfo.containsFlag(Flag.MASTER))) { // skip it continue; } @@ -786,32 +795,45 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { continue; } - RedisURI address = applyNatMap(clusterNodeInfo.getAddress()); - if (clusterNodeInfo.containsFlag(Flag.SLAVE)) { - ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId)); - ClusterPartition slavePartition = partitions.computeIfAbsent(clusterNodeInfo.getNodeId(), - k -> new ClusterPartition(clusterNodeInfo.getNodeId())); - slavePartition.setType(Type.SLAVE); - slavePartition.setParent(masterPartition); - - masterPartition.addSlaveAddress(address); - if (clusterNodeInfo.containsFlag(Flag.FAIL)) { - masterPartition.addFailedSlaveAddress(address); + RFuture ipFuture = resolveIP(clusterNodeInfo.getAddress()); + counter++; + ipFuture.onComplete((addr, e) -> { + if (e != null) { + latch.countDown(); + return; } - } else if (clusterNodeInfo.containsFlag(Flag.MASTER)) { - ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId)); - masterPartition.addSlotRanges(clusterNodeInfo.getSlotRanges()); - masterPartition.setMasterAddress(address); - masterPartition.setType(Type.MASTER); - if (clusterNodeInfo.containsFlag(Flag.FAIL)) { - masterPartition.setMasterFail(true); + + RedisURI address = applyNatMap(addr); + if (clusterNodeInfo.containsFlag(Flag.SLAVE)) { + ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId)); + ClusterPartition slavePartition = partitions.computeIfAbsent(clusterNodeInfo.getNodeId(), + k -> new ClusterPartition(clusterNodeInfo.getNodeId())); + slavePartition.setType(Type.SLAVE); + slavePartition.setParent(masterPartition); + + masterPartition.addSlaveAddress(address); + if (clusterNodeInfo.containsFlag(Flag.FAIL)) { + masterPartition.addFailedSlaveAddress(address); + } + } else if (clusterNodeInfo.containsFlag(Flag.MASTER)) { + ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId)); + masterPartition.addSlotRanges(clusterNodeInfo.getSlotRanges()); + masterPartition.setMasterAddress(address); + masterPartition.setType(Type.MASTER); + if (clusterNodeInfo.containsFlag(Flag.FAIL)) { + masterPartition.setMasterFail(true); + } } - } + latch.countDown(); + }); } - - addCascadeSlaves(partitions); - - return partitions.values(); + + RPromise> result = new RedissonPromise<>(); + latch.latch(() -> { + addCascadeSlaves(partitions); + result.trySuccess(partitions.values()); + }, counter); + return result; } private void addCascadeSlaves(Map partitions) { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index cbb799878..5ccc518cc 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -26,16 +26,14 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.resolver.AddressResolver; import io.netty.resolver.AddressResolverGroup; import io.netty.resolver.DefaultAddressResolverGroup; import io.netty.resolver.dns.DnsServerAddressStreamProviders; import io.netty.util.Timer; import io.netty.util.TimerTask; import io.netty.util.*; -import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ImmediateEventExecutor; -import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.*; import io.netty.util.internal.PlatformDependent; import org.redisson.ElementsSubscribeService; import org.redisson.Version; @@ -691,4 +689,28 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public RedisURI applyNatMap(RedisURI address) { return address; } + + protected RFuture resolveIP(RedisURI address) { + if (address.isIP()) { + return RedissonPromise.newSucceededFuture(address); + } + + RPromise result = new RedissonPromise<>(); + AddressResolver resolver = resolverGroup.getResolver(getGroup().next()); + InetSocketAddress addr = InetSocketAddress.createUnresolved(address.getHost(), address.getPort()); + Future future = resolver.resolve(addr); + future.addListener((FutureListener) f -> { + if (!f.isSuccess()) { + log.error("Unable to resolve " + address, f.cause()); + result.tryFailure(f.cause()); + return; + } + + InetSocketAddress s = f.getNow(); + RedisURI uri = new RedisURI(address.getScheme() + "://" + s.getAddress().getHostAddress() + ":" + address.getPort()); + result.trySuccess(uri); + }); + return result; + } + }