From c30fc89fc218a11ae36690ce4e5860663d45454d Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 10 Dec 2024 10:38:01 +0300 Subject: [PATCH] Fixed - starvation of pub/sub connections may cause a memory leak. #6328 --- .../main/java/org/redisson/misc/AsyncSemaphore.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java index 5dc71e29f..ffa5d431a 100644 --- a/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java @@ -15,9 +15,9 @@ */ package org.redisson.misc; -import java.util.Queue; +import org.redisson.cache.FastRemovalQueue; + import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -33,7 +33,7 @@ public final class AsyncSemaphore { private final AtomicInteger stackSize = new AtomicInteger(); private final AtomicInteger counter; - private final Queue> listeners = new ConcurrentLinkedQueue<>(); + private final FastRemovalQueue> listeners = new FastRemovalQueue<>(); public AsyncSemaphore(int permits) { this(permits, null); @@ -55,6 +55,11 @@ public final class AsyncSemaphore { public CompletableFuture acquire() { CompletableFuture future = new CompletableFuture<>(); listeners.add(future); + future.whenComplete((r, e) -> { + if (e != null) { + listeners.remove(future); + } + }); tryForkAndRun(); return future; }