Lazy subscribe in RedissonTopic. #17

pull/22/head
Nikita 11 years ago
parent d4136ac739
commit bdfa678f5b

@ -12,7 +12,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.GlobalEventExecutor;
@ -38,7 +37,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
* @author Will Glozer
*/
public class RedisClient {
private EventLoopGroup group;
private Bootstrap bootstrap;
private HashedWheelTimer timer;
private ChannelGroup channels;
@ -50,8 +49,8 @@ public class RedisClient {
*
* @param host Server hostname.
*/
public RedisClient(String host) {
this(host, 6379);
public RedisClient(EventLoopGroup group, String host) {
this(group, host, 6379);
}
/**
@ -62,16 +61,15 @@ public class RedisClient {
* @param host Server hostname.
* @param port Server port.
*/
public RedisClient(String host, int port) {
public RedisClient(EventLoopGroup group, String host, int port) {
InetSocketAddress addr = new InetSocketAddress(host, port);
group = new NioEventLoopGroup();
bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(group).remoteAddress(addr);
setDefaultTimeout(60, TimeUnit.SECONDS);
channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
timer = new HashedWheelTimer();
timer = new HashedWheelTimer();
timer.start();
}
@ -201,7 +199,7 @@ public class RedisClient {
}
ChannelGroupFuture future = channels.close();
future.awaitUninterruptibly();
group.shutdownGracefully().syncUninterruptibly();
bootstrap.group().shutdownGracefully().syncUninterruptibly();
timer.stop();
}
}

@ -6,12 +6,15 @@ import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.*;
import org.redisson.RedisPubSubTopicListenerWrapper;
import static com.lambdaworks.redis.protocol.CommandType.*;
/**
@ -29,7 +32,7 @@ import static com.lambdaworks.redis.protocol.CommandType.*;
* @author Will Glozer
*/
public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
private List<RedisPubSubListener<K, V>> listeners;
private final Queue<RedisPubSubListener<K, V>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<K, V>>();
private Set<K> channels;
private Set<K> patterns;
@ -43,7 +46,6 @@ public class RedisPubSubConnection<K, V> extends RedisAsyncConnection<K, V> {
*/
public RedisPubSubConnection(BlockingQueue<Command<K, V, ?>> queue, RedisCodec<K, V> codec, long timeout, TimeUnit unit) {
super(queue, codec, timeout, unit);
listeners = new CopyOnWriteArrayList<RedisPubSubListener<K, V>>();
channels = new HashSet<K>();
patterns = new HashSet<K>();
}

@ -211,7 +211,6 @@ public class Redisson {
}
}
topic.subscribe();
return topic;
}

@ -15,10 +15,12 @@
*/
package org.redisson;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.PubSubConnectionEntry;
@ -37,8 +39,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
*/
public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
private final CountDownLatch subscribeLatch = new CountDownLatch(1);
private final AtomicBoolean subscribeOnce = new AtomicBoolean();
private final AtomicReference<Promise<Boolean>> promise = new AtomicReference<Promise<Boolean>>();
private final Map<Integer, RedisPubSubTopicListenerWrapper<String, M>> listeners =
new ConcurrentHashMap<Integer, RedisPubSubTopicListenerWrapper<String, M>>();
@ -51,27 +52,23 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
this.connectionManager = connectionManager;
}
public void subscribe() {
if (subscribeOnce.compareAndSet(false, true)) {
RedisPubSubAdapter<String, M> listener = new RedisPubSubAdapter<String, M>() {
private void lazySubscribe() {
if (promise.get() != null) {
return;
}
if (promise.compareAndSet(null, connectionManager.getGroup().next().<Boolean>newPromise())) {
RedisPubSubAdapter<String, M> listener = new RedisPubSubAdapter<String, M>() {
@Override
public void subscribed(String channel, long count) {
if (channel.equals(getName())) {
subscribeLatch.countDown();
promise.get().setSuccess(true);
}
}
};
pubSubEntry = connectionManager.subscribe(listener, getName());
}
try {
subscribeLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
@ -86,21 +83,41 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
@Override
public int addListener(MessageListener<M> listener) {
RedisPubSubTopicListenerWrapper<String, M> pubSubListener = new RedisPubSubTopicListenerWrapper<String, M>(listener, getName());
lazySubscribe();
final RedisPubSubTopicListenerWrapper<String, M> pubSubListener = new RedisPubSubTopicListenerWrapper<String, M>(listener, getName());
listeners.put(pubSubListener.hashCode(), pubSubListener);
pubSubEntry.addListener(pubSubListener);
promise.get().addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Boolean> future) throws Exception {
pubSubEntry.addListener(pubSubListener);
}
});
return pubSubListener.hashCode();
}
@Override
public void removeListener(int listenerId) {
RedisPubSubTopicListenerWrapper<String, M> pubSubListener = listeners.remove(listenerId);
pubSubEntry.removeListener(pubSubListener);
final RedisPubSubTopicListenerWrapper<String, M> pubSubListener = listeners.remove(listenerId);
promise.get().addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Boolean> future) throws Exception {
pubSubEntry.removeListener(pubSubListener);
}
});
// TODO lazyUnsubscribe();
}
@Override
public void close() {
connectionManager.unsubscribe(pubSubEntry, getName());
if (promise.get() == null) {
return;
}
promise.get().addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Boolean> future) throws Exception {
connectionManager.unsubscribe(pubSubEntry, getName());
}
});
}
}

@ -15,6 +15,9 @@
*/
package org.redisson.connection;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
@ -43,6 +46,7 @@ public class ConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final EventLoopGroup group = new NioEventLoopGroup();
private final Queue<RedisConnection> connections = new ConcurrentLinkedQueue<RedisConnection>();
private final Queue<PubSubConnectionEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
private final List<RedisClient> clients = new ArrayList<RedisClient>();
@ -54,7 +58,7 @@ public class ConnectionManager {
public ConnectionManager(Config config) {
for (URI address : config.getAddresses()) {
RedisClient client = new RedisClient(address.getHost(), address.getPort());
RedisClient client = new RedisClient(group, address.getHost(), address.getPort());
clients.add(client);
}
balancer = config.getLoadBalancer();
@ -130,4 +134,8 @@ public class ConnectionManager {
}
}
public EventLoopGroup getGroup() {
return group;
}
}

Loading…
Cancel
Save