diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index e18b37d04..a21ee9689 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -76,7 +76,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private ScheduledFuture monitorFuture; private volatile URI lastClusterNode; - + public ClusterConnectionManager(ClusterServersConfig cfg, Config config) { super(config); @@ -92,13 +92,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { RedisConnection connection = connectionFuture.syncUninterruptibly().getNow(); List nodes = connection.sync(RedisCommands.CLUSTER_NODES); - if (log.isDebugEnabled()) { - StringBuilder nodesValue = new StringBuilder(); - for (ClusterNodeInfo clusterNodeInfo : nodes) { - nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n"); - } - log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); + StringBuilder nodesValue = new StringBuilder(); + for (ClusterNodeInfo clusterNodeInfo : nodes) { + nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n"); } + log.info("Redis cluster nodes configuration got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue); lastClusterNode = addr; @@ -185,15 +183,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } RedisConnection connection = future.getNow(); - if (connection.isActive()) { - nodeConnections.put(addr, connection); - result.trySuccess(connection); - } else { - connection.closeAsync(); - result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); - } - } - }); + if (connection.isActive()) { + nodeConnections.put(addr, connection); + result.trySuccess(connection); + } else { + connection.closeAsync(); + result.tryFailure(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!")); + } + } + }); return result; } @@ -426,7 +424,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { aliveSlaves.removeAll(newPart.getFailedSlaveAddresses()); for (URI uri : aliveSlaves) { currentPart.removeFailedSlaveAddress(uri); - if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { + if (entry.slaveUp(uri, FreezeReason.MANAGER)) { log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges()); } } @@ -435,7 +433,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { failedSlaves.removeAll(currentPart.getFailedSlaveAddresses()); for (URI uri : failedSlaves) { currentPart.addFailedSlaveAddress(uri); - if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { + if (entry.slaveDown(uri, FreezeReason.MANAGER)) { log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); } } @@ -448,7 +446,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (URI uri : removedSlaves) { currentPart.removeSlaveAddress(uri); - if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { + if (entry.slaveDown(uri, FreezeReason.MANAGER)) { log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); } } @@ -466,7 +464,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } currentPart.addSlaveAddress(uri); - entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); + entry.slaveUp(uri, FreezeReason.MANAGER); log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); } }); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 145c56a1f..fa78f53a2 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -683,10 +683,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return entries.get(slot); } - protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) { - getEntry(slotRange.getStartSlot()).slaveDown(host, port, freezeReason); - } - protected void changeMaster(int slot, URI address) { getEntry(slot).changeMaster(address); } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index c0d5b7444..8b78fc115 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -129,8 +129,8 @@ public class MasterSlaveEntry { return slaveDown(e, freezeReason == FreezeReason.SYSTEM); } - public boolean slaveDown(String host, int port, FreezeReason freezeReason) { - ClientConnectionsEntry entry = slaveBalancer.freeze(host, port, freezeReason); + public boolean slaveDown(URI address, FreezeReason freezeReason) { + ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason); if (entry == null) { return false; } @@ -141,9 +141,9 @@ public class MasterSlaveEntry { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { // add master as slave if no more slaves available if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { - InetSocketAddress addr = masterEntry.getClient().getAddr(); - if (slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM)) { - log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); + URI addr = masterEntry.getClient().getConfig().getAddress(); + if (slaveUp(addr, FreezeReason.SYSTEM)) { + log.info("master {} used as slave", addr); } } @@ -309,7 +309,11 @@ public class MasterSlaveEntry { public boolean hasSlave(InetSocketAddress addr) { return slaveBalancer.contains(addr); } - + + public boolean hasSlave(String addr) { + return slaveBalancer.contains(addr); + } + public RFuture addSlave(URI address) { return addSlave(address, true, NodeType.SLAVE); } @@ -334,17 +338,18 @@ public class MasterSlaveEntry { return masterEntry.getClient(); } - public boolean slaveUp(String host, int port, FreezeReason freezeReason) { - if (!slaveBalancer.unfreeze(host, port, freezeReason)) { + public boolean slaveUp(URI address, FreezeReason freezeReason) { + if (!slaveBalancer.unfreeze(address, freezeReason)) { return false; } + InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort()); InetSocketAddress addr = masterEntry.getClient().getAddr(); // exclude master from slaves if (config.getReadMode() == ReadMode.SLAVE - && (!addr.getHostName().equals(host) || port != addr.getPort())) { - slaveDown(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); - log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort()); + && (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) { + slaveDown(address, FreezeReason.SYSTEM); + log.info("master {} excluded from slaves", addr); } return true; } @@ -369,7 +374,7 @@ public class MasterSlaveEntry { // more than one slave available, so master can be removed from slaves if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() > 1) { - slaveDown(address.getHost(), address.getPort(), FreezeReason.SYSTEM); + slaveDown(address, FreezeReason.SYSTEM); } connectionManager.shutdownAsync(oldMaster.getClient()); } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 4e9c3588c..d0cfbb35f 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -171,7 +171,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - log.warn("Can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort()); + log.warn("Can't connect to sentinel: {}", addr); return; } @@ -220,8 +220,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String ip = parts[2]; String port = parts[3]; - String addr = createAddress(ip, port); - URI uri = URIBuilder.create(addr); + URI uri = convert(ip, port); registerSentinel(cfg, uri, c); } } @@ -253,11 +252,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - if (entry.slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + URI uri = convert(ip, port); + if (entry.slaveUp(uri, FreezeReason.MANAGER)) { String slaveAddr = ip + ":" + port; log.info("slave: {} added", slaveAddr); } } + }); } else { slaveUp(ip, port); @@ -267,6 +268,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } } + protected URI convert(String ip, String port) { + String addr = createAddress(ip, port); + URI uri = URIBuilder.create(addr); + return uri; + } + private void onNodeDown(URI sentinelAddr, String msg) { String[] parts = msg.split(" "); @@ -309,7 +316,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.warn("slave: {}:{} has down", ip, port); } else { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - if (entry.slaveDown(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + URI uri = convert(ip, port); + if (entry.slaveDown(uri, FreezeReason.MANAGER)) { log.warn("slave: {}:{} has down", ip, port); } } @@ -367,7 +375,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + URI uri = convert(ip, port); + if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) { String slaveAddr = ip + ":" + port; log.info("slave: {} has up", slaveAddr); } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 40e46a5c8..6c047a076 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -16,6 +16,7 @@ package org.redisson.connection.balancer; import java.net.InetSocketAddress; +import java.net.URI; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -44,9 +45,10 @@ public class LoadBalancerManager { private final Logger log = LoggerFactory.getLogger(getClass()); private final ConnectionManager connectionManager; - private final Map addr2Entry = PlatformDependent.newConcurrentHashMap(); private final PubSubConnectionPool pubSubConnectionPool; private final SlaveConnectionPool slaveConnectionPool; + + private final Map ip2Entry = PlatformDependent.newConcurrentHashMap(); public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { this.connectionManager = connectionManager; @@ -65,7 +67,8 @@ public class LoadBalancerManager { return; } if (counter.decrementAndGet() == 0) { - addr2Entry.put(entry.getClient().getAddr(), entry); + String addr = convert(entry.getClient().getConfig().getAddress()); + ip2Entry.put(addr, entry); result.trySuccess(null); } } @@ -80,7 +83,7 @@ public class LoadBalancerManager { public int getAvailableClients() { int count = 0; - for (ClientConnectionsEntry connectionEntry : addr2Entry.values()) { + for (ClientConnectionsEntry connectionEntry : ip2Entry.values()) { if (!connectionEntry.isFreezed()) { count++; } @@ -88,11 +91,10 @@ public class LoadBalancerManager { return count; } - public boolean unfreeze(String host, int port, FreezeReason freezeReason) { - InetSocketAddress addr = new InetSocketAddress(host, port); - ClientConnectionsEntry entry = addr2Entry.get(addr); + public boolean unfreeze(URI address, FreezeReason freezeReason) { + ClientConnectionsEntry entry = getEntry(address); if (entry == null) { - throw new IllegalStateException("Can't find " + addr + " in slaves!"); + throw new IllegalStateException("Can't find " + address + " in slaves!"); } synchronized (entry) { @@ -111,12 +113,21 @@ public class LoadBalancerManager { return false; } - public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) { - InetSocketAddress addr = new InetSocketAddress(host, port); - ClientConnectionsEntry connectionEntry = addr2Entry.get(addr); + private String convert(URI address) { + InetSocketAddress addr = new InetSocketAddress(address.getHost(), address.getPort()); + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + } + + public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) { + ClientConnectionsEntry connectionEntry = getEntry(address); return freeze(connectionEntry, freezeReason); } + protected ClientConnectionsEntry getEntry(URI address) { + String addr = convert(address); + return ip2Entry.get(addr); + } + public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { if (connectionEntry == null) { return null; @@ -143,11 +154,15 @@ public class LoadBalancerManager { } public boolean contains(InetSocketAddress addr) { - return addr2Entry.containsKey(addr); + return ip2Entry.containsKey(addr.getAddress().getHostAddress() + ":" + addr.getPort()); + } + + public boolean contains(String addr) { + return ip2Entry.containsKey(addr); } public RFuture getConnection(RedisCommand command, InetSocketAddress addr) { - ClientConnectionsEntry entry = addr2Entry.get(addr); + ClientConnectionsEntry entry = ip2Entry.get(addr.getAddress().getHostAddress()); if (entry != null) { return slaveConnectionPool.get(command, entry); } @@ -160,23 +175,23 @@ public class LoadBalancerManager { } public void returnPubSubConnection(RedisPubSubConnection connection) { - ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); + ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress()); pubSubConnectionPool.returnConnection(entry, connection); } public void returnConnection(RedisConnection connection) { - ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); + ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress()); slaveConnectionPool.returnConnection(entry, connection); } public void shutdown() { - for (ClientConnectionsEntry entry : addr2Entry.values()) { + for (ClientConnectionsEntry entry : ip2Entry.values()) { entry.getClient().shutdown(); } } public void shutdownAsync() { - for (ClientConnectionsEntry entry : addr2Entry.values()) { + for (ClientConnectionsEntry entry : ip2Entry.values()) { connectionManager.shutdownAsync(entry.getClient()); } } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 69f88a1a4..132a28f21 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -296,7 +296,7 @@ abstract class ConnectionPool { private void promiseFailure(ClientConnectionsEntry entry, RPromise promise, Throwable cause) { if (entry.incFailedAttempts() == config.getFailedAttempts()) { - checkForReconnect(entry); + checkForReconnect(entry, cause); } releaseConnection(entry); @@ -308,7 +308,7 @@ abstract class ConnectionPool { int attempts = entry.incFailedAttempts(); if (attempts == config.getFailedAttempts()) { conn.closeAsync(); - checkForReconnect(entry); + checkForReconnect(entry, null); } else if (attempts < config.getFailedAttempts()) { releaseConnection(entry, conn); } else { @@ -321,15 +321,14 @@ abstract class ConnectionPool { promise.tryFailure(cause); } - private void checkForReconnect(ClientConnectionsEntry entry) { + private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) { if (entry.getNodeType() == NodeType.SLAVE) { - masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(), - entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); - log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts()); + masterSlaveEntry.slaveDown(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); + log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause); scheduleCheck(entry); } else { if (entry.freezeMaster(FreezeReason.RECONNECT)) { - log.warn("host {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts()); + log.error("host " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause); scheduleCheck(entry); } } @@ -385,7 +384,7 @@ abstract class ConnectionPool { public void operationComplete(Future future) throws Exception { if (entry.getNodeType() == NodeType.SLAVE) { - masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); + masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); log.info("slave {} successfully reconnected", entry.getClient().getAddr()); } else { synchronized (entry) {