Connection creation should be thread-safe

pull/6/head
Nikita 11 years ago
parent 20998dd085
commit e22d4ab29d

@ -2,25 +2,33 @@
package com.lambdaworks.redis;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.codec.Utf8StringCodec;
import com.lambdaworks.redis.protocol.*;
import com.lambdaworks.redis.pubsub.PubSubCommandHandler;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.codec.Utf8StringCodec;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandHandler;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.pubsub.PubSubCommandHandler;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
/**
* A scalable thread-safe <a href="http://redis.io/">Redis</a> client. Multiple threads
@ -55,9 +63,6 @@ public class RedisClient {
* @param port Server port.
*/
public RedisClient(String host, int port) {
ExecutorService connectors = Executors.newFixedThreadPool(1);
ExecutorService workers = Executors.newCachedThreadPool();
InetSocketAddress addr = new InetSocketAddress(host, port);
group = new NioEventLoopGroup();
@ -163,14 +168,18 @@ public class RedisClient {
private <K, V, T extends RedisAsyncConnection<K, V>> T connect(final CommandHandler<K, V> handler, final T connection) {
try {
final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, channels, timer);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(watchdog, handler, connection);
}
});
bootstrap.connect().sync();
ChannelFuture connect = null;
// TODO use better concurrent workaround
synchronized (bootstrap) {
connect = bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(watchdog, handler, connection);
}
}).connect();
}
connect.sync();
watchdog.setReconnect(true);

Loading…
Cancel
Save