From e22d4ab29d1968f6d21ae1f34cd953991054460f Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 1 Jan 2014 18:17:44 +0400 Subject: [PATCH] Connection creation should be thread-safe --- .../com/lambdaworks/redis/RedisClient.java | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/RedisClient.java b/src/main/java/com/lambdaworks/redis/RedisClient.java index 923793689..4ff0fb777 100644 --- a/src/main/java/com/lambdaworks/redis/RedisClient.java +++ b/src/main/java/com/lambdaworks/redis/RedisClient.java @@ -2,25 +2,33 @@ package com.lambdaworks.redis; -import com.lambdaworks.redis.codec.RedisCodec; -import com.lambdaworks.redis.codec.Utf8StringCodec; -import com.lambdaworks.redis.protocol.*; -import com.lambdaworks.redis.pubsub.PubSubCommandHandler; -import com.lambdaworks.redis.pubsub.RedisPubSubConnection; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.HashedWheelTimer; -import io.netty.util.Timer; import io.netty.util.concurrent.GlobalEventExecutor; import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import com.lambdaworks.redis.codec.RedisCodec; +import com.lambdaworks.redis.codec.Utf8StringCodec; +import com.lambdaworks.redis.protocol.Command; +import com.lambdaworks.redis.protocol.CommandHandler; +import com.lambdaworks.redis.protocol.ConnectionWatchdog; +import com.lambdaworks.redis.pubsub.PubSubCommandHandler; +import com.lambdaworks.redis.pubsub.RedisPubSubConnection; /** * A scalable thread-safe Redis client. Multiple threads @@ -55,9 +63,6 @@ public class RedisClient { * @param port Server port. */ public RedisClient(String host, int port) { - ExecutorService connectors = Executors.newFixedThreadPool(1); - ExecutorService workers = Executors.newCachedThreadPool(); - InetSocketAddress addr = new InetSocketAddress(host, port); group = new NioEventLoopGroup(); @@ -163,14 +168,18 @@ public class RedisClient { private > T connect(final CommandHandler handler, final T connection) { try { final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, channels, timer); - bootstrap.handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(watchdog, handler, connection); - } - }); - - bootstrap.connect().sync(); + + ChannelFuture connect = null; + // TODO use better concurrent workaround + synchronized (bootstrap) { + connect = bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(watchdog, handler, connection); + } + }).connect(); + } + connect.sync(); watchdog.setReconnect(true);