diff --git a/src/main/java/com/lambdaworks/redis/RedisClient.java b/src/main/java/com/lambdaworks/redis/RedisClient.java index 4ff0fb777..04ce48137 100644 --- a/src/main/java/com/lambdaworks/redis/RedisClient.java +++ b/src/main/java/com/lambdaworks/redis/RedisClient.java @@ -12,7 +12,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.GlobalEventExecutor; @@ -38,7 +37,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection; * @author Will Glozer */ public class RedisClient { - private EventLoopGroup group; + private Bootstrap bootstrap; private HashedWheelTimer timer; private ChannelGroup channels; @@ -50,8 +49,8 @@ public class RedisClient { * * @param host Server hostname. */ - public RedisClient(String host) { - this(host, 6379); + public RedisClient(EventLoopGroup group, String host) { + this(group, host, 6379); } /** @@ -62,16 +61,15 @@ public class RedisClient { * @param host Server hostname. * @param port Server port. */ - public RedisClient(String host, int port) { + public RedisClient(EventLoopGroup group, String host, int port) { InetSocketAddress addr = new InetSocketAddress(host, port); - group = new NioEventLoopGroup(); bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(group).remoteAddress(addr); setDefaultTimeout(60, TimeUnit.SECONDS); channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - timer = new HashedWheelTimer(); + timer = new HashedWheelTimer(); timer.start(); } @@ -201,7 +199,7 @@ public class RedisClient { } ChannelGroupFuture future = channels.close(); future.awaitUninterruptibly(); - group.shutdownGracefully().syncUninterruptibly(); + bootstrap.group().shutdownGracefully().syncUninterruptibly(); timer.stop(); } } diff --git a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java index c530974ac..d9a7bbae3 100644 --- a/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java +++ b/src/main/java/com/lambdaworks/redis/pubsub/RedisPubSubConnection.java @@ -6,12 +6,15 @@ import com.lambdaworks.redis.RedisAsyncConnection; import com.lambdaworks.redis.codec.RedisCodec; import com.lambdaworks.redis.protocol.Command; import com.lambdaworks.redis.protocol.CommandArgs; + import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Array; import java.util.*; import java.util.concurrent.*; +import org.redisson.RedisPubSubTopicListenerWrapper; + import static com.lambdaworks.redis.protocol.CommandType.*; /** @@ -29,7 +32,7 @@ import static com.lambdaworks.redis.protocol.CommandType.*; * @author Will Glozer */ public class RedisPubSubConnection extends RedisAsyncConnection { - private List> listeners; + private final Queue> listeners = new ConcurrentLinkedQueue>(); private Set channels; private Set patterns; @@ -43,7 +46,6 @@ public class RedisPubSubConnection extends RedisAsyncConnection { */ public RedisPubSubConnection(BlockingQueue> queue, RedisCodec codec, long timeout, TimeUnit unit) { super(queue, codec, timeout, unit); - listeners = new CopyOnWriteArrayList>(); channels = new HashSet(); patterns = new HashSet(); } diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index fb86661f9..901e3eba4 100644 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -211,7 +211,6 @@ public class Redisson { } } - topic.subscribe(); return topic; } diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index ea8af887b..7bdf37868 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -15,10 +15,12 @@ */ package org.redisson; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.redisson.connection.ConnectionManager; import org.redisson.connection.PubSubConnectionEntry; @@ -37,8 +39,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter; */ public class RedissonTopic extends RedissonObject implements RTopic { - private final CountDownLatch subscribeLatch = new CountDownLatch(1); - private final AtomicBoolean subscribeOnce = new AtomicBoolean(); + private final AtomicReference> promise = new AtomicReference>(); private final Map> listeners = new ConcurrentHashMap>(); @@ -51,27 +52,23 @@ public class RedissonTopic extends RedissonObject implements RTopic { this.connectionManager = connectionManager; } - public void subscribe() { - if (subscribeOnce.compareAndSet(false, true)) { - RedisPubSubAdapter listener = new RedisPubSubAdapter() { + private void lazySubscribe() { + if (promise.get() != null) { + return; + } + if (promise.compareAndSet(null, connectionManager.getGroup().next().newPromise())) { + RedisPubSubAdapter listener = new RedisPubSubAdapter() { @Override public void subscribed(String channel, long count) { if (channel.equals(getName())) { - subscribeLatch.countDown(); + promise.get().setSuccess(true); } } - }; pubSubEntry = connectionManager.subscribe(listener, getName()); } - - try { - subscribeLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } } @Override @@ -86,21 +83,41 @@ public class RedissonTopic extends RedissonObject implements RTopic { @Override public int addListener(MessageListener listener) { - RedisPubSubTopicListenerWrapper pubSubListener = new RedisPubSubTopicListenerWrapper(listener, getName()); + lazySubscribe(); + final RedisPubSubTopicListenerWrapper pubSubListener = new RedisPubSubTopicListenerWrapper(listener, getName()); listeners.put(pubSubListener.hashCode(), pubSubListener); - pubSubEntry.addListener(pubSubListener); + promise.get().addListener(new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + pubSubEntry.addListener(pubSubListener); + } + }); return pubSubListener.hashCode(); } @Override public void removeListener(int listenerId) { - RedisPubSubTopicListenerWrapper pubSubListener = listeners.remove(listenerId); - pubSubEntry.removeListener(pubSubListener); + final RedisPubSubTopicListenerWrapper pubSubListener = listeners.remove(listenerId); + promise.get().addListener(new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + pubSubEntry.removeListener(pubSubListener); + } + }); + // TODO lazyUnsubscribe(); } @Override public void close() { - connectionManager.unsubscribe(pubSubEntry, getName()); + if (promise.get() == null) { + return; + } + promise.get().addListener(new FutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + connectionManager.unsubscribe(pubSubEntry, getName()); + } + }); } } diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index 4b0593ec2..368015d33 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -15,6 +15,9 @@ */ package org.redisson.connection; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -43,6 +46,7 @@ public class ConnectionManager { private final Logger log = LoggerFactory.getLogger(getClass()); + private final EventLoopGroup group = new NioEventLoopGroup(); private final Queue connections = new ConcurrentLinkedQueue(); private final Queue pubSubConnections = new ConcurrentLinkedQueue(); private final List clients = new ArrayList(); @@ -54,7 +58,7 @@ public class ConnectionManager { public ConnectionManager(Config config) { for (URI address : config.getAddresses()) { - RedisClient client = new RedisClient(address.getHost(), address.getPort()); + RedisClient client = new RedisClient(group, address.getHost(), address.getPort()); clients.add(client); } balancer = config.getLoadBalancer(); @@ -130,4 +134,8 @@ public class ConnectionManager { } } + public EventLoopGroup getGroup() { + return group; + } + }