RedissonTopic subscription logic refactored

pull/38/head
Nikita 11 years ago
parent 0fc23a6a83
commit 485483821c

@ -26,19 +26,23 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
* @param <K>
* @param <V>
*/
public class RedisPubSubTopicListenerWrapper<K, V> extends RedisPubSubAdapter<K, V> {
public class RedisPubSubTopicListenerWrapper<V> extends RedisPubSubAdapter<String, V> {
private final MessageListener<V> listener;
private final K name;
private final String name;
public RedisPubSubTopicListenerWrapper(MessageListener<V> listener, K name) {
public String getName() {
return name;
}
public RedisPubSubTopicListenerWrapper(MessageListener<V> listener, String name) {
super();
this.listener = listener;
this.name = name;
}
@Override
public void message(K channel, V message) {
public void message(String channel, V message) {
// could be subscribed to multiple channels
if (name.equals(channel)) {
listener.onMessage(message);

@ -16,23 +16,13 @@
package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
/**
* Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
@ -43,53 +33,13 @@ import com.lambdaworks.redis.pubsub.RedisPubSubListener;
*/
public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicReference<Promise<Boolean>> promise = new AtomicReference<Promise<Boolean>>();
private final Map<Integer, RedisPubSubTopicListenerWrapper<String, M>> listeners =
new ConcurrentHashMap<Integer, RedisPubSubTopicListenerWrapper<String, M>>();
private PubSubConnectionEntry pubSubEntry;
RedissonTopic(ConnectionManager connectionManager, String name) {
super(connectionManager, name);
}
private void lazySubscribe() {
if (promise.get() != null) {
return;
}
final Promise<Boolean> newPromise = newPromise();
if (!promise.compareAndSet(null, newPromise)) {
return;
}
RedisPubSubAdapter<String, M> listener = new RedisPubSubAdapter<String, M>() {
@Override
public void subscribed(String channel, long count) {
Promise<Boolean> subscribePromise = promise.get();
//in case of reconnecting, promise might already be completed.
if (channel.equals(getName()) && !subscribePromise.isDone()) {
log.debug("subscribed to '{}' channel", getName());
subscribePromise.setSuccess(true);
}
}
};
pubSubEntry = connectionManager.subscribe(listener, getName());
}
@Override
public long publish(M message) {
// TODO refactor to publishAsync usage
RedisConnection<String, Object> conn = connectionManager.connectionWriteOp();
try {
return conn.publish(getName(), message);
} finally {
connectionManager.release(conn);
}
return publishAsync(message).awaitUninterruptibly().getNow();
}
@Override
@ -99,55 +49,25 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
}
@Override
public int addListener(final MessageListener<M> listener) {
final RedisPubSubTopicListenerWrapper<String, M> pubSubListener = new RedisPubSubTopicListenerWrapper<String, M>(listener, getName());
listeners.put(pubSubListener.hashCode(), pubSubListener);
return addListener(pubSubListener);
}
private int addListener(final RedisPubSubListener<String, M> pubSubListener) {
lazySubscribe();
promise.get().addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Boolean> future) throws Exception {
pubSubEntry.addListener(pubSubListener);
}
});
public int addListener(MessageListener<M> listener) {
RedisPubSubTopicListenerWrapper<M> pubSubListener = new RedisPubSubTopicListenerWrapper<M>(listener, getName());
PubSubConnectionEntry entry = connectionManager.subscribe(getName());
synchronized (entry) {
entry.addListener(pubSubListener);
}
return pubSubListener.hashCode();
}
@Override
public void removeListener(int listenerId) {
final RedisPubSubTopicListenerWrapper<String, M> pubSubListener = listeners.remove(listenerId);
promise.get().addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Boolean> future) throws Exception {
pubSubEntry.removeListener(pubSubListener);
}
});
lazyUnsubscribe();
}
private void lazyUnsubscribe() {
Promise<Boolean> oldPromise = promise.get();
final PubSubConnectionEntry oldPubSubEntry = pubSubEntry;
if (oldPromise == null || !promise.compareAndSet(oldPromise, null)) {
PubSubConnectionEntry entry = connectionManager.getEntry(getName());
if (entry == null) {
return;
}
oldPromise.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Boolean> future) throws Exception {
connectionManager.unsubscribe(oldPubSubEntry, getName());
log.debug("unsubscribed from '{}' channel", getName());
// reattach eventually added listeners
for (RedisPubSubListener listener : oldPubSubEntry.getListeners()) {
addListener(listener);
}
}
});
synchronized (entry) {
entry.removeListener(listenerId);
connectionManager.unsubscribe(entry, getName());
}
}
@Override

