Some improvement made for AsyncSemaphore object

pull/762/head
Nikita 8 years ago
parent a2ba9c0681
commit af6437dacd

@ -114,6 +114,10 @@ public class ClientConnectionsEntry {
freeConnectionsCounter.acquire(runnable); freeConnectionsCounter.acquire(runnable);
} }
public void removeConnection(Runnable runnable) {
freeConnectionsCounter.remove(runnable);
}
public void releaseConnection() { public void releaseConnection() {
freeConnectionsCounter.release(); freeConnectionsCounter.release();
} }

@ -20,6 +20,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.NodeType; import org.redisson.api.NodeType;
@ -201,14 +202,29 @@ abstract class ConnectionPool<T extends RedisConnection> {
return connectionManager.newFailedFuture(exception); return connectionManager.newFailedFuture(exception);
} }
public static abstract class AcquireCallback<T> implements Runnable, FutureListener<T> {
}
private RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) { private RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) {
final RPromise<T> result = connectionManager.newPromise(); final RPromise<T> result = connectionManager.newPromise();
acquireConnection(entry, new Runnable() {
AcquireCallback<T> callback = new AcquireCallback<T>() {
@Override @Override
public void run() { public void run() {
result.removeListener(this);
connectTo(entry, result); connectTo(entry, result);
} }
});
@Override
public void operationComplete(Future<T> future) throws Exception {
entry.removeConnection(this);
}
};
result.addListener(callback);
acquireConnection(entry, callback);
return result; return result;
} }

@ -15,8 +15,11 @@
*/ */
package org.redisson.pubsub; package org.redisson.pubsub;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
/** /**
@ -27,7 +30,7 @@ import java.util.concurrent.CountDownLatch;
public class AsyncSemaphore { public class AsyncSemaphore {
private int counter; private int counter;
private final Queue<Runnable> listeners = new LinkedList<Runnable>(); private final Set<Runnable> listeners = new LinkedHashSet<Runnable>();
public AsyncSemaphore(int permits) { public AsyncSemaphore(int permits) {
counter = permits; counter = permits;
@ -89,7 +92,11 @@ public class AsyncSemaphore {
synchronized (this) { synchronized (this) {
counter++; counter++;
runnable = listeners.poll(); Iterator<Runnable> iter = listeners.iterator();
if (iter.hasNext()) {
runnable = iter.next();
iter.remove();
}
} }
if (runnable != null) { if (runnable != null) {

Loading…
Cancel
Save