PubSubEntry extracted

pull/10/head
Nikita 11 years ago
parent 19ec0c1593
commit 756d19cc7d

@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ConnectionManager.PubSubEntry;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.RCountDownLatch;
import org.redisson.misc.ReclosableLatch;
@ -50,7 +50,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
private final ReclosableLatch msg = new ReclosableLatch();
private final ConnectionManager connectionManager;
private PubSubEntry pubSubEntry;
private PubSubConnectionEntry pubSubEntry;
RedissonCountDownLatch(ConnectionManager connectionManager, String name) {
super(name);
@ -95,7 +95,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
public void await() throws InterruptedException {
while (getCount() > 0) {
// waiting for message
// waiting for open state
msg.await();
}
}
@ -109,7 +109,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return false;
}
long current = System.currentTimeMillis();
// waiting for message
// waiting for open state
msg.await(time, TimeUnit.MILLISECONDS);
long elapsed = System.currentTimeMillis() - current;
time = time - elapsed;

@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ConnectionManager.PubSubEntry;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.RLock;
import com.lambdaworks.redis.RedisConnection;
@ -113,7 +113,7 @@ public class RedissonLock extends RedissonObject implements RLock {
private final Semaphore msg = new Semaphore(1);
private PubSubEntry pubSubEntry;
private PubSubConnectionEntry pubSubEntry;
RedissonLock(ConnectionManager connectionManager, String name, UUID id) {
super(name);

@ -21,7 +21,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ConnectionManager.PubSubEntry;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;
@ -44,7 +44,7 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
new ConcurrentHashMap<Integer, RedisPubSubTopicListenerWrapper<String, M>>();
private final ConnectionManager connectionManager;
private PubSubEntry pubSubEntry;
private PubSubConnectionEntry pubSubEntry;
RedissonTopic(ConnectionManager connectionManager, String name) {
super(name);

@ -32,7 +32,6 @@ import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
/**
*
@ -42,55 +41,10 @@ import com.lambdaworks.redis.pubsub.RedisPubSubListener;
//TODO ping support
public class ConnectionManager {
public static class PubSubEntry {
private final Semaphore semaphore;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
public PubSubEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
super();
this.conn = conn;
this.subscriptionsPerConnection = subscriptionsPerConnection;
this.semaphore = new Semaphore(subscriptionsPerConnection);
}
public void addListener(RedisPubSubListener listener) {
conn.addListener(listener);
}
public void removeListener(RedisPubSubListener listener) {
conn.removeListener(listener);
}
public boolean subscribe(RedisPubSubAdapter listener, Object channel) {
if (semaphore.tryAcquire()) {
conn.addListener(listener);
conn.subscribe(channel);
return true;
}
return false;
}
public void unsubscribe(Object channel) {
conn.unsubscribe(channel);
semaphore.release();
}
public boolean tryClose() {
if (semaphore.tryAcquire(subscriptionsPerConnection)) {
conn.close();
return true;
}
return false;
}
}
private final Logger log = LoggerFactory.getLogger(getClass());
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final Queue<PubSubEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubEntry>();
private final Queue<PubSubConnectionEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
private final List<RedisClient> clients = new ArrayList<RedisClient>();
private final Semaphore activeConnections;
@ -128,8 +82,8 @@ public class ConnectionManager {
return conn;
}
public <K, V> PubSubEntry subscribe(RedisPubSubAdapter<K, V> listener, K channel) {
for (PubSubEntry entry : pubSubConnections) {
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, K channel) {
for (PubSubConnectionEntry entry : pubSubConnections) {
if (entry.subscribe(listener, channel)) {
return entry;
}
@ -141,7 +95,7 @@ public class ConnectionManager {
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
PubSubEntry entry = new PubSubEntry(conn, config.getSubscriptionsPerConnection());
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.subscribe(listener, channel);
pubSubConnections.add(entry);
return entry;
@ -157,7 +111,7 @@ public class ConnectionManager {
}
}
public <K> void unsubscribe(PubSubEntry entry, K channel) {
public <K> void unsubscribe(PubSubConnectionEntry entry, K channel) {
entry.unsubscribe(channel);
if (entry.tryClose()) {
pubSubConnections.remove(entry);

@ -0,0 +1,52 @@
package org.redisson.connection;
import java.util.concurrent.Semaphore;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
public class PubSubConnectionEntry {
private final Semaphore semaphore;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
super();
this.conn = conn;
this.subscriptionsPerConnection = subscriptionsPerConnection;
this.semaphore = new Semaphore(subscriptionsPerConnection);
}
public void addListener(RedisPubSubListener listener) {
conn.addListener(listener);
}
public void removeListener(RedisPubSubListener listener) {
conn.removeListener(listener);
}
public boolean subscribe(RedisPubSubAdapter listener, Object channel) {
if (semaphore.tryAcquire()) {
conn.addListener(listener);
conn.subscribe(channel);
return true;
}
return false;
}
public void unsubscribe(Object channel) {
conn.unsubscribe(channel);
semaphore.release();
}
public boolean tryClose() {
if (semaphore.tryAcquire(subscriptionsPerConnection)) {
conn.close();
return true;
}
return false;
}
}
Loading…
Cancel
Save