From 0028f43e6b7767cf9424ac7bc4bff867a12623ca Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 27 Oct 2016 13:42:31 +0300 Subject: [PATCH] Remove `connection pool exhausted` exception #680 --- .../connection/ClientConnectionsEntry.java | 40 ++--- .../connection/IdleConnectionWatcher.java | 10 +- .../connection/pool/ConnectionPool.java | 158 ++++++++++-------- .../connection/pool/PubSubConnectionPool.java | 12 +- .../org/redisson/pubsub/AsyncSemaphore.java | 4 + 5 files changed, 124 insertions(+), 100 deletions(-) diff --git a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 57cb6674c..364e60fde 100644 --- a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -33,16 +33,18 @@ import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; +import org.redisson.pubsub.AsyncSemaphore; + public class ClientConnectionsEntry { final Logger log = LoggerFactory.getLogger(getClass()); private final Queue allSubscribeConnections = new ConcurrentLinkedQueue(); private final Queue freeSubscribeConnections = new ConcurrentLinkedQueue(); - private final AtomicInteger freeSubscribeConnectionsCounter = new AtomicInteger(); + private final AsyncSemaphore freeSubscribeConnectionsCounter; private final Queue freeConnections = new ConcurrentLinkedQueue(); - private final AtomicInteger freeConnectionsCounter = new AtomicInteger(); + private final AsyncSemaphore freeConnectionsCounter; public enum FreezeReason {MANAGER, RECONNECT, SYSTEM} @@ -58,10 +60,10 @@ public class ClientConnectionsEntry { public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize, ConnectionManager connectionManager, NodeType serverMode) { this.client = client; - this.freeConnectionsCounter.set(poolMaxSize); + this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize); this.connectionManager = connectionManager; this.nodeType = serverMode; - this.freeSubscribeConnectionsCounter.set(subscribePoolMaxSize); + this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize); if (subscribePoolMaxSize > 0) { connectionManager.getConnectionWatcher().add(subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter); @@ -106,27 +108,15 @@ public class ClientConnectionsEntry { } public int getFreeAmount() { - return freeConnectionsCounter.get(); - } - - private boolean tryAcquire(AtomicInteger counter) { - while (true) { - int value = counter.get(); - if (value == 0) { - return false; - } - if (counter.compareAndSet(value, value - 1)) { - return true; - } - } + return freeConnectionsCounter.getCounter(); } - public boolean tryAcquireConnection() { - return tryAcquire(freeConnectionsCounter); + public void acquireConnection(Runnable runnable) { + freeConnectionsCounter.acquire(runnable); } public void releaseConnection() { - freeConnectionsCounter.incrementAndGet(); + freeConnectionsCounter.release(); } public RedisConnection pollConnection() { @@ -139,7 +129,7 @@ public class ClientConnectionsEntry { } public Future connect() { - final Promise connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise(); + final Promise connectionFuture = connectionManager.newPromise(); Future future = client.connectAsync(); future.addListener(new FutureListener() { @Override @@ -192,7 +182,7 @@ public class ClientConnectionsEntry { } public Future connectPubSub() { - final Promise connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise(); + final Promise connectionFuture = connectionManager.newPromise(); Future future = client.connectPubSubAsync(); future.addListener(new FutureListener() { @Override @@ -227,12 +217,12 @@ public class ClientConnectionsEntry { freeSubscribeConnections.add(connection); } - public boolean tryAcquireSubscribeConnection() { - return tryAcquire(freeSubscribeConnectionsCounter); + public void acquireSubscribeConnection(Runnable runnable) { + freeSubscribeConnectionsCounter.acquire(runnable); } public void releaseSubscribeConnection() { - freeSubscribeConnectionsCounter.incrementAndGet(); + freeSubscribeConnectionsCounter.release(); } public boolean freezeMaster(FreezeReason reason) { diff --git a/src/main/java/org/redisson/connection/IdleConnectionWatcher.java b/src/main/java/org/redisson/connection/IdleConnectionWatcher.java index 6fc92b2cf..598b67634 100644 --- a/src/main/java/org/redisson/connection/IdleConnectionWatcher.java +++ b/src/main/java/org/redisson/connection/IdleConnectionWatcher.java @@ -30,6 +30,8 @@ import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import org.redisson.pubsub.AsyncSemaphore; + public class IdleConnectionWatcher { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -38,10 +40,10 @@ public class IdleConnectionWatcher { private final int minimumAmount; private final int maximumAmount; - private final AtomicInteger freeConnectionsCounter; + private final AsyncSemaphore freeConnectionsCounter; private final Collection connections; - public Entry(int minimumAmount, int maximumAmount, Collection connections, AtomicInteger freeConnectionsCounter) { + public Entry(int minimumAmount, int maximumAmount, Collection connections, AsyncSemaphore freeConnectionsCounter) { super(); this.minimumAmount = minimumAmount; this.maximumAmount = maximumAmount; @@ -84,10 +86,10 @@ public class IdleConnectionWatcher { } private boolean validateAmount(Entry entry) { - return entry.maximumAmount - entry.freeConnectionsCounter.get() + entry.connections.size() > entry.minimumAmount; + return entry.maximumAmount - entry.freeConnectionsCounter.getCounter() + entry.connections.size() > entry.minimumAmount; } - public void add(int minimumAmount, int maximumAmount, Collection connections, AtomicInteger freeConnectionsCounter) { + public void add(int minimumAmount, int maximumAmount, Collection connections, AsyncSemaphore freeConnectionsCounter) { entries.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter)); } diff --git a/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 2d75878e7..728f749d0 100644 --- a/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -40,6 +40,15 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; +import org.redisson.pubsub.AsyncSemaphore; + +/** + * Base connection pool class + * + * @author Nikita Koksharov + * + * @param - connection type + */ abstract class ConnectionPool { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -74,7 +83,7 @@ abstract class ConnectionPool { final int minimumIdleSize = getMinimumIdleSize(entry); if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) { - initPromise.setSuccess(null); + initPromise.trySuccess(null); return; } @@ -96,37 +105,52 @@ abstract class ConnectionPool { initPromise.tryFailure(cause); return; } - - Future promise = createConnection(entry); - promise.addListener(new FutureListener() { + + acquireConnection(entry, new Runnable() { + @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - T conn = future.getNow(); - releaseConnection(entry, conn); - } + public void run() { + Promise promise = connectionManager.newPromise(); + createConnection(entry, promise); + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + T conn = future.getNow(); - releaseConnection(entry); + releaseConnection(entry, conn); + } - if (!future.isSuccess()) { - Throwable cause = new RedisConnectionException( - "Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: " - + entry.getClient().getAddr(), future.cause()); - initPromise.tryFailure(cause); - return; - } + releaseConnection(entry); + + if (!future.isSuccess()) { + Throwable cause = new RedisConnectionException( + "Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: " + + entry.getClient().getAddr(), future.cause()); + initPromise.tryFailure(cause); + return; + } - int value = initializedConnections.decrementAndGet(); - if (value == 0) { - log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); - initPromise.setSuccess(null); - } else if (value > 0 && !initPromise.isDone()) { - if (requests.incrementAndGet() <= minimumIdleSize) { - createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections); + int value = initializedConnections.decrementAndGet(); + if (value == 0) { + log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); + if (!initPromise.trySuccess(null)) { + throw new IllegalStateException(); + } + } else if (value > 0 && !initPromise.isDone()) { + if (requests.incrementAndGet() <= minimumIdleSize) { + createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections); + } + } } - } + }); } }); + + } + + protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) { + entry.acquireConnection(runnable); } protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry); @@ -137,28 +161,36 @@ abstract class ConnectionPool { public Future get() { for (int j = entries.size() - 1; j >= 0; j--) { - ClientConnectionsEntry entry = getEntry(); - if (!entry.isFreezed() && tryAcquireConnection(entry)) { - return connectTo(entry); + final ClientConnectionsEntry entry = getEntry(); + if (!entry.isFreezed() + && tryAcquireConnection(entry)) { + final Promise result = connectionManager.newPromise(); + acquireConnection(entry, new Runnable() { + @Override + public void run() { + connectTo(entry, result); + } + }); + return result; } } - - List zeroConnectionsAmount = new LinkedList(); + + List failedAttempts = new LinkedList(); List freezed = new LinkedList(); for (ClientConnectionsEntry entry : entries) { if (entry.isFreezed()) { freezed.add(entry.getClient().getAddr()); } else { - zeroConnectionsAmount.add(entry.getClient().getAddr()); + failedAttempts.add(entry.getClient().getAddr()); } } - StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " exhausted! "); + StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. "); if (!freezed.isEmpty()) { errorMsg.append(" Disconnected hosts: " + freezed); } - if (!zeroConnectionsAmount.isEmpty()) { - errorMsg.append(" Hosts with fully busy connections: " + zeroConnectionsAmount); + if (!failedAttempts.isEmpty()) { + errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts); } RedisConnectionException exception = new RedisConnectionException(errorMsg.toString()); @@ -168,7 +200,9 @@ abstract class ConnectionPool { public Future get(ClientConnectionsEntry entry) { if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed()) && tryAcquireConnection(entry)) { - return connectTo(entry); + Promise result = connectionManager.newPromise(); + connectTo(entry, result); + return result; } RedisConnectionException exception = new RedisConnectionException( @@ -177,7 +211,7 @@ abstract class ConnectionPool { } protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { - return entry.getFailedAttempts() < config.getFailedAttempts() && entry.tryAcquireConnection(); + return entry.getFailedAttempts() < config.getFailedAttempts(); } protected T poll(ClientConnectionsEntry entry) { @@ -188,28 +222,31 @@ abstract class ConnectionPool { return (Future) entry.connect(); } - private Future connectTo(ClientConnectionsEntry entry) { + private void connectTo(ClientConnectionsEntry entry, Promise promise) { + if (promise.isDone()) { + releaseConnection(entry); + return; + } T conn = poll(entry); if (conn != null) { if (!conn.isActive()) { - return promiseFailure(entry, conn); + promiseFailure(entry, promise, conn); + return; } - return promiseSuccessful(entry, conn); + connectedSuccessful(entry, promise, conn); + return; } - return createConnection(entry); + createConnection(entry, promise); } - private Future createConnection(final ClientConnectionsEntry entry) { - final Promise promise = connectionManager.newPromise(); + private void createConnection(final ClientConnectionsEntry entry, final Promise promise) { Future connFuture = connect(entry); connFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - releaseConnection(entry); - promiseFailure(entry, promise, future.cause()); return; } @@ -220,13 +257,12 @@ abstract class ConnectionPool { return; } - promiseSuccessful(entry, promise, conn); + connectedSuccessful(entry, promise, conn); } }); - return promise; } - private void promiseSuccessful(ClientConnectionsEntry entry, Promise promise, T conn) { + private void connectedSuccessful(ClientConnectionsEntry entry, Promise promise, T conn) { entry.resetFailedAttempts(); if (!promise.trySuccess(conn)) { releaseConnection(entry, conn); @@ -234,45 +270,31 @@ abstract class ConnectionPool { } } - private Future promiseSuccessful(ClientConnectionsEntry entry, T conn) { - entry.resetFailedAttempts(); - return (Future) conn.getAcquireFuture(); - } - private void promiseFailure(ClientConnectionsEntry entry, Promise promise, Throwable cause) { if (entry.incFailedAttempts() == config.getFailedAttempts()) { checkForReconnect(entry); } - promise.tryFailure(cause); - } - - private void promiseFailure(ClientConnectionsEntry entry, Promise promise, T conn) { - int attempts = entry.incFailedAttempts(); - if (attempts == config.getFailedAttempts()) { - checkForReconnect(entry); - } else if (attempts < config.getFailedAttempts()) { - releaseConnection(entry, conn); - } - releaseConnection(entry); - RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); promise.tryFailure(cause); } - private Future promiseFailure(ClientConnectionsEntry entry, T conn) { + private void promiseFailure(ClientConnectionsEntry entry, Promise promise, T conn) { int attempts = entry.incFailedAttempts(); if (attempts == config.getFailedAttempts()) { + conn.closeAsync(); checkForReconnect(entry); } else if (attempts < config.getFailedAttempts()) { releaseConnection(entry, conn); + } else { + conn.closeAsync(); } releaseConnection(entry); RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); - return connectionManager.newFailedFuture(cause); + promise.tryFailure(cause); } private void checkForReconnect(ClientConnectionsEntry entry) { @@ -364,8 +386,8 @@ abstract class ConnectionPool { if (entry.getConfig().getPassword() != null) { Future temp = c.async(RedisCommands.AUTH, config.getPassword()); - FutureListener listener = new FutureListener () { - @Override public void operationComplete (Future < Void > future)throws Exception { + FutureListener listener = new FutureListener() { + @Override public void operationComplete(Future future)throws Exception { ping(c, pingListener); } }; diff --git a/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index d1a64fad1..1a31ccac8 100644 --- a/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -23,6 +23,12 @@ import org.redisson.connection.ClientConnectionsEntry; import io.netty.util.concurrent.Future; +/** + * Connection pool for Publish / Subscribe + * + * @author Nikita Koksharov + * + */ public class PubSubConnectionPool extends ConnectionPool { public PubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { @@ -45,10 +51,10 @@ public class PubSubConnectionPool extends ConnectionPool } @Override - protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { - return entry.tryAcquireSubscribeConnection(); + protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) { + entry.acquireSubscribeConnection(runnable); } - + @Override protected void releaseConnection(ClientConnectionsEntry entry) { entry.releaseSubscribeConnection(); diff --git a/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/src/main/java/org/redisson/pubsub/AsyncSemaphore.java index 49f3d29b2..74ebcd055 100644 --- a/src/main/java/org/redisson/pubsub/AsyncSemaphore.java +++ b/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -74,6 +74,10 @@ public class AsyncSemaphore { } } + public int getCounter() { + return counter; + } + public void release() { Runnable runnable = null;