diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index ad1eac228..f7b4cd7ba 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -91,7 +91,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { List failedMasters = new ArrayList(); for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); - RFuture connectionFuture = connectToNode(cfg, addr, null, addr.getHost()); + RFuture connectionFuture = connectToNode(cfg, addr, addr.getHost()); try { RedisConnection connection = connectionFuture.syncUninterruptibly().getNow(); @@ -276,7 +276,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } RPromise result = new RedissonPromise<>(); - RFuture connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null, configEndpointHostName); + RFuture connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName); connectionFuture.onComplete((connection, ex1) -> { if (ex1 != null) { log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges()); @@ -425,7 +425,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return; } RedisURI uri = iterator.next(); - RFuture connectionFuture = connectToNode(cfg, uri, null, configEndpointHostName); + RFuture connectionFuture = connectToNode(cfg, uri, configEndpointHostName); connectionFuture.onComplete((connection, e) -> { if (e != null) { lastException.set(e); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index caac1e439..9dc3c3e41 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -36,10 +36,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.cluster.ClusterSlotRange; import org.redisson.command.CommandSyncService; -import org.redisson.config.BaseMasterSlaveServersConfig; -import org.redisson.config.Config; -import org.redisson.config.MasterSlaveServersConfig; -import org.redisson.config.TransportMode; +import org.redisson.config.*; import org.redisson.misc.CountableListener; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RPromise; @@ -149,7 +146,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected PublishSubscribeService subscribeService; - private final Map nodeConnections = new ConcurrentHashMap<>(); + private final Map nodeConnections = new ConcurrentHashMap<>(); public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) { this(config, id); @@ -244,34 +241,19 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - protected final void disconnectNode(RedisClient client) { - RedisConnection conn = nodeConnections.remove(client); - if (conn != null) { - conn.closeAsync(); - } - } - - protected final RFuture connectToNode(BaseMasterSlaveServersConfig cfg, RedisURI addr, RedisClient client, String sslHostname) { - final Object key; - if (client != null) { - key = client; - } else { - key = addr; - } - RedisConnection conn = nodeConnections.get(key); + protected final RFuture connectToNode(BaseConfig cfg, RedisURI addr, String sslHostname) { + RedisConnection conn = nodeConnections.get(addr); if (conn != null) { if (!conn.isActive()) { - nodeConnections.remove(key); + nodeConnections.remove(addr); conn.closeAsync(); } else { return RedissonPromise.newSucceededFuture(conn); } } - if (addr != null) { - client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname); - } - final RPromise result = new RedissonPromise(); + RedisClient client = createClient(NodeType.MASTER, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname); + RPromise result = new RedissonPromise<>(); RFuture future = client.connectAsync(); future.onComplete((connection, e) -> { if (e != null) { @@ -280,7 +262,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } if (connection.isActive()) { - nodeConnections.put(key, connection); + nodeConnections.put(addr, connection); result.trySuccess(connection); } else { connection.closeAsync(); diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 3d27e91e2..de29c68ac 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -71,7 +71,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); - RFuture connectionFuture = connectToNode(cfg, addr, null, addr.getHost()); + RFuture connectionFuture = connectToNode(cfg, addr, addr.getHost()); connectionFuture.awaitUninterruptibly(); RedisConnection connection = connectionFuture.getNow(); if (connection == null) { @@ -131,7 +131,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { AtomicInteger count = new AtomicInteger(cfg.getNodeAddresses().size()); for (String address : cfg.getNodeAddresses()) { RedisURI addr = new RedisURI(address); - RFuture connectionFuture = connectToNode(cfg, addr, null, addr.getHost()); + RFuture connectionFuture = connectToNode(cfg, addr, addr.getHost()); connectionFuture.onComplete((connection, exc) -> { if (exc != null) { log.error(exc.getMessage(), exc); diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index a39cb3177..ead8f7740 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -296,7 +296,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } Set newUris = future.getNow().stream() - .map(addr -> toURI(addr.getAddress().getHostAddress(), "" + addr.getPort())) + .map(addr -> getIpAddr(addr)) .collect(Collectors.toSet()); for (RedisURI uri : newUris) { @@ -343,7 +343,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } RedisClient client = iterator.next(); - RFuture connectionFuture = connectToNode(null, null, client, null); + RedisURI addr = getIpAddr(client.getAddr()); + RFuture connectionFuture = connectToNode(cfg, addr, null); connectionFuture.onComplete((connection, e) -> { if (e != null) { lastException.set(e); @@ -484,7 +485,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { }).collect(Collectors.toSet()); InetSocketAddress addr = connection.getRedisClient().getAddr(); - RedisURI currentAddr = toURI(addr.getAddress().getHostAddress(), "" + addr.getPort()); + RedisURI currentAddr = getIpAddr(addr); newUris.add(currentAddr); updateSentinels(newUris); @@ -505,7 +506,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { for (RedisURI uri : currentUris) { RedisClient sentinel = SentinelConnectionManager.this.sentinels.remove(uri); if (sentinel != null) { - disconnectNode(sentinel); + disconnectNode(uri); sentinel.shutdownAsync(); log.warn("sentinel: {} is down", uri); } @@ -540,7 +541,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - RedisURI ipAddr = toURI(client.getAddr().getAddress().getHostAddress(), "" + client.getAddr().getPort()); + RedisURI ipAddr = getIpAddr(client.getAddr()); if (isHostname) { RedisClient sentinel = sentinels.get(ipAddr); if (sentinel != null) { @@ -574,6 +575,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return result; } + private RedisURI getIpAddr(InetSocketAddress addr) { + return toURI(addr.getAddress().getHostAddress(), "" + addr.getPort()); + } + private RFuture addSlave(RedisURI uri) { RPromise result = new RedissonPromise(); // to avoid addition twice