ConnectionManager optimizations

pull/6/head
Nikita 11 years ago
parent 925f143987
commit 2b06ca4778

@ -29,7 +29,6 @@ import org.redisson.core.RLock;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
/**
* Reentrant distributed lock

@ -28,12 +28,9 @@ import org.redisson.core.RedisPubSubTopicListener;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public class RedissonTopic<M> implements RTopic<M> {
private RedisPubSubConnection<String, M> pubSubConnection;
private final CountDownLatch subscribeLatch = new CountDownLatch(1);
private final AtomicBoolean subscribeOnce = new AtomicBoolean();

@ -39,10 +39,12 @@ public class ConnectionManager {
private final Semaphore semaphore;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
public PubSubEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
super();
this.conn = conn;
this.subscriptionsPerConnection = subscriptionsPerConnection;
this.semaphore = new Semaphore(subscriptionsPerConnection);
}
@ -68,24 +70,32 @@ public class ConnectionManager {
semaphore.release();
}
public boolean tryClose() {
if (semaphore.tryAcquire(subscriptionsPerConnection)) {
conn.close();
return true;
}
return false;
}
}
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final Queue<PubSubEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubEntry>();
private final Semaphore semaphore;
private final Semaphore activeConnections;
private final RedisClient redisClient;
private final Config config;
public ConnectionManager(Config config) {
Entry<String, Integer> address = config.getAddresses().entrySet().iterator().next();
redisClient = new RedisClient(address.getKey(), address.getValue());
semaphore = new Semaphore(config.getConnectionPoolSize());
activeConnections = new Semaphore(config.getConnectionPoolSize());
this.config = config;
}
public <K, V> RedisConnection<K, V> acquireConnection() {
semaphore.acquireUninterruptibly();
activeConnections.acquireUninterruptibly();
RedisConnection<K, V> c = connections.poll();
if (c == null) {
c = redisClient.connect(config.getCodec());
@ -100,6 +110,7 @@ public class ConnectionManager {
}
}
activeConnections.acquireUninterruptibly();
RedisPubSubConnection<K, V> conn = redisClient.connectPubSub(config.getCodec());
PubSubEntry entry = new PubSubEntry(conn, config.getSubscriptionsPerConnection());
entry.subscribe(listener, channel);
@ -109,10 +120,14 @@ public class ConnectionManager {
public <K> void unsubscribe(PubSubEntry entry, K channel) {
entry.unsubscribe(channel);
if (entry.tryClose()) {
pubSubConnections.remove(entry);
activeConnections.release();
}
}
public void release(RedisConnection сonnection) {
semaphore.release();
activeConnections.release();
connections.add(сonnection);
}

Loading…
Cancel
Save