From 756d19cc7d13af756579fe898662786a605d8e73 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 22 Apr 2014 20:15:35 +0400 Subject: [PATCH] PubSubEntry extracted --- .../org/redisson/RedissonCountDownLatch.java | 8 +-- src/main/java/org/redisson/RedissonLock.java | 4 +- src/main/java/org/redisson/RedissonTopic.java | 4 +- .../connection/ConnectionManager.java | 56 ++----------------- .../connection/PubSubConnectionEntry.java | 52 +++++++++++++++++ 5 files changed, 65 insertions(+), 59 deletions(-) create mode 100644 src/main/java/org/redisson/connection/PubSubConnectionEntry.java diff --git a/src/main/java/org/redisson/RedissonCountDownLatch.java b/src/main/java/org/redisson/RedissonCountDownLatch.java index 04d0f35f9..d73820d5a 100644 --- a/src/main/java/org/redisson/RedissonCountDownLatch.java +++ b/src/main/java/org/redisson/RedissonCountDownLatch.java @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.ConnectionManager.PubSubEntry; +import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.RCountDownLatch; import org.redisson.misc.ReclosableLatch; @@ -50,7 +50,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown private final ReclosableLatch msg = new ReclosableLatch(); private final ConnectionManager connectionManager; - private PubSubEntry pubSubEntry; + private PubSubConnectionEntry pubSubEntry; RedissonCountDownLatch(ConnectionManager connectionManager, String name) { super(name); @@ -95,7 +95,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown public void await() throws InterruptedException { while (getCount() > 0) { - // waiting for message + // waiting for open state msg.await(); } } @@ -109,7 +109,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown return false; } long current = System.currentTimeMillis(); - // waiting for message + // waiting for open state msg.await(time, TimeUnit.MILLISECONDS); long elapsed = System.currentTimeMillis() - current; time = time - elapsed; diff --git a/src/main/java/org/redisson/RedissonLock.java b/src/main/java/org/redisson/RedissonLock.java index 7035a4d52..907cccc3a 100644 --- a/src/main/java/org/redisson/RedissonLock.java +++ b/src/main/java/org/redisson/RedissonLock.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.ConnectionManager.PubSubEntry; +import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.RLock; import com.lambdaworks.redis.RedisConnection; @@ -113,7 +113,7 @@ public class RedissonLock extends RedissonObject implements RLock { private final Semaphore msg = new Semaphore(1); - private PubSubEntry pubSubEntry; + private PubSubConnectionEntry pubSubEntry; RedissonLock(ConnectionManager connectionManager, String name, UUID id) { super(name); diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index 77fc0d11e..ea8af887b 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -21,7 +21,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.ConnectionManager.PubSubEntry; +import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.MessageListener; import org.redisson.core.RTopic; @@ -44,7 +44,7 @@ public class RedissonTopic extends RedissonObject implements RTopic { new ConcurrentHashMap>(); private final ConnectionManager connectionManager; - private PubSubEntry pubSubEntry; + private PubSubConnectionEntry pubSubEntry; RedissonTopic(ConnectionManager connectionManager, String name) { super(name); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 11eb341b9..4b0593ec2 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -32,7 +32,6 @@ import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.codec.RedisCodec; import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubConnection; -import com.lambdaworks.redis.pubsub.RedisPubSubListener; /** * @@ -42,55 +41,10 @@ import com.lambdaworks.redis.pubsub.RedisPubSubListener; //TODO ping support public class ConnectionManager { - public static class PubSubEntry { - - private final Semaphore semaphore; - private final RedisPubSubConnection conn; - private final int subscriptionsPerConnection; - - public PubSubEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { - super(); - this.conn = conn; - this.subscriptionsPerConnection = subscriptionsPerConnection; - this.semaphore = new Semaphore(subscriptionsPerConnection); - } - - public void addListener(RedisPubSubListener listener) { - conn.addListener(listener); - } - - public void removeListener(RedisPubSubListener listener) { - conn.removeListener(listener); - } - - public boolean subscribe(RedisPubSubAdapter listener, Object channel) { - if (semaphore.tryAcquire()) { - conn.addListener(listener); - conn.subscribe(channel); - return true; - } - return false; - } - - public void unsubscribe(Object channel) { - conn.unsubscribe(channel); - semaphore.release(); - } - - public boolean tryClose() { - if (semaphore.tryAcquire(subscriptionsPerConnection)) { - conn.close(); - return true; - } - return false; - } - - } - private final Logger log = LoggerFactory.getLogger(getClass()); private final Queue connections = new ConcurrentLinkedQueue(); - private final Queue pubSubConnections = new ConcurrentLinkedQueue(); + private final Queue pubSubConnections = new ConcurrentLinkedQueue(); private final List clients = new ArrayList(); private final Semaphore activeConnections; @@ -128,8 +82,8 @@ public class ConnectionManager { return conn; } - public PubSubEntry subscribe(RedisPubSubAdapter listener, K channel) { - for (PubSubEntry entry : pubSubConnections) { + public PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, K channel) { + for (PubSubConnectionEntry entry : pubSubConnections) { if (entry.subscribe(listener, channel)) { return entry; } @@ -141,7 +95,7 @@ public class ConnectionManager { if (config.getPassword() != null) { conn.auth(config.getPassword()); } - PubSubEntry entry = new PubSubEntry(conn, config.getSubscriptionsPerConnection()); + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); entry.subscribe(listener, channel); pubSubConnections.add(entry); return entry; @@ -157,7 +111,7 @@ public class ConnectionManager { } } - public void unsubscribe(PubSubEntry entry, K channel) { + public void unsubscribe(PubSubConnectionEntry entry, K channel) { entry.unsubscribe(channel); if (entry.tryClose()) { pubSubConnections.remove(entry); diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java new file mode 100644 index 000000000..6f11be3d2 --- /dev/null +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -0,0 +1,52 @@ +package org.redisson.connection; + +import java.util.concurrent.Semaphore; + +import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; +import com.lambdaworks.redis.pubsub.RedisPubSubConnection; +import com.lambdaworks.redis.pubsub.RedisPubSubListener; + +public class PubSubConnectionEntry { + + private final Semaphore semaphore; + private final RedisPubSubConnection conn; + private final int subscriptionsPerConnection; + + public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) { + super(); + this.conn = conn; + this.subscriptionsPerConnection = subscriptionsPerConnection; + this.semaphore = new Semaphore(subscriptionsPerConnection); + } + + public void addListener(RedisPubSubListener listener) { + conn.addListener(listener); + } + + public void removeListener(RedisPubSubListener listener) { + conn.removeListener(listener); + } + + public boolean subscribe(RedisPubSubAdapter listener, Object channel) { + if (semaphore.tryAcquire()) { + conn.addListener(listener); + conn.subscribe(channel); + return true; + } + return false; + } + + public void unsubscribe(Object channel) { + conn.unsubscribe(channel); + semaphore.release(); + } + + public boolean tryClose() { + if (semaphore.tryAcquire(subscriptionsPerConnection)) { + conn.close(); + return true; + } + return false; + } + +}