diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index fc98647fd..72158ff59 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -222,7 +222,7 @@ public class RedisConnection implements RedisCommands { @Override public String toString() { - return getClass().getSimpleName() + " [redisClient=" + redisClient + ", channel=" + channel + "]"; + return getClass().getSimpleName() + "@" + System.identityHashCode(this) + " [redisClient=" + redisClient + ", channel=" + channel + "]"; } public Future getAcquireFuture() { diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index de73bd42b..ce8cddf4b 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -113,7 +113,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { throw new RedisConnectionException("Can't connect to servers!", lastException); } - scheduleClusterChangeCheck(cfg); + scheduleClusterChangeCheck(cfg, null); + } + + private void close(RedisConnection conn) { + if (nodeConnections.values().remove(conn)) { + conn.closeAsync(); + } } private Future connect(ClusterServersConfig cfg, final URI addr) { @@ -188,7 +194,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } final RedisConnection connection = future.getNow(); - Future> clusterFuture = connection.async(RedisCommands.CLUSTER_INFO); + Future> clusterFuture = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_INFO); clusterFuture.addListener(new FutureListener>() { @Override @@ -256,26 +262,31 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return result; } - private void scheduleClusterChangeCheck(final ClusterServersConfig cfg) { + private void scheduleClusterChangeCheck(final ClusterServersConfig cfg, final Iterator iterator) { monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() { @Override public void run() { - List nodes = new ArrayList(); - List slaves = new ArrayList(); AtomicReference lastException = new AtomicReference(); - for (ClusterPartition partition : lastPartitions.values()) { - if (!partition.isMasterFail()) { - nodes.add(partition.getMasterAddress()); + Iterator nodesIterator = iterator; + if (nodesIterator == null) { + List nodes = new ArrayList(); + List slaves = new ArrayList(); + for (ClusterPartition partition : lastPartitions.values()) { + if (!partition.isMasterFail()) { + nodes.add(partition.getMasterAddress()); + } + + Set partitionSlaves = new HashSet(partition.getSlaveAddresses()); + partitionSlaves.removeAll(partition.getFailedSlaveAddresses()); + slaves.addAll(partitionSlaves); } - - Set partitionSlaves = new HashSet(partition.getSlaveAddresses()); - partitionSlaves.removeAll(partition.getFailedSlaveAddresses()); - slaves.addAll(partitionSlaves); + // master nodes first + nodes.addAll(slaves); + + nodesIterator = nodes.iterator(); } - // master nodes first - nodes.addAll(slaves); - checkClusterState(cfg, nodes.iterator(), lastException); + checkClusterState(cfg, nodesIterator, lastException); } }, cfg.getScanInterval(), TimeUnit.MILLISECONDS); @@ -284,7 +295,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private void checkClusterState(final ClusterServersConfig cfg, final Iterator iterator, final AtomicReference lastException) { if (!iterator.hasNext()) { log.error("Can't update cluster state", lastException.get()); - scheduleClusterChangeCheck(cfg); + scheduleClusterChangeCheck(cfg, null); return; } URI uri = iterator.next(); @@ -299,19 +310,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } RedisConnection connection = future.getNow(); - updateClusterState(cfg, connection); + updateClusterState(cfg, connection, iterator); } }); } - private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection) { - Future future = connection.async(RedisCommands.CLUSTER_NODES); + private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, Iterator iterator) { + Future future = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_NODES); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause()); - scheduleClusterChangeCheck(cfg); + close(connection); + scheduleClusterChangeCheck(cfg, iterator); return; } @@ -322,7 +334,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { checkMasterNodesChange(newPartitions); checkSlaveNodesChange(newPartitions); checkSlotsChange(cfg, newPartitions); - scheduleClusterChangeCheck(cfg); + scheduleClusterChangeCheck(cfg, null); } }); } @@ -432,7 +444,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { URI oldUri = currentPart.getMasterAddress(); changeMaster(currentSlotRange, newUri.getHost(), newUri.getPort()); - slaveDown(currentSlotRange, oldUri.getHost(), oldUri.getPort(), FreezeReason.MANAGER); currentPart.setMasterAddress(newMasterPart.getMasterAddress()); } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index ce8bea5c6..ff26955be 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -96,12 +96,25 @@ public class MasterSlaveEntry { return writeConnectionHolder.add(masterEntry); } + private boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReason) { + ClientConnectionsEntry e = slaveBalancer.freeze(entry, freezeReason); + if (e == null) { + return false; + } + + return slaveDown(e); + } + public boolean slaveDown(String host, int port, FreezeReason freezeReason) { ClientConnectionsEntry entry = slaveBalancer.freeze(host, port, freezeReason); if (entry == null) { return false; } + return slaveDown(entry); + } + + private boolean slaveDown(ClientConnectionsEntry entry) { // add master as slave if no more slaves available if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { InetSocketAddress addr = masterEntry.getClient().getAddr(); @@ -305,7 +318,7 @@ public class MasterSlaveEntry { ClientConnectionsEntry oldMaster = masterEntry; setupMasterEntry(host, port); writeConnectionHolder.remove(oldMaster); - oldMaster.freezeMaster(FreezeReason.MANAGER); + slaveDown(oldMaster, FreezeReason.MANAGER); // more than one slave available, so master can be removed from slaves if (config.getReadMode() == ReadMode.SLAVE diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index edef7d2bb..02d408f29 100644 --- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -36,6 +36,8 @@ public interface LoadBalancerManager { boolean unfreeze(String host, int port, FreezeReason freezeReason); + ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason); + ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason); Future add(ClientConnectionsEntry entry); diff --git a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java index 38135c62d..38bf4b781 100644 --- a/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java +++ b/src/main/java/org/redisson/connection/balancer/LoadBalancerManagerImpl.java @@ -94,25 +94,28 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager { } return false; } - + public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) { InetSocketAddress addr = new InetSocketAddress(host, port); ClientConnectionsEntry connectionEntry = addr2Entry.get(addr); + return freeze(connectionEntry, freezeReason); + } + + public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { if (connectionEntry == null) { return null; } synchronized (connectionEntry) { - if (connectionEntry.isFreezed()) { - return null; - } - - connectionEntry.setFreezed(true); - // only RECONNECT freeze reason could be replaced if (connectionEntry.getFreezeReason() == null || connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) { + connectionEntry.setFreezed(true); connectionEntry.setFreezeReason(freezeReason); + return connectionEntry; + } + if (connectionEntry.isFreezed()) { + return null; } } diff --git a/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 87dc8a594..3c63d632b 100644 --- a/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -153,10 +153,15 @@ abstract class ConnectionPool { } } - StringBuilder errorMsg = new StringBuilder("Connection pool exhausted! All connections are busy. Increase connection pool size."); -// if (!freezed.isEmpty()) { -// errorMsg.append(" Disconnected hosts: " + freezed); -// } + StringBuilder errorMsg; + if (connectionManager.isClusterMode()) { + errorMsg = new StringBuilder("Connection pool exhausted! for slots: " + masterSlaveEntry.getSlotRanges()); + } else { + errorMsg = new StringBuilder("Connection pool exhausted! "); + } + if (!freezed.isEmpty()) { + errorMsg.append(" Disconnected hosts: " + freezed); + } if (!zeroConnectionsAmount.isEmpty()) { errorMsg.append(" Hosts with fully busy connections: " + zeroConnectionsAmount); }