diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index f17f971b0..bfd47c098 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -67,12 +67,12 @@ public class ClientConnectionsEntry { this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize); if (subscribePoolMaxSize > 0) { - connectionManager.getConnectionWatcher().add(subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter, c -> { + connectionManager.getConnectionWatcher().add(this, subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter, c -> { freeSubscribeConnections.remove(c); return allSubscribeConnections.remove(c); }); } - connectionManager.getConnectionWatcher().add(poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter, c -> { + connectionManager.getConnectionWatcher().add(this, poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter, c -> { freeConnections.remove(c); return allConnections.remove(c); }); @@ -115,6 +115,11 @@ public class ClientConnectionsEntry { firstFailTime.compareAndSet(0, System.currentTimeMillis()); } + public RFuture shutdownAsync() { + connectionManager.getConnectionWatcher().remove(this); + return client.shutdownAsync(); + } + public RedisClient getClient() { return client; } diff --git a/redisson/src/main/java/org/redisson/connection/IdleConnectionWatcher.java b/redisson/src/main/java/org/redisson/connection/IdleConnectionWatcher.java index e7bce3a24..60028274b 100644 --- a/redisson/src/main/java/org/redisson/connection/IdleConnectionWatcher.java +++ b/redisson/src/main/java/org/redisson/connection/IdleConnectionWatcher.java @@ -26,10 +26,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.Map; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; public class IdleConnectionWatcher { @@ -55,37 +58,32 @@ public class IdleConnectionWatcher { }; - private final Queue entries = new ConcurrentLinkedQueue<>(); + private final Map> entries = new ConcurrentHashMap<>(); private final ScheduledFuture monitorFuture; public IdleConnectionWatcher(ConnectionManager manager, MasterSlaveServersConfig config) { - monitorFuture = manager.getGroup().scheduleWithFixedDelay(new Runnable() { - - @Override - public void run() { - long currTime = System.nanoTime(); - for (Entry entry : entries) { - if (!validateAmount(entry)) { - continue; - } + monitorFuture = manager.getGroup().scheduleWithFixedDelay(() -> { + long currTime = System.nanoTime(); + for (Entry entry : entries.values().stream().flatMap(m -> m.stream()).collect(Collectors.toList())) { + if (!validateAmount(entry)) { + continue; + } - for (RedisConnection c : entry.connections) { - long timeInPool = TimeUnit.NANOSECONDS.toMillis(currTime - c.getLastUsageTime()); - if (timeInPool > config.getIdleConnectionTimeout() - && validateAmount(entry) - && entry.deleteHandler.apply(c)) { - ChannelFuture future = c.closeAsync(); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - log.debug("Connection {} has been closed due to idle timeout. Not used for {} ms", c.getChannel(), timeInPool); - } - }); - } + for (RedisConnection c : entry.connections) { + long timeInPool = TimeUnit.NANOSECONDS.toMillis(currTime - c.getLastUsageTime()); + if (timeInPool > config.getIdleConnectionTimeout() + && validateAmount(entry) + && entry.deleteHandler.apply(c)) { + ChannelFuture future = c.closeAsync(); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + log.debug("Connection {} has been closed due to idle timeout. Not used for {} ms", c.getChannel(), timeInPool); + } + }); } } } - }, config.getIdleConnectionTimeout(), config.getIdleConnectionTimeout(), TimeUnit.MILLISECONDS); } @@ -93,9 +91,14 @@ public class IdleConnectionWatcher { return entry.maximumAmount - entry.freeConnectionsCounter.getCounter() + entry.connections.size() > entry.minimumAmount; } - public void add(int minimumAmount, int maximumAmount, Collection connections, + public void remove(ClientConnectionsEntry entry) { + entries.remove(entry); + } + + public void add(ClientConnectionsEntry entry, int minimumAmount, int maximumAmount, Collection connections, AsyncSemaphore freeConnectionsCounter, Function deleteHandler) { - entries.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter, deleteHandler)); + Queue list = entries.computeIfAbsent(entry, k -> new ConcurrentLinkedQueue<>()); + list.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter, deleteHandler)); } public void stop() { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 52822c498..dab70cf6a 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -123,11 +123,11 @@ public class MasterSlaveEntry { } masterEntry = new ClientConnectionsEntry( - client, - config.getMasterConnectionMinimumIdleSize(), + client, + config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(), config.getSubscriptionConnectionMinimumIdleSize(), - config.getSubscriptionConnectionPoolSize(), + config.getSubscriptionConnectionPoolSize(), connectionManager, NodeType.MASTER); @@ -440,7 +440,7 @@ public class MasterSlaveEntry { if (oldMaster != masterEntry) { writeConnectionPool.remove(masterEntry); pubSubConnectionPool.remove(masterEntry); - masterEntry.getClient().shutdownAsync(); + masterEntry.shutdownAsync(); masterEntry = oldMaster; } log.error("Unable to change master from: " + oldMaster.getClient().getAddr() + " to: " + address, e); @@ -465,7 +465,7 @@ public class MasterSlaveEntry { && slaveBalancer.getAvailableClients() > 1) { slaveDown(newMasterClient.getAddr(), FreezeReason.SYSTEM); } - oldMaster.getClient().shutdownAsync(); + oldMaster.shutdownAsync(); log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr()); }); } @@ -478,7 +478,7 @@ public class MasterSlaveEntry { RPromise result = new RedissonPromise(); CountableListener listener = new CountableListener(result, null, 2); if (masterEntry != null) { - masterEntry.getClient().shutdownAsync().onComplete(listener); + masterEntry.shutdownAsync().onComplete(listener); } slaveBalancer.shutdownAsync().onComplete(listener); return result; diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index dd25cce3a..b50ea851f 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -294,7 +294,7 @@ public class LoadBalancerManager { RPromise result = new RedissonPromise(); CountableListener listener = new CountableListener(result, null, client2Entry.values().size()); for (ClientConnectionsEntry entry : client2Entry.values()) { - entry.getClient().shutdownAsync().onComplete(listener); + entry.shutdownAsync().onComplete(listener); } return result; }