diff --git a/redisson/src/main/java/org/redisson/command/RedisExecutor.java b/redisson/src/main/java/org/redisson/command/RedisExecutor.java index d8b4a05c6..c9f3b53a8 100644 --- a/redisson/src/main/java/org/redisson/command/RedisExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisExecutor.java @@ -180,13 +180,11 @@ public class RedisExecutor { } if (connectionFuture.cancel(false)) { - if (exception == null) { - exception = new RedisTimeoutException("Unable to acquire connection! " + connectionFuture + - "Increase connection pool size. " - + "Node source: " + source - + ", command: " + LogHelper.toString(command, params) - + " after " + attempt + " retry attempts"); - } + exception = new RedisTimeoutException("Unable to acquire connection! " + connectionFuture + + "Increase connection pool size. " + + "Node source: " + source + + ", command: " + LogHelper.toString(command, params) + + " after " + attempt + " retry attempts"); } else { if (connectionFuture.isSuccess()) { if (writeFuture == null || !writeFuture.isDone()) { @@ -275,7 +273,7 @@ public class RedisExecutor { if (!future.isSuccess()) { exception = new WriteRedisConnectionException( - "Unable to write command into connection! Node source: " + source + ", connection: " + connection + + "Unable to write command into connection! Increase connection pool size. Node source: " + source + ", connection: " + connection + ", command: " + LogHelper.toString(command, params) + " after " + attempt + " retry attempts", future.cause()); if (attempt == attempts) { @@ -507,6 +505,7 @@ public class RedisExecutor { connectionType, command, LogHelper.toString(params), source, connection.getRedisClient().getAddr(), connection); } writeFuture = connection.send(new CommandData<>(attemptPromise, codec, command, params)); + release(connection); } } @@ -517,10 +516,8 @@ public class RedisExecutor { RedisConnection connection = connectionFuture.getNow(); connectionManager.getShutdownLatch().release(); - if (readOnlyMode) { - connectionManager.releaseRead(source, connection); - } else { - connectionManager.releaseWrite(source, connection); + if (source.getRedirect() == Redirect.ASK || getClass() != RedisExecutor.class) { + release(connection); } if (log.isDebugEnabled()) { @@ -534,6 +531,14 @@ public class RedisExecutor { } } + private void release(RedisConnection connection) { + if (readOnlyMode) { + connectionManager.releaseRead(source, connection); + } else { + connectionManager.releaseWrite(source, connection); + } + } + public RedisClient getRedisClient() { return connectionFuture.getNow().getRedisClient(); } diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 487569ad9..cc7af3ea7 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -168,64 +168,28 @@ public class ClientConnectionsEntry { } public void acquireConnection(Runnable runnable, RedisCommand command) { - if (isPolled(command)) { - freeConnectionsCounter.acquire(runnable); - return; - } - - runnable.run(); + freeConnectionsCounter.acquire(runnable); } public void releaseConnection() { freeConnectionsCounter.release(); } - AtomicBoolean lock = new AtomicBoolean(); - - public RedisConnection pollConnection(RedisCommand command) { - if (isPolled(command)) { - while (true) { - if (lock.compareAndSet(false, true)) { - if (!iter.hasNext()) { - iter = freeConnections.iterator(); - } - try { - if (iter.hasNext()) { - RedisConnection c = iter.next(); - iter.remove(); - if (c != null) { - c.incUsage(); - c.setPooled(true); - } - return c; - } - return null; - } finally { - lock.set(false); - } - } - } + public void addConnection(RedisConnection conn) { + conn.setLastUsageTime(System.nanoTime()); + if (conn instanceof RedisPubSubConnection) { + freeSubscribeConnections.add((RedisPubSubConnection) conn); + } else { + freeConnections.add(conn); } + } - while (true) { - if (lock.compareAndSet(false, true)) { - if (!iter.hasNext()) { - iter = freeConnections.iterator(); - } - try { - if (iter.hasNext()) { - RedisConnection c = iter.next(); - if (c != null) { - c.incUsage(); - } - return c; - } - return null; - } finally { - lock.set(false); - } - } + public RedisConnection pollConnection(RedisCommand command) { + RedisConnection c = freeConnections.poll(); + if (c != null) { + c.incUsage(); } + return c; } public void releaseConnection(RedisConnection connection) { @@ -238,15 +202,9 @@ public class ClientConnectionsEntry { return; } + connection.decUsage(); connection.setLastUsageTime(System.nanoTime()); - if (connection.getUsage() == 0) { - freeConnections.add(connection); - return; - } - if (connection.decUsage() == 0 && connection.isPooled()) { - freeConnections.add(connection); - connection.setPooled(false); - } + freeConnections.add(connection); } public RFuture connect() {