From a66ac15e118a2030db355d7091092480f844ccaa Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 30 Jan 2024 12:50:17 +0300 Subject: [PATCH] Fixed - Repeated new connections with AWS Elasticache serverless #5589 --- .../cluster/ClusterConnectionManager.java | 38 +++++++-------- .../connection/SentinelConnectionManager.java | 48 ++++++++++--------- .../redisson/connection/ServiceManager.java | 23 +++++++-- 3 files changed, 60 insertions(+), 49 deletions(-) diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 8a68e8945..82576d780 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -17,8 +17,6 @@ package org.redisson.cluster; import io.netty.buffer.ByteBuf; import io.netty.util.Timeout; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.*; @@ -360,25 +358,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { if (configEndpointHostName != null) { String address = cfg.getNodeAddresses().iterator().next(); RedisURI uri = new RedisURI(address); - Future> allNodes = serviceManager.resolveAll(uri); - allNodes.addListener(new FutureListener>() { - @Override - public void operationComplete(Future> future) throws Exception { - AtomicReference lastException = new AtomicReference(future.cause()); - if (!future.isSuccess()) { - checkClusterState(cfg, Collections.emptyIterator(), lastException); - return; - } - - List nodes = new ArrayList<>(); - for (InetSocketAddress addr : future.getNow()) { - RedisURI address = serviceManager.toURI(uri.getScheme(), addr.getAddress().getHostAddress(), "" + addr.getPort()); - nodes.add(address); - } - - Iterator nodesIterator = nodes.iterator(); - checkClusterState(cfg, nodesIterator, lastException); + CompletableFuture> allNodes = serviceManager.resolveAll(uri); + allNodes.whenComplete((nodes, ex) -> { + AtomicReference lastException = new AtomicReference<>(ex); + if (ex != null) { + checkClusterState(cfg, Collections.emptyIterator(), lastException); + return; } + + Iterator nodesIterator = nodes.iterator(); + checkClusterState(cfg, nodesIterator, lastException); }); } else { AtomicReference lastException = new AtomicReference<>(); @@ -855,8 +844,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { continue; } - CompletableFuture ipFuture = serviceManager.resolveIP(clusterNodeInfo.getAddress()); - CompletableFuture f = ipFuture.thenAccept(address -> { + CompletableFuture> ipsFuture = serviceManager.resolveAll(clusterNodeInfo.getAddress()); + CompletableFuture f = ipsFuture.thenAccept(addresses -> { + if (addresses.size() > 1 && clusterNodeInfo.containsFlag(Flag.MASTER)) { + addresses.sort(null); + Collections.shuffle(addresses, new Random(serviceManager.getId().hashCode())); + } + RedisURI address = addresses.get(0); if (clusterNodeInfo.containsFlag(Flag.SLAVE)) { ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId)); diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index a4d040065..d8c765f98 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -17,8 +17,6 @@ package org.redisson.connection; import io.netty.util.NetUtil; import io.netty.util.Timeout; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.StringUtil; import org.redisson.api.NodeType; import org.redisson.api.RFuture; @@ -35,10 +33,11 @@ import org.redisson.misc.RedisURI; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -277,35 +276,38 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private void scheduleSentinelDNSCheck() { monitorFuture = serviceManager.newTimeout(t -> { - AtomicInteger sentinelsCounter = new AtomicInteger(sentinelHosts.size()); - performSentinelDNSCheck(future -> { - if (sentinelsCounter.decrementAndGet() == 0) { - scheduleSentinelDNSCheck(); - } - }); + CompletableFuture f = performSentinelDNSCheck(); + f.thenAccept(r -> scheduleSentinelDNSCheck()); }, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS); } - private void performSentinelDNSCheck(FutureListener> commonListener) { + private CompletableFuture performSentinelDNSCheck() { + List>> futures = new ArrayList<>(); for (RedisURI host : sentinelHosts) { - Future> allNodes = serviceManager.resolveAll(host); - allNodes.addListener((FutureListener>) future -> { - if (!future.isSuccess()) { - log.error("Unable to resolve {}", host.getHost(), future.cause()); + CompletableFuture> allNodes = serviceManager.resolveAll(host); + CompletableFuture> f = allNodes.whenComplete((nodes, ex) -> { + if (ex != null) { + log.error("Unable to resolve {}", host.getHost(), ex); return; } - future.getNow().stream() - .filter(addr -> { - RedisURI uri = toURI(addr); + nodes.stream() + .filter(uri -> { return !sentinels.containsKey(uri) && !disconnectedSentinels.contains(uri); }) - .forEach(addr -> registerSentinel(addr)); + .forEach(uri -> { + try { + byte[] addr = NetUtil.createByteArrayFromIpAddressString(uri.getHost()); + InetSocketAddress address = new InetSocketAddress(InetAddress.getByAddress(host.getHost(), addr), uri.getPort()); + registerSentinel(address); + } catch (UnknownHostException e) { + log.error(e.getMessage(), e); + } + }); }); - if (commonListener != null) { - allNodes.addListener(commonListener); - } + futures.add(f); } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator iterator) { @@ -328,8 +330,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.error("Can't update cluster state", lastException.get()); } disconnectedSentinels.clear(); - performSentinelDNSCheck(null); - scheduleChangeCheck(cfg, null); + CompletableFuture f = performSentinelDNSCheck(); + f.thenAccept(r -> scheduleChangeCheck(cfg, null)); return; } if (!serviceManager.getShutdownLatch().acquire()) { diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 21405e121..a308242cd 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -37,8 +37,10 @@ import io.netty.resolver.dns.DnsServerAddressStreamProviders; import io.netty.util.Timer; import io.netty.util.TimerTask; import io.netty.util.*; +import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.*; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.PlatformDependent; import org.redisson.ElementsSubscribeService; import org.redisson.QueueTransferService; @@ -74,6 +76,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * @@ -277,11 +280,23 @@ public final class ServiceManager { return group; } - public Future> resolveAll(RedisURI uri) { + public CompletableFuture> resolveAll(RedisURI uri) { + if (uri.isIP()) { + return CompletableFuture.completedFuture(Collections.singletonList(uri)); + } + AddressResolver resolver = resolverGroup.getResolver(group.next()); - return resolver.resolveAll(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort())); + Future> f = resolver.resolveAll(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort())); + CompletableFuture> result = new CompletableFuture<>(); + f.addListener((GenericFutureListener>>) future -> { + List nodes = future.getNow().stream().map(addr -> { + return toURI(uri.getScheme(), addr.getAddress().getHostAddress(), "" + addr.getPort()); + }).collect(Collectors.toList()); + result.complete(nodes); + }); + return result; } - + public AddressResolverGroup getResolverGroup() { return resolverGroup; }