From f9e97416ce10ffcafa59c46ae7a887b2805e27d6 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 11 Apr 2022 10:45:31 +0300 Subject: [PATCH] Fixed - AsyncSemaphore doesn't skip canceled tasks in the same thread. #4215 --- .../connection/ClientConnectionsEntry.java | 8 +- .../connection/pool/ConnectionPool.java | 137 +++++++++--------- .../connection/pool/PubSubConnectionPool.java | 4 +- .../org/redisson/pubsub/AsyncSemaphore.java | 48 +++--- 4 files changed, 103 insertions(+), 94 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index b63f3b97c..257ec4d0e 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -148,8 +148,8 @@ public class ClientConnectionsEntry { freeSubscribeConnectionsCounter.removeListeners(); } - public void acquireConnection(Runnable runnable, RedisCommand command) { - freeConnectionsCounter.acquire(runnable); + public CompletableFuture acquireConnection(RedisCommand command) { + return freeConnectionsCounter.acquire(); } public void releaseConnection() { @@ -263,8 +263,8 @@ public class ClientConnectionsEntry { freeSubscribeConnections.add(connection); } - public void acquireSubscribeConnection(Runnable runnable) { - freeSubscribeConnectionsCounter.acquire(runnable); + public CompletableFuture acquireSubscribeConnection() { + return freeSubscribeConnectionsCounter.acquire(); } public void releaseSubscribeConnection() { diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 0c8324026..dfe68b6d8 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -39,7 +39,6 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; /** * Base connection pool class @@ -105,73 +104,70 @@ abstract class ConnectionPool { initPromise.completeExceptionally(cause); return; } - - acquireConnection(entry, new Runnable() { - - @Override - public void run() { - CompletableFuture promise = new CompletableFuture(); - createConnection(entry, promise); - promise.whenComplete((conn, e) -> { - if (e == null) { - if (!initPromise.isDone()) { - entry.addConnection(conn); - } else { - conn.closeAsync(); - } - } - releaseConnection(entry); + CompletableFuture f = acquireConnection(entry, null); + f.thenAccept(r -> { + CompletableFuture promise = new CompletableFuture(); + createConnection(entry, promise); + promise.whenComplete((conn, e) -> { + if (e == null) { + if (!initPromise.isDone()) { + entry.addConnection(conn); + } else { + conn.closeAsync(); + } + } - if (e != null) { - if (initPromise.isDone()) { - return; - } - - for (RedisConnection connection : entry.getAllConnections()) { - if (!connection.isClosed()) { - connection.closeAsync(); - } - } - entry.getAllConnections().clear(); - - for (RedisConnection connection : entry.getAllSubscribeConnections()) { - if (!connection.isClosed()) { - connection.closeAsync(); - } - } - entry.getAllSubscribeConnections().clear(); - - int totalInitializedConnections = minimumIdleSize - initializedConnections.get(); - String errorMsg; - if (totalInitializedConnections == 0) { - errorMsg = "Unable to connect to Redis server: " + entry.getClient().getAddr(); - } else { - errorMsg = "Unable to init enough connections amount! Only " + totalInitializedConnections - + " of " + minimumIdleSize + " were initialized. Redis server: " + entry.getClient().getAddr(); - } - Throwable cause = new RedisConnectionException(errorMsg, e); - initPromise.completeExceptionally(cause); - return; + releaseConnection(entry); + + if (e != null) { + if (initPromise.isDone()) { + return; + } + + for (RedisConnection connection : entry.getAllConnections()) { + if (!connection.isClosed()) { + connection.closeAsync(); } + } + entry.getAllConnections().clear(); - int value = initializedConnections.decrementAndGet(); - if (value == 0) { - if (initPromise.complete(null)) { - log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); - } - } else if (value > 0 && !initPromise.isDone()) { - if (requests.incrementAndGet() <= minimumIdleSize) { - createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections); - } + for (RedisConnection connection : entry.getAllSubscribeConnections()) { + if (!connection.isClosed()) { + connection.closeAsync(); } - }); - } - }, null); + } + entry.getAllSubscribeConnections().clear(); + + int totalInitializedConnections = minimumIdleSize - initializedConnections.get(); + String errorMsg; + if (totalInitializedConnections == 0) { + errorMsg = "Unable to connect to Redis server: " + entry.getClient().getAddr(); + } else { + errorMsg = "Unable to init enough connections amount! Only " + totalInitializedConnections + + " of " + minimumIdleSize + " were initialized. Redis server: " + entry.getClient().getAddr(); + } + Throwable cause = new RedisConnectionException(errorMsg, e); + initPromise.completeExceptionally(cause); + return; + } + + int value = initializedConnections.decrementAndGet(); + if (value == 0) { + if (initPromise.complete(null)) { + log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); + } + } else if (value > 0 && !initPromise.isDone()) { + if (requests.incrementAndGet() <= minimumIdleSize) { + createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections); + } + } + }); + }); } - protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable, RedisCommand command) { - entry.acquireConnection(runnable, command); + protected CompletableFuture acquireConnection(ClientConnectionsEntry entry, RedisCommand command) { + return entry.acquireConnection(command); } protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry); @@ -215,20 +211,21 @@ abstract class ConnectionPool { } public CompletableFuture get(RedisCommand command, ClientConnectionsEntry entry) { - return acquireConnection(command, entry); - } - - public abstract static class AcquireCallback implements Runnable, BiConsumer { - + return acquireConnection(command, entry); } - + protected final CompletableFuture acquireConnection(RedisCommand command, ClientConnectionsEntry entry) { CompletableFuture result = new CompletableFuture(); - Runnable callback = () -> { + CompletableFuture f = acquireConnection(entry, command); + f.thenAccept(r -> { connectTo(entry, result, command); - }; - acquireConnection(entry, callback, command); + }); + result.whenComplete((r, e) -> { + if (e != null) { + f.completeExceptionally(e); + } + }); return result; } diff --git a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index 54330bd20..9d39efc87 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -58,8 +58,8 @@ public class PubSubConnectionPool extends ConnectionPool } @Override - protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable, RedisCommand command) { - entry.acquireSubscribeConnection(runnable); + protected CompletableFuture acquireConnection(ClientConnectionsEntry entry, RedisCommand command) { + return entry.acquireSubscribeConnection(); } @Override diff --git a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java index 5b1a879d5..322bc8d66 100644 --- a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -27,22 +27,24 @@ import java.util.concurrent.atomic.AtomicInteger; public class AsyncSemaphore { private final AtomicInteger counter; - private final Queue listeners = new ConcurrentLinkedQueue<>(); + private final Queue> listeners = new ConcurrentLinkedQueue<>(); public AsyncSemaphore(int permits) { counter = new AtomicInteger(permits); } public boolean tryAcquire(long timeoutMillis) { - CountDownLatch latch = new CountDownLatch(1); - Runnable runnable = () -> latch.countDown(); - acquire(runnable); - + CompletableFuture f = acquire(); try { - return latch.await(timeoutMillis, TimeUnit.MILLISECONDS); + f.get(timeoutMillis, TimeUnit.MILLISECONDS); + return true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } catch (TimeoutException e) { + return false; } } @@ -53,24 +55,34 @@ public class AsyncSemaphore { public void removeListeners() { listeners.clear(); } - - public void acquire(Runnable listener) { - listeners.add(listener); + + public CompletableFuture acquire() { + CompletableFuture future = new CompletableFuture<>(); + listeners.add(future); tryRun(); + return future; + } + + public void acquire(Runnable listener) { + acquire().thenAccept(r -> listener.run()); } private void tryRun() { - if (counter.decrementAndGet() >= 0) { - Runnable listener = listeners.poll(); - if (listener == null) { - counter.incrementAndGet(); - return; + while (true) { + if (counter.decrementAndGet() >= 0) { + CompletableFuture future = listeners.poll(); + if (future == null) { + counter.incrementAndGet(); + return; + } + + if (future.complete(null)) { + return; + } } - listener.run(); - } else { - if (counter.incrementAndGet() > 0) { - tryRun(); + if (counter.incrementAndGet() <= 0) { + return; } } }