diff --git a/src/main/java/com/lambdaworks/redis/RedisClient.java b/src/main/java/com/lambdaworks/redis/RedisClient.java index 04ce48137..16c0b2556 100644 --- a/src/main/java/com/lambdaworks/redis/RedisClient.java +++ b/src/main/java/com/lambdaworks/redis/RedisClient.java @@ -13,7 +13,6 @@ import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.GlobalEventExecutor; import java.net.InetSocketAddress; @@ -39,7 +38,6 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection; public class RedisClient { private Bootstrap bootstrap; - private HashedWheelTimer timer; private ChannelGroup channels; private long timeout; private TimeUnit unit; @@ -69,8 +67,6 @@ public class RedisClient { setDefaultTimeout(60, TimeUnit.SECONDS); channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - timer = new HashedWheelTimer(); - timer.start(); } /** @@ -165,7 +161,7 @@ public class RedisClient { private > T connect(final CommandHandler handler, final T connection) { try { - final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, channels, timer); + final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, channels); ChannelFuture connect = null; // TODO use better concurrent workaround @@ -200,7 +196,6 @@ public class RedisClient { ChannelGroupFuture future = channels.close(); future.awaitUninterruptibly(); bootstrap.group().shutdownGracefully().syncUninterruptibly(); - timer.stop(); } } diff --git a/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java b/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java index f588fcb80..16cadfec9 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java +++ b/src/main/java/com/lambdaworks/redis/protocol/ConnectionWatchdog.java @@ -10,6 +10,7 @@ import io.netty.channel.group.ChannelGroup; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; +import io.netty.util.concurrent.GenericFutureListener; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; @@ -21,25 +22,22 @@ import java.util.concurrent.TimeUnit; * @author Will Glozer */ @ChannelHandler.Sharable -public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask { +public class ConnectionWatchdog extends ChannelInboundHandlerAdapter{ private Bootstrap bootstrap; private Channel channel; private ChannelGroup channels; - private Timer timer; private boolean reconnect; - private int attempts; + private static final int BACKOFF_CAP = 12; /** * Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} * and establishes a new {@link Channel} when disconnected, while reconnect is true. * * @param bootstrap Configuration for new channels. - * @param timer Timer used for delayed reconnect. */ - public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels, Timer timer) { + public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels) { this.bootstrap = bootstrap; this.channels = channels; - this.timer = timer; } public void setReconnect(boolean reconnect) { @@ -50,50 +48,72 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements public void channelActive(ChannelHandlerContext ctx) throws Exception { channel = ctx.channel(); channels.add(channel); - attempts = 0; ctx.fireChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (reconnect) { - if (attempts < 8) attempts++; - int timeout = 2 << attempts; - timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS); + ChannelPipeline pipeLine = channel.pipeline(); + CommandHandler handler = pipeLine.get(CommandHandler.class); + RedisAsyncConnection connection = pipeLine.get(RedisAsyncConnection.class); + EventLoop loop = ctx.channel().eventLoop(); + reconnect(loop, handler, connection); } ctx.fireChannelInactive(); } - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ctx.channel().close(); - } - /** * Reconnect to the remote address that the closed channel was connected to. * This creates a new {@link ChannelPipeline} with the same handler instances * contained in the old channel's pipeline. * - * @param timeout Timer task handle. + * @param loop EventLoop + * @param handler Redis Command handle. + * @param connection RedisAsyncConnection * * @throws Exception when reconnection fails. */ - @Override - public void run(Timeout timeout) throws Exception { - ChannelPipeline old = channel.pipeline(); - final CommandHandler handler = old.get(CommandHandler.class); - final RedisAsyncConnection connection = old.get(RedisAsyncConnection.class); + private void reconnect(final EventLoop loop, final CommandHandler handler, final RedisAsyncConnection connection){ + loop.schedule(new Runnable() { + @Override + public void run() { + doReConnect(loop, handler, connection, 1); + } + }, 2, TimeUnit.MILLISECONDS); + } - ChannelFuture connect = null; - // TODO use better concurrent workaround - synchronized (bootstrap) { - connect = bootstrap.handler(new ChannelInitializer() { + private void doReConnect(final EventLoop loop, final CommandHandler handler, final RedisAsyncConnection connection, final int attempts) { + if (reconnect) { + ChannelFuture connect; + synchronized (bootstrap) { + connect = bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(ConnectionWatchdog.this, handler, connection); + } + }).connect(); + } + connect.addListener(new GenericFutureListener() { @Override - protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(this, handler, connection); + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + int timeout = 2 << attempts; + loop.schedule(new Runnable() { + @Override + public void run() { + doReConnect(loop, handler, connection, Math.min(BACKOFF_CAP, attempts + 1)); + } + }, timeout, TimeUnit.MILLISECONDS); + } } - }).connect(); + }); } - connect.sync(); } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.channel().close(); + } + } diff --git a/src/main/java/org/redisson/RedissonTopic.java b/src/main/java/org/redisson/RedissonTopic.java index 7bdf37868..091bc4190 100644 --- a/src/main/java/org/redisson/RedissonTopic.java +++ b/src/main/java/org/redisson/RedissonTopic.java @@ -61,8 +61,10 @@ public class RedissonTopic extends RedissonObject implements RTopic { RedisPubSubAdapter listener = new RedisPubSubAdapter() { @Override public void subscribed(String channel, long count) { - if (channel.equals(getName())) { - promise.get().setSuccess(true); + Promise subscribePromise = promise.get(); + //in case of reconnecting, promise might already be completed. + if (channel.equals(getName()) && !subscribePromise.isDone()) { + subscribePromise.setSuccess(true); } } };