From 485483821cf81a2fc8d2218c52ab4160796c0224 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 4 Jun 2014 16:24:50 +0400 Subject: [PATCH] RedissonTopic subscription logic refactored --- .../RedisPubSubTopicListenerWrapper.java | 12 +- src/main/java/org/redisson/RedissonTopic.java | 106 +++--------------- .../connection/ConnectionManager.java | 75 ++++++++++++- .../connection/PubSubConnectionEntry.java | 68 +++++++++-- 4 files changed, 149 insertions(+), 112 deletions(-) diff --git a/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java b/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java index d8bd4304e..92e0528f8 100644 --- a/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java +++ b/src/main/java/org/redisson/RedisPubSubTopicListenerWrapper.java @@ -26,19 +26,23 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; * @param * @param */ -public class RedisPubSubTopicListenerWrapper extends RedisPubSubAdapter { +public class RedisPubSubTopicListenerWrapper extends RedisPubSubAdapter { private final MessageListener listener; - private final K name; + private final String name; - public RedisPubSubTopicListenerWrapper(MessageListener listener, K name) { + public String getName() { + return name; + } + + public RedisPubSubTopicListenerWrapper(MessageListener listener, String name) { super(); this.listener = listener; this.name = name; } @Override - public void message(K channel, V message) { + public void message(String channel, V message) { // could be subscribed to multiple channels if (name.equals(channel)) { listener.onMessage(message); diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index c4eac850b..576879418 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -16,23 +16,13 @@ package org.redisson; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import org.redisson.connection.ConnectionManager; import org.redisson.connection.PubSubConnectionEntry; import org.redisson.core.MessageListener; import org.redisson.core.RTopic; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.lambdaworks.redis.RedisConnection; -import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; -import com.lambdaworks.redis.pubsub.RedisPubSubListener; /** * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. @@ -43,53 +33,13 @@ import com.lambdaworks.redis.pubsub.RedisPubSubListener; */ public class RedissonTopic extends RedissonObject implements RTopic { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final AtomicReference> promise = new AtomicReference>(); - - private final Map> listeners = - new ConcurrentHashMap>(); - - private PubSubConnectionEntry pubSubEntry; - RedissonTopic(ConnectionManager connectionManager, String name) { super(connectionManager, name); } - private void lazySubscribe() { - if (promise.get() != null) { - return; - } - - final Promise newPromise = newPromise(); - if (!promise.compareAndSet(null, newPromise)) { - return; - } - - RedisPubSubAdapter listener = new RedisPubSubAdapter() { - @Override - public void subscribed(String channel, long count) { - Promise subscribePromise = promise.get(); - //in case of reconnecting, promise might already be completed. - if (channel.equals(getName()) && !subscribePromise.isDone()) { - log.debug("subscribed to '{}' channel", getName()); - subscribePromise.setSuccess(true); - } - } - }; - - pubSubEntry = connectionManager.subscribe(listener, getName()); - } - @Override public long publish(M message) { - // TODO refactor to publishAsync usage - RedisConnection conn = connectionManager.connectionWriteOp(); - try { - return conn.publish(getName(), message); - } finally { - connectionManager.release(conn); - } + return publishAsync(message).awaitUninterruptibly().getNow(); } @Override @@ -99,55 +49,25 @@ public class RedissonTopic extends RedissonObject implements RTopic { } @Override - public int addListener(final MessageListener listener) { - final RedisPubSubTopicListenerWrapper pubSubListener = new RedisPubSubTopicListenerWrapper(listener, getName()); - listeners.put(pubSubListener.hashCode(), pubSubListener); - - return addListener(pubSubListener); - } - - private int addListener(final RedisPubSubListener pubSubListener) { - lazySubscribe(); - promise.get().addListener(new FutureListener() { - @Override - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - pubSubEntry.addListener(pubSubListener); - } - }); + public int addListener(MessageListener listener) { + RedisPubSubTopicListenerWrapper pubSubListener = new RedisPubSubTopicListenerWrapper(listener, getName()); + PubSubConnectionEntry entry = connectionManager.subscribe(getName()); + synchronized (entry) { + entry.addListener(pubSubListener); + } return pubSubListener.hashCode(); } @Override public void removeListener(int listenerId) { - final RedisPubSubTopicListenerWrapper pubSubListener = listeners.remove(listenerId); - promise.get().addListener(new FutureListener() { - @Override - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - pubSubEntry.removeListener(pubSubListener); - } - }); - lazyUnsubscribe(); - } - - private void lazyUnsubscribe() { - Promise oldPromise = promise.get(); - final PubSubConnectionEntry oldPubSubEntry = pubSubEntry; - if (oldPromise == null || !promise.compareAndSet(oldPromise, null)) { + PubSubConnectionEntry entry = connectionManager.getEntry(getName()); + if (entry == null) { return; } - - oldPromise.addListener(new FutureListener() { - @Override - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - connectionManager.unsubscribe(oldPubSubEntry, getName()); - log.debug("unsubscribed from '{}' channel", getName()); - - // reattach eventually added listeners - for (RedisPubSubListener listener : oldPubSubEntry.getListeners()) { - addListener(listener); - } - } - }); + synchronized (entry) { + entry.removeListener(listenerId); + connectionManager.unsubscribe(entry, getName()); + } } @Override diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 1ec44b317..17c348ba1 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -23,7 +23,9 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; import org.redisson.Config; @@ -50,6 +52,7 @@ public class ConnectionManager { private final EventLoopGroup group = new NioEventLoopGroup(); private final Queue connections = new ConcurrentLinkedQueue(); private final Queue pubSubConnections = new ConcurrentLinkedQueue(); + private final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap(); private final List clients = new ArrayList(); private final Semaphore activeConnections; @@ -96,9 +99,24 @@ public class ConnectionManager { return conn; } - public PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, K channel) { + public PubSubConnectionEntry getEntry(String channelName) { + return name2PubSubConnection.get(channelName); + } + + public PubSubConnectionEntry subscribe(String channelName) { + PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); + if (сonnEntry != null) { + return сonnEntry; + } + for (PubSubConnectionEntry entry : pubSubConnections) { - if (entry.subscribe(listener, channel)) { + if (entry.tryAcquire()) { + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + entry.release(); + return oldEntry; + } + entry.subscribe(channelName); return entry; } } @@ -109,8 +127,50 @@ public class ConnectionManager { if (config.getPassword() != null) { conn.auth(config.getPassword()); } + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); - entry.subscribe(listener, channel); + entry.tryAcquire(); + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + return oldEntry; + } + entry.subscribe(channelName); + pubSubConnections.add(entry); + return entry; + } + + public PubSubConnectionEntry subscribe(RedisPubSubAdapter listener, String channelName) { + PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName); + if (сonnEntry != null) { + return сonnEntry; + } + + for (PubSubConnectionEntry entry : pubSubConnections) { + if (entry.tryAcquire()) { + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + entry.release(); + return oldEntry; + } + entry.subscribe(listener, channelName); + return entry; + } + } + + acquireConnection(); + + RedisPubSubConnection conn = balancer.nextClient().connectPubSub(codec); + if (config.getPassword() != null) { + conn.auth(config.getPassword()); + } + + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); + entry.tryAcquire(); + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + return oldEntry; + } + entry.subscribe(listener, channelName); pubSubConnections.add(entry); return entry; } @@ -125,8 +185,13 @@ public class ConnectionManager { } } - public void unsubscribe(PubSubConnectionEntry entry, K channel) { - entry.unsubscribe(channel); + public void unsubscribe(PubSubConnectionEntry entry, String channelName) { + if (entry.hasListeners(channelName)) { + return; + } + name2PubSubConnection.remove(channelName); + entry.unsubscribe(channelName); + log.debug("unsubscribed from '{}' channel", channelName); if (entry.tryClose()) { pubSubConnections.remove(entry); activeConnections.release(); diff --git a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java index d5615bb13..856feb5e1 100644 --- a/src/main/java/org/redisson/connection/PubSubConnectionEntry.java +++ b/src/main/java/org/redisson/connection/PubSubConnectionEntry.java @@ -18,12 +18,18 @@ package org.redisson.connection; import java.util.Queue; import java.util.concurrent.Semaphore; +import org.redisson.RedisPubSubTopicListenerWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; import com.lambdaworks.redis.pubsub.RedisPubSubConnection; import com.lambdaworks.redis.pubsub.RedisPubSubListener; public class PubSubConnectionEntry { + private final Logger log = LoggerFactory.getLogger(getClass()); + private final Semaphore semaphore; private final RedisPubSubConnection conn; private final int subscriptionsPerConnection; @@ -39,24 +45,66 @@ public class PubSubConnectionEntry { conn.addListener(listener); } - public Queue getListeners() { - return conn.getListeners(); + // TODO optimize + public boolean hasListeners(String channelName) { + Queue queue = conn.getListeners(); + for (RedisPubSubListener listener : queue) { + if (!(listener instanceof RedisPubSubTopicListenerWrapper)) { + continue; + } + + RedisPubSubTopicListenerWrapper entry = (RedisPubSubTopicListenerWrapper) listener; + if (entry.getName().equals(channelName)) { + return true; + } + } + return false; + } + + // TODO optimize + public void removeListener(int listenerId) { + Queue queue = conn.getListeners(); + for (RedisPubSubListener listener : queue) { + if (!(listener instanceof RedisPubSubTopicListenerWrapper)) { + continue; + } + + RedisPubSubTopicListenerWrapper entry = (RedisPubSubTopicListenerWrapper) listener; + if (entry.hashCode() == listenerId) { + removeListener(entry); + break; + } + } } public void removeListener(RedisPubSubListener listener) { conn.removeListener(listener); } - public boolean subscribe(RedisPubSubAdapter listener, Object channel) { - if (semaphore.tryAcquire()) { - conn.addListener(listener); - conn.subscribe(channel); - return true; - } - return false; + public boolean tryAcquire() { + return semaphore.tryAcquire(); + } + + public void release() { + semaphore.release(); + } + + public void subscribe(final String channelName) { + conn.addListener(new RedisPubSubAdapter() { + public void subscribed(String channel, long count) { + log.debug("subscribed to '{}' channel", channelName); + } + }); + conn.subscribe(channelName); + } + + + public void subscribe(RedisPubSubAdapter listener, Object channel) { + conn.addListener(listener); + conn.subscribe(channel); } - public void unsubscribe(Object channel) { + public void unsubscribe(String channel) { conn.unsubscribe(channel); semaphore.release(); }