ConnectionWatchdog added. #183

pull/243/head
Nikita 10 years ago
parent b1f7653d5f
commit bbe5e7e71f

@ -16,16 +16,14 @@
package org.redisson.client;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.redisson.client.handler.RedisCommandsQueue;
import org.redisson.client.handler.RedisDecoder;
import org.redisson.client.handler.RedisEncoder;
import org.redisson.client.handler.CommandDecoder;
import org.redisson.client.handler.CommandEncoder;
import org.redisson.client.handler.CommandsQueue;
import org.redisson.client.handler.ConnectionWatchdog;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -39,7 +37,6 @@ import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
public class RedisClient {
@ -58,14 +55,13 @@ public class RedisClient {
addr = new InetSocketAddress(host, port);
bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addFirst(new RedisEncoder(),
new RedisCommandsQueue(),
new RedisDecoder());
ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels),
new CommandEncoder(),
new CommandsQueue(),
new CommandDecoder());
}
});
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
@ -88,7 +84,6 @@ public class RedisClient {
try {
ChannelFuture future = bootstrap.connect();
future.syncUninterruptibly();
channels.add(future.channel());
return new RedisConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);
@ -99,7 +94,6 @@ public class RedisClient {
try {
ChannelFuture future = bootstrap.connect();
future.syncUninterruptibly();
channels.add(future.channel());
return new RedisPubSubConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);

@ -17,7 +17,7 @@ package org.redisson.client;
import java.util.concurrent.TimeUnit;
import org.redisson.client.handler.RedisData;
import org.redisson.client.handler.CommandData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
@ -25,18 +25,29 @@ import org.redisson.client.protocol.RedisStrictCommand;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedisConnection implements RedisCommands {
final Channel channel;
public static final AttributeKey<RedisConnection> CONNECTION = AttributeKey.valueOf("connection");
final RedisClient redisClient;
private volatile boolean closed;
volatile Channel channel;
public RedisConnection(RedisClient redisClient, Channel channel) {
super();
this.redisClient = redisClient;
this.channel = channel;
channel.attr(CONNECTION).set(this);
}
public void updateChannel(Channel channel) {
this.channel = channel;
}
public RedisClient getRedisClient() {
@ -64,7 +75,7 @@ public class RedisConnection implements RedisCommands {
return await(r);
}
public <T, R> void send(RedisData<T, R> data) {
public <T, R> void send(CommandData<T, R> data) {
channel.writeAndFlush(data);
}
@ -75,11 +86,20 @@ public class RedisConnection implements RedisCommands {
public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
channel.writeAndFlush(new CommandData<T, R>(promise, encoder, command, params));
return promise;
}
public void setClosed(boolean reconnect) {
this.closed = reconnect;
}
public boolean isClosed() {
return closed;
}
public ChannelFuture closeAsync() {
setClosed(true);
return channel.close();
}

@ -17,7 +17,7 @@ package org.redisson.client;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.handler.RedisData;
import org.redisson.client.handler.CommandData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
@ -29,20 +29,15 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
public class RedisPubSubConnection extends RedisConnection {
public static final AttributeKey<RedisPubSubConnection> CONNECTION = AttributeKey.valueOf("connection");
final ConcurrentLinkedQueue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
super(redisClient, channel);
channel.attr(CONNECTION).set(this);
}
public void addListener(RedisPubSubListener listener) {
@ -83,7 +78,7 @@ public class RedisPubSubConnection extends RedisConnection {
public <T, R> Future<R> async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, messageDecoder, null, command, params));
channel.writeAndFlush(new CommandData<T, R>(promise, messageDecoder, null, command, params));
return promise;
}

@ -23,7 +23,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.util.concurrent.Promise;
public class RedisData<T, R> {
public class CommandData<T, R> {
final Promise<R> promise;
final RedisCommand<T> command;
@ -32,11 +32,11 @@ public class RedisData<T, R> {
final AtomicBoolean sended = new AtomicBoolean();
final MultiDecoder<Object> messageDecoder;
public RedisData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
public CommandData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
this(promise, null, codec, command, params);
}
public RedisData(Promise<R> promise, MultiDecoder<Object> messageDecoder, Codec codec, RedisCommand<T> command, Object[] params) {
public CommandData(Promise<R> promise, MultiDecoder<Object> messageDecoder, Codec codec, RedisCommand<T> command, Object[] params) {
this.promise = promise;
this.command = command;
this.params = params;

@ -25,7 +25,7 @@ import java.util.Map;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.handler.RedisCommandsQueue.QueueCommands;
import org.redisson.client.handler.CommandsQueue.QueueCommands;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.MultiDecoder;
@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;
@ -45,7 +46,7 @@ import io.netty.util.CharsetUtil;
* @author Nikita Koksharov
*
*/
public class RedisDecoder extends ReplayingDecoder<Void> {
public class CommandDecoder extends ReplayingDecoder<Void> {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -57,8 +58,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY).get();
RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get();
CommandData<Object, Object> data = ctx.channel().attr(CommandsQueue.REPLAY).get();
Decoder<Object> currentDecoder = null;
if (data == null) {
@ -75,22 +75,21 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
}
try {
decode(in, data, null, pubSubConnection, currentDecoder);
decode(in, data, null, ctx.channel(), currentDecoder);
} catch (IOException e) {
data.getPromise().setFailure(e);
}
ctx.channel().attr(RedisCommandsQueue.REPLAY).remove();
ctx.channel().attr(CommandsQueue.REPLAY).remove();
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
}
private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts, RedisPubSubConnection pubSubConnection, Decoder<Object> currentDecoder) throws IOException {
private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, Decoder<Object> currentDecoder) throws IOException {
int code = in.readByte();
if (code == '+') {
String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
// Object result = data.getCommand().getReplayDecoder().decode(in);
handleResult(data, parts, result);
} else if (code == '-') {
String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
@ -119,18 +118,18 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>();
for (int i = 0; i < size; i++) {
decode(in, data, respParts, pubSubConnection, currentDecoder);
decode(in, data, respParts, channel, currentDecoder);
}
Object result = messageDecoder(data, respParts).decode(respParts);
handleMultiResult(data, parts, pubSubConnection, result);
handleMultiResult(data, parts, channel, result);
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);
}
}
private void handleMultiResult(RedisData<Object, Object> data, List<Object> parts,
RedisPubSubConnection pubSubConnection, Object result) {
private void handleMultiResult(CommandData<Object, Object> data, List<Object> parts,
Channel channel, Object result) {
if (data != null) {
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) {
for (Object param : data.getParams()) {
@ -149,6 +148,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
data.getPromise().setSuccess(result);
}
} else {
RedisPubSubConnection pubSubConnection = (RedisPubSubConnection)channel.attr(RedisPubSubConnection.CONNECTION).get();
if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
@ -157,7 +157,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
}
}
private void handleResult(RedisData<Object, Object> data, List<Object> parts, Object result) {
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result) {
if (data != null) {
result = data.getCommand().getConvertor().convert(result);
}
@ -168,7 +168,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
}
}
private MultiDecoder<Object> messageDecoder(RedisData<Object, Object> data, List<Object> parts) {
private MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts) {
if (data == null) {
if (parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);
@ -182,7 +182,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
return data.getCommand().getReplayMultiDecoder();
}
private Decoder<Object> decoder(RedisData<Object, Object> data, List<Object> parts, Decoder<Object> currentDecoder) {
private Decoder<Object> decoder(CommandData<Object, Object> data, List<Object> parts, Decoder<Object> currentDecoder) {
if (data == null) {
if (parts.size() == 2 && parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);

@ -32,7 +32,7 @@ import io.netty.util.CharsetUtil;
* @author Nikita Koksharov
*
*/
public class RedisEncoder extends MessageToByteEncoder<RedisData<Object, Object>> {
public class CommandEncoder extends MessageToByteEncoder<CommandData<Object, Object>> {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -43,7 +43,7 @@ public class RedisEncoder extends MessageToByteEncoder<RedisData<Object, Object>
final byte[] CRLF = "\r\n".getBytes();
@Override
protected void encode(ChannelHandlerContext ctx, RedisData<Object, Object> msg, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext ctx, CommandData<Object, Object> msg, ByteBuf out) throws Exception {
out.writeByte(ARGS_PREFIX);
int len = 1 + msg.getParams().length;
if (msg.getCommand().getSubName() != null) {
@ -82,7 +82,7 @@ public class RedisEncoder extends MessageToByteEncoder<RedisData<Object, Object>
}
}
private Encoder encoder(RedisData<Object, Object> msg, int param) {
private Encoder encoder(CommandData<Object, Object> msg, int param) {
int typeIndex = 0;
if (msg.getCommand().getInParamType().size() > 1) {
typeIndex = param;

@ -23,13 +23,13 @@ import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import io.netty.util.internal.PlatformDependent;
public class RedisCommandsQueue extends ChannelDuplexHandler {
public class CommandsQueue extends ChannelDuplexHandler {
public enum QueueCommands {NEXT_COMMAND}
public static final AttributeKey<RedisData<Object, Object>> REPLAY = AttributeKey.valueOf("promise");
public static final AttributeKey<CommandData<Object, Object>> REPLAY = AttributeKey.valueOf("promise");
private final Queue<RedisData<Object, Object>> queue = PlatformDependent.newMpscQueue();
private final Queue<CommandData<Object, Object>> queue = PlatformDependent.newMpscQueue();
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
@ -43,8 +43,8 @@ public class RedisCommandsQueue extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof RedisData) {
RedisData<Object, Object> data = (RedisData<Object, Object>) msg;
if (msg instanceof CommandData) {
CommandData<Object, Object> data = (CommandData<Object, Object>) msg;
if (data.getSended().get()) {
super.write(ctx, msg, promise);
} else {
@ -57,7 +57,7 @@ public class RedisCommandsQueue extends ChannelDuplexHandler {
}
private void sendData(ChannelHandlerContext ctx) throws Exception {
RedisData<Object, Object> data = queue.peek();
CommandData<Object, Object> data = queue.peek();
if (data != null && data.getSended().compareAndSet(false, true)) {
ctx.channel().attr(REPLAY).set(data);
ctx.channel().writeAndFlush(data);

@ -0,0 +1,91 @@
package org.redisson.client.handler;
import java.util.concurrent.TimeUnit;
import org.redisson.client.RedisConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.concurrent.GenericFutureListener;
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(getClass());
private Bootstrap bootstrap;
private ChannelGroup channels;
private static final int BACKOFF_CAP = 12;
public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels) {
this.bootstrap = bootstrap;
this.channels = channels;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
RedisConnection connection = ctx.channel().attr(RedisConnection.CONNECTION).get();
if (!connection.isClosed()) {
EventLoopGroup group = ctx.channel().eventLoop().parent();
reconnect(group, connection);
}
ctx.fireChannelInactive();
}
private void reconnect(final EventLoopGroup group, final RedisConnection connection){
group.schedule(new Runnable() {
@Override
public void run() {
doReConnect(group, connection, 1);
}
}, 100, TimeUnit.MILLISECONDS);
}
private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) {
if (connection.isClosed()) {
return;
}
log.debug("reconnecting connection {} to {} ", connection, connection.getRedisClient().getAddr(), connection);
bootstrap.connect().addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (connection.isClosed()) {
return;
}
if (future.isSuccess()) {
log.debug("connection {} connected to {}", connection, connection.getRedisClient().getAddr());
connection.updateChannel(future.channel());
return;
}
int timeout = 2 << attempts;
group.schedule(new Runnable() {
@Override
public void run() {
doReConnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1));
}
}, timeout, TimeUnit.MILLISECONDS);
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
}
}

@ -43,7 +43,7 @@ import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.handler.RedisData;
import org.redisson.client.handler.CommandData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.StringCodec;
@ -417,7 +417,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
connection = connectionWriteOp(slot);
}
log.debug("readAsync for slot {} using {}", slot, connection.getRedisClient().getAddr());
connection.send(new RedisData<V, R>(attemptPromise, messageDecoder, codec, command, params));
connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
ex.set(new RedisTimeoutException());
Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);

Loading…
Cancel
Save