Fixed - AsyncSemaphore doesn't skip canceled tasks in the same thread. #4215

pull/4226/head
Nikita Koksharov 3 years ago
parent 14abe1e5ec
commit f9e97416ce

@ -148,8 +148,8 @@ public class ClientConnectionsEntry {
freeSubscribeConnectionsCounter.removeListeners(); freeSubscribeConnectionsCounter.removeListeners();
} }
public void acquireConnection(Runnable runnable, RedisCommand<?> command) { public CompletableFuture<Void> acquireConnection(RedisCommand<?> command) {
freeConnectionsCounter.acquire(runnable); return freeConnectionsCounter.acquire();
} }
public void releaseConnection() { public void releaseConnection() {
@ -263,8 +263,8 @@ public class ClientConnectionsEntry {
freeSubscribeConnections.add(connection); freeSubscribeConnections.add(connection);
} }
public void acquireSubscribeConnection(Runnable runnable) { public CompletableFuture<Void> acquireSubscribeConnection() {
freeSubscribeConnectionsCounter.acquire(runnable); return freeSubscribeConnectionsCounter.acquire();
} }
public void releaseSubscribeConnection() { public void releaseSubscribeConnection() {

@ -39,7 +39,6 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
/** /**
* Base connection pool class * Base connection pool class
@ -106,10 +105,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
return; return;
} }
acquireConnection(entry, new Runnable() { CompletableFuture<Void> f = acquireConnection(entry, null);
f.thenAccept(r -> {
@Override
public void run() {
CompletableFuture<T> promise = new CompletableFuture<T>(); CompletableFuture<T> promise = new CompletableFuture<T>();
createConnection(entry, promise); createConnection(entry, promise);
promise.whenComplete((conn, e) -> { promise.whenComplete((conn, e) -> {
@ -166,12 +163,11 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
} }
}); });
} });
}, null);
} }
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable, RedisCommand<?> command) { protected CompletableFuture<Void> acquireConnection(ClientConnectionsEntry entry, RedisCommand<?> command) {
entry.acquireConnection(runnable, command); return entry.acquireConnection(command);
} }
protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry); protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry);
@ -218,17 +214,18 @@ abstract class ConnectionPool<T extends RedisConnection> {
return acquireConnection(command, entry); return acquireConnection(command, entry);
} }
public abstract static class AcquireCallback<T> implements Runnable, BiConsumer<T, Throwable> {
}
protected final CompletableFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) { protected final CompletableFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) {
CompletableFuture<T> result = new CompletableFuture<T>(); CompletableFuture<T> result = new CompletableFuture<T>();
Runnable callback = () -> { CompletableFuture<Void> f = acquireConnection(entry, command);
f.thenAccept(r -> {
connectTo(entry, result, command); connectTo(entry, result, command);
}; });
acquireConnection(entry, callback, command); result.whenComplete((r, e) -> {
if (e != null) {
f.completeExceptionally(e);
}
});
return result; return result;
} }

@ -58,8 +58,8 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
} }
@Override @Override
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable, RedisCommand<?> command) { protected CompletableFuture<Void> acquireConnection(ClientConnectionsEntry entry, RedisCommand<?> command) {
entry.acquireSubscribeConnection(runnable); return entry.acquireSubscribeConnection();
} }
@Override @Override

@ -27,22 +27,24 @@ import java.util.concurrent.atomic.AtomicInteger;
public class AsyncSemaphore { public class AsyncSemaphore {
private final AtomicInteger counter; private final AtomicInteger counter;
private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>(); private final Queue<CompletableFuture<Void>> listeners = new ConcurrentLinkedQueue<>();
public AsyncSemaphore(int permits) { public AsyncSemaphore(int permits) {
counter = new AtomicInteger(permits); counter = new AtomicInteger(permits);
} }
public boolean tryAcquire(long timeoutMillis) { public boolean tryAcquire(long timeoutMillis) {
CountDownLatch latch = new CountDownLatch(1); CompletableFuture<Void> f = acquire();
Runnable runnable = () -> latch.countDown();
acquire(runnable);
try { try {
return latch.await(timeoutMillis, TimeUnit.MILLISECONDS); f.get(timeoutMillis, TimeUnit.MILLISECONDS);
return true;
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return false; return false;
} catch (ExecutionException e) {
throw new IllegalStateException(e);
} catch (TimeoutException e) {
return false;
} }
} }
@ -54,23 +56,33 @@ public class AsyncSemaphore {
listeners.clear(); listeners.clear();
} }
public void acquire(Runnable listener) { public CompletableFuture<Void> acquire() {
listeners.add(listener); CompletableFuture<Void> future = new CompletableFuture<>();
listeners.add(future);
tryRun(); tryRun();
return future;
}
public void acquire(Runnable listener) {
acquire().thenAccept(r -> listener.run());
} }
private void tryRun() { private void tryRun() {
while (true) {
if (counter.decrementAndGet() >= 0) { if (counter.decrementAndGet() >= 0) {
Runnable listener = listeners.poll(); CompletableFuture<Void> future = listeners.poll();
if (listener == null) { if (future == null) {
counter.incrementAndGet(); counter.incrementAndGet();
return; return;
} }
listener.run(); if (future.complete(null)) {
} else { return;
if (counter.incrementAndGet() > 0) { }
tryRun(); }
if (counter.incrementAndGet() <= 0) {
return;
} }
} }
} }

Loading…
Cancel
Save