Fix reconnect handling

pull/22/head
Zihui Ren 11 years ago
parent bdfa678f5b
commit 746818d49a

@ -13,7 +13,6 @@ import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -39,7 +38,6 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public class RedisClient { public class RedisClient {
private Bootstrap bootstrap; private Bootstrap bootstrap;
private HashedWheelTimer timer;
private ChannelGroup channels; private ChannelGroup channels;
private long timeout; private long timeout;
private TimeUnit unit; private TimeUnit unit;
@ -69,8 +67,6 @@ public class RedisClient {
setDefaultTimeout(60, TimeUnit.SECONDS); setDefaultTimeout(60, TimeUnit.SECONDS);
channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
timer = new HashedWheelTimer();
timer.start();
} }
/** /**
@ -165,7 +161,7 @@ public class RedisClient {
private <K, V, T extends RedisAsyncConnection<K, V>> T connect(final CommandHandler<K, V> handler, final T connection) { private <K, V, T extends RedisAsyncConnection<K, V>> T connect(final CommandHandler<K, V> handler, final T connection) {
try { try {
final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, channels, timer); final ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, channels);
ChannelFuture connect = null; ChannelFuture connect = null;
// TODO use better concurrent workaround // TODO use better concurrent workaround
@ -200,7 +196,6 @@ public class RedisClient {
ChannelGroupFuture future = channels.close(); ChannelGroupFuture future = channels.close();
future.awaitUninterruptibly(); future.awaitUninterruptibly();
bootstrap.group().shutdownGracefully().syncUninterruptibly(); bootstrap.group().shutdownGracefully().syncUninterruptibly();
timer.stop();
} }
} }

@ -10,6 +10,7 @@ import io.netty.channel.group.ChannelGroup;
import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.Timer; import io.netty.util.Timer;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -21,25 +22,22 @@ import java.util.concurrent.TimeUnit;
* @author Will Glozer * @author Will Glozer
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask { public class ConnectionWatchdog extends ChannelInboundHandlerAdapter{
private Bootstrap bootstrap; private Bootstrap bootstrap;
private Channel channel; private Channel channel;
private ChannelGroup channels; private ChannelGroup channels;
private Timer timer;
private boolean reconnect; private boolean reconnect;
private int attempts; private static final int BACKOFF_CAP = 12;
/** /**
* Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} * Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup}
* and establishes a new {@link Channel} when disconnected, while reconnect is true. * and establishes a new {@link Channel} when disconnected, while reconnect is true.
* *
* @param bootstrap Configuration for new channels. * @param bootstrap Configuration for new channels.
* @param timer Timer used for delayed reconnect.
*/ */
public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels, Timer timer) { public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels) {
this.bootstrap = bootstrap; this.bootstrap = bootstrap;
this.channels = channels; this.channels = channels;
this.timer = timer;
} }
public void setReconnect(boolean reconnect) { public void setReconnect(boolean reconnect) {
@ -50,50 +48,72 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel(); channel = ctx.channel();
channels.add(channel); channels.add(channel);
attempts = 0;
ctx.fireChannelActive(); ctx.fireChannelActive();
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (reconnect) { if (reconnect) {
if (attempts < 8) attempts++; ChannelPipeline pipeLine = channel.pipeline();
int timeout = 2 << attempts; CommandHandler<?, ?> handler = pipeLine.get(CommandHandler.class);
timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS); RedisAsyncConnection<?, ?> connection = pipeLine.get(RedisAsyncConnection.class);
EventLoop loop = ctx.channel().eventLoop();
reconnect(loop, handler, connection);
} }
ctx.fireChannelInactive(); ctx.fireChannelInactive();
} }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
/** /**
* Reconnect to the remote address that the closed channel was connected to. * Reconnect to the remote address that the closed channel was connected to.
* This creates a new {@link ChannelPipeline} with the same handler instances * This creates a new {@link ChannelPipeline} with the same handler instances
* contained in the old channel's pipeline. * contained in the old channel's pipeline.
* *
* @param timeout Timer task handle. * @param loop EventLoop
* @param handler Redis Command handle.
* @param connection RedisAsyncConnection
* *
* @throws Exception when reconnection fails. * @throws Exception when reconnection fails.
*/ */
@Override private void reconnect(final EventLoop loop, final CommandHandler<?, ?> handler, final RedisAsyncConnection<?, ?> connection){
public void run(Timeout timeout) throws Exception { loop.schedule(new Runnable() {
ChannelPipeline old = channel.pipeline(); @Override
final CommandHandler<?, ?> handler = old.get(CommandHandler.class); public void run() {
final RedisAsyncConnection<?, ?> connection = old.get(RedisAsyncConnection.class); doReConnect(loop, handler, connection, 1);
}
}, 2, TimeUnit.MILLISECONDS);
}
ChannelFuture connect = null; private void doReConnect(final EventLoop loop, final CommandHandler<?, ?> handler, final RedisAsyncConnection<?, ?> connection, final int attempts) {
// TODO use better concurrent workaround if (reconnect) {
synchronized (bootstrap) { ChannelFuture connect;
connect = bootstrap.handler(new ChannelInitializer<Channel>() { synchronized (bootstrap) {
connect = bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(ConnectionWatchdog.this, handler, connection);
}
}).connect();
}
connect.addListener(new GenericFutureListener<ChannelFuture>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
ch.pipeline().addLast(this, handler, connection); if (!future.isSuccess()) {
int timeout = 2 << attempts;
loop.schedule(new Runnable() {
@Override
public void run() {
doReConnect(loop, handler, connection, Math.min(BACKOFF_CAP, attempts + 1));
}
}, timeout, TimeUnit.MILLISECONDS);
}
} }
}).connect(); });
} }
connect.sync();
} }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
} }

@ -61,8 +61,10 @@ public class RedissonTopic<M> extends RedissonObject implements RTopic<M> {
RedisPubSubAdapter<String, M> listener = new RedisPubSubAdapter<String, M>() { RedisPubSubAdapter<String, M> listener = new RedisPubSubAdapter<String, M>() {
@Override @Override
public void subscribed(String channel, long count) { public void subscribed(String channel, long count) {
if (channel.equals(getName())) { Promise<Boolean> subscribePromise = promise.get();
promise.get().setSuccess(true); //in case of reconnecting, promise might already be completed.
if (channel.equals(getName()) && !subscribePromise.isDone()) {
subscribePromise.setSuccess(true);
} }
} }
}; };

Loading…
Cancel
Save