Fixed - Repeated new connections with AWS Elasticache serverless #5589

pull/5599/head
Nikita Koksharov 1 year ago
parent ff56c8942d
commit a66ac15e11

@ -17,8 +17,6 @@ package org.redisson.cluster;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.*;
@ -360,25 +358,16 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (configEndpointHostName != null) {
String address = cfg.getNodeAddresses().iterator().next();
RedisURI uri = new RedisURI(address);
Future<List<InetSocketAddress>> allNodes = serviceManager.resolveAll(uri);
allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
@Override
public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(future.cause());
if (!future.isSuccess()) {
checkClusterState(cfg, Collections.emptyIterator(), lastException);
return;
}
List<RedisURI> nodes = new ArrayList<>();
for (InetSocketAddress addr : future.getNow()) {
RedisURI address = serviceManager.toURI(uri.getScheme(), addr.getAddress().getHostAddress(), "" + addr.getPort());
nodes.add(address);
}
Iterator<RedisURI> nodesIterator = nodes.iterator();
checkClusterState(cfg, nodesIterator, lastException);
CompletableFuture<List<RedisURI>> allNodes = serviceManager.resolveAll(uri);
allNodes.whenComplete((nodes, ex) -> {
AtomicReference<Throwable> lastException = new AtomicReference<>(ex);
if (ex != null) {
checkClusterState(cfg, Collections.emptyIterator(), lastException);
return;
}
Iterator<RedisURI> nodesIterator = nodes.iterator();
checkClusterState(cfg, nodesIterator, lastException);
});
} else {
AtomicReference<Throwable> lastException = new AtomicReference<>();
@ -855,8 +844,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
continue;
}
CompletableFuture<RedisURI> ipFuture = serviceManager.resolveIP(clusterNodeInfo.getAddress());
CompletableFuture<Void> f = ipFuture.thenAccept(address -> {
CompletableFuture<List<RedisURI>> ipsFuture = serviceManager.resolveAll(clusterNodeInfo.getAddress());
CompletableFuture<Void> f = ipsFuture.thenAccept(addresses -> {
if (addresses.size() > 1 && clusterNodeInfo.containsFlag(Flag.MASTER)) {
addresses.sort(null);
Collections.shuffle(addresses, new Random(serviceManager.getId().hashCode()));
}
RedisURI address = addresses.get(0);
if (clusterNodeInfo.containsFlag(Flag.SLAVE)) {
ClusterPartition masterPartition = partitions.computeIfAbsent(masterId, k -> new ClusterPartition(masterId));

@ -17,8 +17,6 @@ package org.redisson.connection;
import io.netty.util.NetUtil;
import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.StringUtil;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
@ -35,10 +33,11 @@ import org.redisson.misc.RedisURI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -277,35 +276,38 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private void scheduleSentinelDNSCheck() {
monitorFuture = serviceManager.newTimeout(t -> {
AtomicInteger sentinelsCounter = new AtomicInteger(sentinelHosts.size());
performSentinelDNSCheck(future -> {
if (sentinelsCounter.decrementAndGet() == 0) {
scheduleSentinelDNSCheck();
}
});
CompletableFuture<Void> f = performSentinelDNSCheck();
f.thenAccept(r -> scheduleSentinelDNSCheck());
}, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
}
private void performSentinelDNSCheck(FutureListener<List<InetSocketAddress>> commonListener) {
private CompletableFuture<Void> performSentinelDNSCheck() {
List<CompletableFuture<List<RedisURI>>> futures = new ArrayList<>();
for (RedisURI host : sentinelHosts) {
Future<List<InetSocketAddress>> allNodes = serviceManager.resolveAll(host);
allNodes.addListener((FutureListener<List<InetSocketAddress>>) future -> {
if (!future.isSuccess()) {
log.error("Unable to resolve {}", host.getHost(), future.cause());
CompletableFuture<List<RedisURI>> allNodes = serviceManager.resolveAll(host);
CompletableFuture<List<RedisURI>> f = allNodes.whenComplete((nodes, ex) -> {
if (ex != null) {
log.error("Unable to resolve {}", host.getHost(), ex);
return;
}
future.getNow().stream()
.filter(addr -> {
RedisURI uri = toURI(addr);
nodes.stream()
.filter(uri -> {
return !sentinels.containsKey(uri) && !disconnectedSentinels.contains(uri);
})
.forEach(addr -> registerSentinel(addr));
.forEach(uri -> {
try {
byte[] addr = NetUtil.createByteArrayFromIpAddressString(uri.getHost());
InetSocketAddress address = new InetSocketAddress(InetAddress.getByAddress(host.getHost(), addr), uri.getPort());
registerSentinel(address);
} catch (UnknownHostException e) {
log.error(e.getMessage(), e);
}
});
});
if (commonListener != null) {
allNodes.addListener(commonListener);
}
futures.add(f);
}
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator<RedisClient> iterator) {
@ -328,8 +330,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.error("Can't update cluster state", lastException.get());
}
disconnectedSentinels.clear();
performSentinelDNSCheck(null);
scheduleChangeCheck(cfg, null);
CompletableFuture<Void> f = performSentinelDNSCheck();
f.thenAccept(r -> scheduleChangeCheck(cfg, null));
return;
}
if (!serviceManager.getShutdownLatch().acquire()) {

@ -37,8 +37,10 @@ import io.netty.resolver.dns.DnsServerAddressStreamProviders;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.*;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.*;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.PlatformDependent;
import org.redisson.ElementsSubscribeService;
import org.redisson.QueueTransferService;
@ -74,6 +76,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
*
@ -277,11 +280,23 @@ public final class ServiceManager {
return group;
}
public Future<List<InetSocketAddress>> resolveAll(RedisURI uri) {
public CompletableFuture<List<RedisURI>> resolveAll(RedisURI uri) {
if (uri.isIP()) {
return CompletableFuture.completedFuture(Collections.singletonList(uri));
}
AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(group.next());
return resolver.resolveAll(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
Future<List<InetSocketAddress>> f = resolver.resolveAll(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
CompletableFuture<List<RedisURI>> result = new CompletableFuture<>();
f.addListener((GenericFutureListener<Future<List<InetSocketAddress>>>) future -> {
List<RedisURI> nodes = future.getNow().stream().map(addr -> {
return toURI(uri.getScheme(), addr.getAddress().getHostAddress(), "" + addr.getPort());
}).collect(Collectors.toList());
result.complete(nodes);
});
return result;
}
public AddressResolverGroup<InetSocketAddress> getResolverGroup() {
return resolverGroup;
}

Loading…
Cancel
Save