diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 5630f4760..86302a25f 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -113,6 +113,10 @@ public class ClientConnectionsEntry { public void acquireConnection(Runnable runnable) { freeConnectionsCounter.acquire(runnable); } + + public void removeConnection(Runnable runnable) { + freeConnectionsCounter.remove(runnable); + } public void releaseConnection() { freeConnectionsCounter.release(); diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 635f9e494..695e8b086 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -20,6 +20,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.NodeType; @@ -201,14 +202,29 @@ abstract class ConnectionPool { return connectionManager.newFailedFuture(exception); } + public static abstract class AcquireCallback implements Runnable, FutureListener { + + } + private RFuture acquireConnection(RedisCommand command, final ClientConnectionsEntry entry) { final RPromise result = connectionManager.newPromise(); - acquireConnection(entry, new Runnable() { + + AcquireCallback callback = new AcquireCallback() { @Override public void run() { + result.removeListener(this); connectTo(entry, result); } - }); + + @Override + public void operationComplete(Future future) throws Exception { + entry.removeConnection(this); + } + }; + + result.addListener(callback); + acquireConnection(entry, callback); + return result; } diff --git a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java index 6af6138d3..c4ad451f0 100644 --- a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -15,8 +15,11 @@ */ package org.redisson.pubsub; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.Queue; +import java.util.Set; import java.util.concurrent.CountDownLatch; /** @@ -27,7 +30,7 @@ import java.util.concurrent.CountDownLatch; public class AsyncSemaphore { private int counter; - private final Queue listeners = new LinkedList(); + private final Set listeners = new LinkedHashSet(); public AsyncSemaphore(int permits) { counter = permits; @@ -89,7 +92,11 @@ public class AsyncSemaphore { synchronized (this) { counter++; - runnable = listeners.poll(); + Iterator iter = listeners.iterator(); + if (iter.hasNext()) { + runnable = iter.next(); + iter.remove(); + } } if (runnable != null) {