diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionsHolder.java b/redisson/src/main/java/org/redisson/connection/ConnectionsHolder.java index db898f3c4..93e1a06b8 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionsHolder.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionsHolder.java @@ -54,7 +54,7 @@ public class ConnectionsHolder { public ConnectionsHolder(RedisClient client, int poolMaxSize, Function> connectionCallback, ServiceManager serviceManager, boolean changeUsage) { - this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize); + this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize, serviceManager.getGroup()); this.client = client; this.connectionCallback = connectionCallback; this.serviceManager = serviceManager; @@ -215,9 +215,7 @@ public class ConnectionsHolder { private void connectTo(CompletableFuture promise, RedisCommand command) { if (promise.isDone()) { - serviceManager.getGroup().submit(() -> { - releaseConnection(); - }); + releaseConnection(); return; } @@ -249,5 +247,8 @@ public class ConnectionsHolder { releaseConnection(); } + public ServiceManager getServiceManager() { + return serviceManager; + } } diff --git a/redisson/src/main/java/org/redisson/connection/TrackedConnectionsHolder.java b/redisson/src/main/java/org/redisson/connection/TrackedConnectionsHolder.java index 0b06f056b..812a5f165 100644 --- a/redisson/src/main/java/org/redisson/connection/TrackedConnectionsHolder.java +++ b/redisson/src/main/java/org/redisson/connection/TrackedConnectionsHolder.java @@ -42,7 +42,7 @@ public class TrackedConnectionsHolder extends ConnectionsHolder private final AtomicInteger usage = new AtomicInteger(); public TrackedConnectionsHolder(ConnectionsHolder holder) { - super(null, 0, null, null, false); + super(null, 0, null, holder.getServiceManager(), false); this.holder = holder; } diff --git a/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java index 725d83cb0..d74a65d78 100644 --- a/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java @@ -18,6 +18,7 @@ package org.redisson.misc; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; /** @@ -27,13 +28,22 @@ import java.util.concurrent.atomic.AtomicInteger; */ public final class AsyncSemaphore { + private final ExecutorService executorService; + private final AtomicInteger tasksLatch = new AtomicInteger(1); + private final AtomicInteger stackSize = new AtomicInteger(); + private final AtomicInteger counter; private final Queue> listeners = new ConcurrentLinkedQueue<>(); public AsyncSemaphore(int permits) { + this(permits, null); + } + + public AsyncSemaphore(int permits, ExecutorService executorService) { counter = new AtomicInteger(permits); + this.executorService = executorService; } - + public int queueSize() { return listeners.size(); } @@ -45,10 +55,26 @@ public final class AsyncSemaphore { public CompletableFuture acquire() { CompletableFuture future = new CompletableFuture<>(); listeners.add(future); - tryRun(); + tryForkAndRun(); return future; } + private void tryForkAndRun() { + if (executorService != null) { + int val = tasksLatch.get(); + if (stackSize.get() > 100 * val + && tasksLatch.compareAndSet(val, val+1)) { + executorService.submit(() -> { + tasksLatch.decrementAndGet(); + tryRun(); + }); + return; + } + } + + tryRun(); + } + private void tryRun() { while (true) { if (counter.decrementAndGet() >= 0) { @@ -58,7 +84,15 @@ public final class AsyncSemaphore { return; } - if (future.complete(null)) { + boolean complete; + if (executorService != null) { + stackSize.incrementAndGet(); + complete = future.complete(null); + stackSize.decrementAndGet(); + } else { + complete = future.complete(null); + } + if (complete) { return; } } @@ -75,7 +109,7 @@ public final class AsyncSemaphore { public void release() { counter.incrementAndGet(); - tryRun(); + tryForkAndRun(); } @Override