diff --git a/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java index 5dc71e29f..ffa5d431a 100644 --- a/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java @@ -15,9 +15,9 @@ */ package org.redisson.misc; -import java.util.Queue; +import org.redisson.cache.FastRemovalQueue; + import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -33,7 +33,7 @@ public final class AsyncSemaphore { private final AtomicInteger stackSize = new AtomicInteger(); private final AtomicInteger counter; - private final Queue> listeners = new ConcurrentLinkedQueue<>(); + private final FastRemovalQueue> listeners = new FastRemovalQueue<>(); public AsyncSemaphore(int permits) { this(permits, null); @@ -55,6 +55,11 @@ public final class AsyncSemaphore { public CompletableFuture acquire() { CompletableFuture future = new CompletableFuture<>(); listeners.add(future); + future.whenComplete((r, e) -> { + if (e != null) { + listeners.remove(future); + } + }); tryForkAndRun(); return future; }