From 2b06ca47788f0f86fd5d54d935dc1f08e949fac5 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 8 Jan 2014 21:23:18 +0400 Subject: [PATCH] ConnectionManager optimizations --- src/main/java/org/redisson/RedissonLock.java | 1 - src/main/java/org/redisson/RedissonTopic.java | 3 --- .../connection/ConnectionManager.java | 23 +++++++++++++++---- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 784c639f1..57b5c22aa 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -29,7 +29,6 @@ import org.redisson.core.RLock; import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; -import com.lambdaworks.redis.pubsub.RedisPubSubConnection; /** * Reentrant distributed lock diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index b248d87d6..f0603de8e 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -28,12 +28,9 @@ import org.redisson.core.RedisPubSubTopicListener; import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; -import com.lambdaworks.redis.pubsub.RedisPubSubConnection; public class RedissonTopic implements RTopic { - private RedisPubSubConnection pubSubConnection; - private final CountDownLatch subscribeLatch = new CountDownLatch(1); private final AtomicBoolean subscribeOnce = new AtomicBoolean(); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index f9fc02992..aade1d744 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -39,10 +39,12 @@ public class ConnectionManager { private final Semaphore semaphore; private final RedisPubSubConnection conn; + private final int subscriptionsPerConnection; public PubSubEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { super(); this.conn = conn; + this.subscriptionsPerConnection = subscriptionsPerConnection; this.semaphore = new Semaphore(subscriptionsPerConnection); } @@ -68,24 +70,32 @@ public class ConnectionManager { semaphore.release(); } + public boolean tryClose() { + if (semaphore.tryAcquire(subscriptionsPerConnection)) { + conn.close(); + return true; + } + return false; + } + } private final Queue connections = new ConcurrentLinkedQueue(); private final Queue pubSubConnections = new ConcurrentLinkedQueue(); - private final Semaphore semaphore; + private final Semaphore activeConnections; private final RedisClient redisClient; private final Config config; public ConnectionManager(Config config) { Entry address = config.getAddresses().entrySet().iterator().next(); redisClient = new RedisClient(address.getKey(), address.getValue()); - semaphore = new Semaphore(config.getConnectionPoolSize()); + activeConnections = new Semaphore(config.getConnectionPoolSize()); this.config = config; } public RedisConnection acquireConnection() { - semaphore.acquireUninterruptibly(); + activeConnections.acquireUninterruptibly(); RedisConnection c = connections.poll(); if (c == null) { c = redisClient.connect(config.getCodec()); @@ -100,6 +110,7 @@ public class ConnectionManager { } } + activeConnections.acquireUninterruptibly(); RedisPubSubConnection conn = redisClient.connectPubSub(config.getCodec()); PubSubEntry entry = new PubSubEntry(conn, config.getSubscriptionsPerConnection()); entry.subscribe(listener, channel); @@ -109,10 +120,14 @@ public class ConnectionManager { public void unsubscribe(PubSubEntry entry, K channel) { entry.unsubscribe(channel); + if (entry.tryClose()) { + pubSubConnections.remove(entry); + activeConnections.release(); + } } public void release(RedisConnection сonnection) { - semaphore.release(); + activeConnections.release(); connections.add(сonnection); }