Fixed - starvation of pub/sub connections may cause a memory leak. #6328

pull/6334/head
Nikita Koksharov 2 months ago
parent 6a6bc93084
commit c30fc89fc2

@ -15,9 +15,9 @@
*/ */
package org.redisson.misc; package org.redisson.misc;
import java.util.Queue; import org.redisson.cache.FastRemovalQueue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -33,7 +33,7 @@ public final class AsyncSemaphore {
private final AtomicInteger stackSize = new AtomicInteger(); private final AtomicInteger stackSize = new AtomicInteger();
private final AtomicInteger counter; private final AtomicInteger counter;
private final Queue<CompletableFuture<Void>> listeners = new ConcurrentLinkedQueue<>(); private final FastRemovalQueue<CompletableFuture<Void>> listeners = new FastRemovalQueue<>();
public AsyncSemaphore(int permits) { public AsyncSemaphore(int permits) {
this(permits, null); this(permits, null);
@ -55,6 +55,11 @@ public final class AsyncSemaphore {
public CompletableFuture<Void> acquire() { public CompletableFuture<Void> acquire() {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
listeners.add(future); listeners.add(future);
future.whenComplete((r, e) -> {
if (e != null) {
listeners.remove(future);
}
});
tryForkAndRun(); tryForkAndRun();
return future; return future;
} }

Loading…
Cancel
Save