@ -23,7 +23,9 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import org.redisson.Config;
@ -50,6 +52,7 @@ public class ConnectionManager {
private final EventLoopGroup group = new NioEventLoopGroup();
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final Queue<PubSubConnectionEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
private final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
private final List<RedisClient> clients = new ArrayList<RedisClient>();
private final Semaphore activeConnections;
@ -96,9 +99,24 @@ public class ConnectionManager {
return conn;
}
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, K channel) {
public PubSubConnectionEntry getEntry(String channelName) {
return name2PubSubConnection.get(channelName);
}
public <K, V> PubSubConnectionEntry subscribe(String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
}
for (PubSubConnectionEntry entry : pubSubConnections) {
if (entry.subscribe(listener, channel)) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return oldEntry;
}
entry.subscribe(channelName);
return entry;
}
}
@ -109,8 +127,50 @@ public class ConnectionManager {
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.subscribe(listener, channel);
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
return oldEntry;
}
entry.subscribe(channelName);
pubSubConnections.add(entry);
return entry;
}
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
return сonnEntry;
}
for (PubSubConnectionEntry entry : pubSubConnections) {
if (entry.tryAcquire()) {
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
entry.release();
return oldEntry;
}
entry.subscribe(listener, channelName);
return entry;
}
}
acquireConnection();
RedisPubSubConnection<K, V> conn = balancer.nextClient().connectPubSub(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
return oldEntry;
}
entry.subscribe(listener, channelName);
pubSubConnections.add(entry);
return entry;
}
@ -125,8 +185,13 @@ public class ConnectionManager {
}
}
public <K> void unsubscribe(PubSubConnectionEntry entry, K channel) {
entry.unsubscribe(channel);
public void unsubscribe(PubSubConnectionEntry entry, String channelName) {
if (entry.hasListeners(channelName)) {
return;
}
name2PubSubConnection.remove(channelName);
entry.unsubscribe(channelName);
log.debug("unsubscribed from '{}' channel", channelName);
if (entry.tryClose()) {
pubSubConnections.remove(entry);
activeConnections.release();

@ -18,12 +18,18 @@ package org.redisson.connection;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import org.redisson.RedisPubSubTopicListenerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
public class PubSubConnectionEntry {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Semaphore semaphore;
private final RedisPubSubConnection conn;
private final int subscriptionsPerConnection;
@ -39,24 +45,66 @@ public class PubSubConnectionEntry {
conn.addListener(listener);
}
public Queue<RedisPubSubListener> getListeners() {
return conn.getListeners();
// TODO optimize
public boolean hasListeners(String channelName) {
Queue<RedisPubSubListener> queue = conn.getListeners();
for (RedisPubSubListener listener : queue) {
if (!(listener instanceof RedisPubSubTopicListenerWrapper)) {
continue;
}
RedisPubSubTopicListenerWrapper entry = (RedisPubSubTopicListenerWrapper) listener;
if (entry.getName().equals(channelName)) {
return true;
}
}
return false;
}
// TODO optimize
public void removeListener(int listenerId) {
Queue<RedisPubSubListener> queue = conn.getListeners();
for (RedisPubSubListener listener : queue) {
if (!(listener instanceof RedisPubSubTopicListenerWrapper)) {
continue;
}
RedisPubSubTopicListenerWrapper entry = (RedisPubSubTopicListenerWrapper) listener;
if (entry.hashCode() == listenerId) {
removeListener(entry);
break;
}
}
}
public void removeListener(RedisPubSubListener listener) {
conn.removeListener(listener);
}
public boolean subscribe(RedisPubSubAdapter listener, Object channel) {
if (semaphore.tryAcquire()) {
conn.addListener(listener);
conn.subscribe(channel);
return true;
}
return false;
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public void release() {
semaphore.release();
}
public void subscribe(final String channelName) {
conn.addListener(new RedisPubSubAdapter() {
public void subscribed(String channel, long count) {
log.debug("subscribed to '{}' channel", channelName);
}
});
conn.subscribe(channelName);
}
public void subscribe(RedisPubSubAdapter listener, Object channel) {
conn.addListener(listener);
conn.subscribe(channel);
}
public void unsubscribe(Object channel) {
public void unsubscribe(String channel) {
conn.unsubscribe(channel);
semaphore.release();
}

Loading…
Cancel
Save