Fixed - connection leak during high load with few connections #5971

pull/5994/head
Nikita Koksharov 8 months ago
parent ca4dae140b
commit 3c1c90f603

@ -54,7 +54,7 @@ public class ConnectionsHolder<T extends RedisConnection> {
public ConnectionsHolder(RedisClient client, int poolMaxSize,
Function<RedisClient, CompletionStage<T>> connectionCallback,
ServiceManager serviceManager, boolean changeUsage) {
this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize);
this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize, serviceManager.getGroup());
this.client = client;
this.connectionCallback = connectionCallback;
this.serviceManager = serviceManager;
@ -215,9 +215,7 @@ public class ConnectionsHolder<T extends RedisConnection> {
private void connectTo(CompletableFuture<T> promise, RedisCommand<?> command) {
if (promise.isDone()) {
serviceManager.getGroup().submit(() -> {
releaseConnection();
});
releaseConnection();
return;
}
@ -249,5 +247,8 @@ public class ConnectionsHolder<T extends RedisConnection> {
releaseConnection();
}
public ServiceManager getServiceManager() {
return serviceManager;
}
}

@ -42,7 +42,7 @@ public class TrackedConnectionsHolder extends ConnectionsHolder<RedisConnection>
private final AtomicInteger usage = new AtomicInteger();
public TrackedConnectionsHolder(ConnectionsHolder<RedisConnection> holder) {
super(null, 0, null, null, false);
super(null, 0, null, holder.getServiceManager(), false);
this.holder = holder;
}

@ -18,6 +18,7 @@ package org.redisson.misc;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -27,13 +28,22 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public final class AsyncSemaphore {
private final ExecutorService executorService;
private final AtomicInteger tasksLatch = new AtomicInteger(1);
private final AtomicInteger stackSize = new AtomicInteger();
private final AtomicInteger counter;
private final Queue<CompletableFuture<Void>> listeners = new ConcurrentLinkedQueue<>();
public AsyncSemaphore(int permits) {
this(permits, null);
}
public AsyncSemaphore(int permits, ExecutorService executorService) {
counter = new AtomicInteger(permits);
this.executorService = executorService;
}
public int queueSize() {
return listeners.size();
}
@ -45,10 +55,26 @@ public final class AsyncSemaphore {
public CompletableFuture<Void> acquire() {
CompletableFuture<Void> future = new CompletableFuture<>();
listeners.add(future);
tryRun();
tryForkAndRun();
return future;
}
private void tryForkAndRun() {
if (executorService != null) {
int val = tasksLatch.get();
if (stackSize.get() > 100 * val
&& tasksLatch.compareAndSet(val, val+1)) {
executorService.submit(() -> {
tasksLatch.decrementAndGet();
tryRun();
});
return;
}
}
tryRun();
}
private void tryRun() {
while (true) {
if (counter.decrementAndGet() >= 0) {
@ -58,7 +84,15 @@ public final class AsyncSemaphore {
return;
}
if (future.complete(null)) {
boolean complete;
if (executorService != null) {
stackSize.incrementAndGet();
complete = future.complete(null);
stackSize.decrementAndGet();
} else {
complete = future.complete(null);
}
if (complete) {
return;
}
}
@ -75,7 +109,7 @@ public final class AsyncSemaphore {
public void release() {
counter.incrementAndGet();
tryRun();
tryForkAndRun();
}
@Override

Loading…
Cancel
Save