diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index ad43da42e..bf32fb690 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -16,16 +16,14 @@ package org.redisson.client; import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutionException; -import org.redisson.client.handler.RedisCommandsQueue; -import org.redisson.client.handler.RedisDecoder; -import org.redisson.client.handler.RedisEncoder; +import org.redisson.client.handler.CommandDecoder; +import org.redisson.client.handler.CommandEncoder; +import org.redisson.client.handler.CommandsQueue; +import org.redisson.client.handler.ConnectionWatchdog; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.StringCodec; -import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -39,7 +37,6 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; public class RedisClient { @@ -58,14 +55,13 @@ public class RedisClient { addr = new InetSocketAddress(host, port); bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr); bootstrap.handler(new ChannelInitializer() { - @Override protected void initChannel(Channel ch) throws Exception { - ch.pipeline().addFirst(new RedisEncoder(), - new RedisCommandsQueue(), - new RedisDecoder()); + ch.pipeline().addFirst(new ConnectionWatchdog(bootstrap, channels), + new CommandEncoder(), + new CommandsQueue(), + new CommandDecoder()); } - }); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); @@ -88,7 +84,6 @@ public class RedisClient { try { ChannelFuture future = bootstrap.connect(); future.syncUninterruptibly(); - channels.add(future.channel()); return new RedisConnection(this, future.channel()); } catch (Exception e) { throw new RedisConnectionException("unable to connect", e); @@ -99,7 +94,6 @@ public class RedisClient { try { ChannelFuture future = bootstrap.connect(); future.syncUninterruptibly(); - channels.add(future.channel()); return new RedisPubSubConnection(this, future.channel()); } catch (Exception e) { throw new RedisConnectionException("unable to connect", e); diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 838a4f235..204e0d588 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -17,7 +17,7 @@ package org.redisson.client; import java.util.concurrent.TimeUnit; -import org.redisson.client.handler.RedisData; +import org.redisson.client.handler.CommandData; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -25,18 +25,29 @@ import org.redisson.client.protocol.RedisStrictCommand; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; public class RedisConnection implements RedisCommands { - final Channel channel; + public static final AttributeKey CONNECTION = AttributeKey.valueOf("connection"); + final RedisClient redisClient; + private volatile boolean closed; + volatile Channel channel; + public RedisConnection(RedisClient redisClient, Channel channel) { super(); this.redisClient = redisClient; this.channel = channel; + + channel.attr(CONNECTION).set(this); + } + + public void updateChannel(Channel channel) { + this.channel = channel; } public RedisClient getRedisClient() { @@ -64,7 +75,7 @@ public class RedisConnection implements RedisCommands { return await(r); } - public void send(RedisData data) { + public void send(CommandData data) { channel.writeAndFlush(data); } @@ -75,11 +86,20 @@ public class RedisConnection implements RedisCommands { public Future async(Codec encoder, RedisCommand command, Object ... params) { Promise promise = redisClient.getBootstrap().group().next().newPromise(); - channel.writeAndFlush(new RedisData(promise, encoder, command, params)); + channel.writeAndFlush(new CommandData(promise, encoder, command, params)); return promise; } + public void setClosed(boolean reconnect) { + this.closed = reconnect; + } + + public boolean isClosed() { + return closed; + } + public ChannelFuture closeAsync() { + setClosed(true); return channel.close(); } diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index 0593cec86..004c00dbc 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -17,7 +17,7 @@ package org.redisson.client; import java.util.concurrent.ConcurrentLinkedQueue; -import org.redisson.client.handler.RedisData; +import org.redisson.client.handler.CommandData; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -29,20 +29,15 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import io.netty.channel.Channel; -import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; public class RedisPubSubConnection extends RedisConnection { - public static final AttributeKey CONNECTION = AttributeKey.valueOf("connection"); - final ConcurrentLinkedQueue> listeners = new ConcurrentLinkedQueue>(); public RedisPubSubConnection(RedisClient redisClient, Channel channel) { super(redisClient, channel); - - channel.attr(CONNECTION).set(this); } public void addListener(RedisPubSubListener listener) { @@ -83,7 +78,7 @@ public class RedisPubSubConnection extends RedisConnection { public Future async(MultiDecoder messageDecoder, RedisCommand command, Object ... params) { Promise promise = redisClient.getBootstrap().group().next().newPromise(); - channel.writeAndFlush(new RedisData(promise, messageDecoder, null, command, params)); + channel.writeAndFlush(new CommandData(promise, messageDecoder, null, command, params)); return promise; } diff --git a/src/main/java/org/redisson/client/handler/RedisData.java b/src/main/java/org/redisson/client/handler/CommandData.java similarity index 86% rename from src/main/java/org/redisson/client/handler/RedisData.java rename to src/main/java/org/redisson/client/handler/CommandData.java index d07bee59b..5cd18c70c 100644 --- a/src/main/java/org/redisson/client/handler/RedisData.java +++ b/src/main/java/org/redisson/client/handler/CommandData.java @@ -23,7 +23,7 @@ import org.redisson.client.protocol.decoder.MultiDecoder; import io.netty.util.concurrent.Promise; -public class RedisData { +public class CommandData { final Promise promise; final RedisCommand command; @@ -32,11 +32,11 @@ public class RedisData { final AtomicBoolean sended = new AtomicBoolean(); final MultiDecoder messageDecoder; - public RedisData(Promise promise, Codec codec, RedisCommand command, Object[] params) { + public CommandData(Promise promise, Codec codec, RedisCommand command, Object[] params) { this(promise, null, codec, command, params); } - public RedisData(Promise promise, MultiDecoder messageDecoder, Codec codec, RedisCommand command, Object[] params) { + public CommandData(Promise promise, MultiDecoder messageDecoder, Codec codec, RedisCommand command, Object[] params) { this.promise = promise; this.command = command; this.params = params; diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java similarity index 87% rename from src/main/java/org/redisson/client/handler/RedisDecoder.java rename to src/main/java/org/redisson/client/handler/CommandDecoder.java index 629eccef5..f726ae514 100644 --- a/src/main/java/org/redisson/client/handler/RedisDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -25,7 +25,7 @@ import java.util.Map; import org.redisson.client.RedisException; import org.redisson.client.RedisMovedException; import org.redisson.client.RedisPubSubConnection; -import org.redisson.client.handler.RedisCommandsQueue.QueueCommands; +import org.redisson.client.handler.CommandsQueue.QueueCommands; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.MultiDecoder; @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import io.netty.util.CharsetUtil; @@ -45,7 +46,7 @@ import io.netty.util.CharsetUtil; * @author Nikita Koksharov * */ -public class RedisDecoder extends ReplayingDecoder { +public class CommandDecoder extends ReplayingDecoder { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -57,8 +58,7 @@ public class RedisDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - RedisData data = ctx.channel().attr(RedisCommandsQueue.REPLAY).get(); - RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get(); + CommandData data = ctx.channel().attr(CommandsQueue.REPLAY).get(); Decoder currentDecoder = null; if (data == null) { @@ -75,22 +75,21 @@ public class RedisDecoder extends ReplayingDecoder { } try { - decode(in, data, null, pubSubConnection, currentDecoder); + decode(in, data, null, ctx.channel(), currentDecoder); } catch (IOException e) { data.getPromise().setFailure(e); } - ctx.channel().attr(RedisCommandsQueue.REPLAY).remove(); + ctx.channel().attr(CommandsQueue.REPLAY).remove(); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); } - private void decode(ByteBuf in, RedisData data, List parts, RedisPubSubConnection pubSubConnection, Decoder currentDecoder) throws IOException { + private void decode(ByteBuf in, CommandData data, List parts, Channel channel, Decoder currentDecoder) throws IOException { int code = in.readByte(); if (code == '+') { String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); in.skipBytes(2); -// Object result = data.getCommand().getReplayDecoder().decode(in); handleResult(data, parts, result); } else if (code == '-') { String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); @@ -119,18 +118,18 @@ public class RedisDecoder extends ReplayingDecoder { long size = readLong(in); List respParts = new ArrayList(); for (int i = 0; i < size; i++) { - decode(in, data, respParts, pubSubConnection, currentDecoder); + decode(in, data, respParts, channel, currentDecoder); } Object result = messageDecoder(data, respParts).decode(respParts); - handleMultiResult(data, parts, pubSubConnection, result); + handleMultiResult(data, parts, channel, result); } else { throw new IllegalStateException("Can't decode replay " + (char)code); } } - private void handleMultiResult(RedisData data, List parts, - RedisPubSubConnection pubSubConnection, Object result) { + private void handleMultiResult(CommandData data, List parts, + Channel channel, Object result) { if (data != null) { if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) { for (Object param : data.getParams()) { @@ -149,6 +148,7 @@ public class RedisDecoder extends ReplayingDecoder { data.getPromise().setSuccess(result); } } else { + RedisPubSubConnection pubSubConnection = (RedisPubSubConnection)channel.attr(RedisPubSubConnection.CONNECTION).get(); if (result instanceof PubSubMessage) { pubSubConnection.onMessage((PubSubMessage) result); } else { @@ -157,7 +157,7 @@ public class RedisDecoder extends ReplayingDecoder { } } - private void handleResult(RedisData data, List parts, Object result) { + private void handleResult(CommandData data, List parts, Object result) { if (data != null) { result = data.getCommand().getConvertor().convert(result); } @@ -168,7 +168,7 @@ public class RedisDecoder extends ReplayingDecoder { } } - private MultiDecoder messageDecoder(RedisData data, List parts) { + private MultiDecoder messageDecoder(CommandData data, List parts) { if (data == null) { if (parts.get(0).equals("message")) { String channelName = (String) parts.get(1); @@ -182,7 +182,7 @@ public class RedisDecoder extends ReplayingDecoder { return data.getCommand().getReplayMultiDecoder(); } - private Decoder decoder(RedisData data, List parts, Decoder currentDecoder) { + private Decoder decoder(CommandData data, List parts, Decoder currentDecoder) { if (data == null) { if (parts.size() == 2 && parts.get(0).equals("message")) { String channelName = (String) parts.get(1); diff --git a/src/main/java/org/redisson/client/handler/RedisEncoder.java b/src/main/java/org/redisson/client/handler/CommandEncoder.java similarity index 96% rename from src/main/java/org/redisson/client/handler/RedisEncoder.java rename to src/main/java/org/redisson/client/handler/CommandEncoder.java index 306ef1770..ab3a831db 100644 --- a/src/main/java/org/redisson/client/handler/RedisEncoder.java +++ b/src/main/java/org/redisson/client/handler/CommandEncoder.java @@ -32,7 +32,7 @@ import io.netty.util.CharsetUtil; * @author Nikita Koksharov * */ -public class RedisEncoder extends MessageToByteEncoder> { +public class CommandEncoder extends MessageToByteEncoder> { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -43,7 +43,7 @@ public class RedisEncoder extends MessageToByteEncoder final byte[] CRLF = "\r\n".getBytes(); @Override - protected void encode(ChannelHandlerContext ctx, RedisData msg, ByteBuf out) throws Exception { + protected void encode(ChannelHandlerContext ctx, CommandData msg, ByteBuf out) throws Exception { out.writeByte(ARGS_PREFIX); int len = 1 + msg.getParams().length; if (msg.getCommand().getSubName() != null) { @@ -82,7 +82,7 @@ public class RedisEncoder extends MessageToByteEncoder } } - private Encoder encoder(RedisData msg, int param) { + private Encoder encoder(CommandData msg, int param) { int typeIndex = 0; if (msg.getCommand().getInParamType().size() > 1) { typeIndex = param; diff --git a/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java b/src/main/java/org/redisson/client/handler/CommandsQueue.java similarity index 81% rename from src/main/java/org/redisson/client/handler/RedisCommandsQueue.java rename to src/main/java/org/redisson/client/handler/CommandsQueue.java index 346b78bf9..b12d86bee 100644 --- a/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java +++ b/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -23,13 +23,13 @@ import io.netty.channel.ChannelPromise; import io.netty.util.AttributeKey; import io.netty.util.internal.PlatformDependent; -public class RedisCommandsQueue extends ChannelDuplexHandler { +public class CommandsQueue extends ChannelDuplexHandler { public enum QueueCommands {NEXT_COMMAND} - public static final AttributeKey> REPLAY = AttributeKey.valueOf("promise"); + public static final AttributeKey> REPLAY = AttributeKey.valueOf("promise"); - private final Queue> queue = PlatformDependent.newMpscQueue(); + private final Queue> queue = PlatformDependent.newMpscQueue(); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { @@ -43,8 +43,8 @@ public class RedisCommandsQueue extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof RedisData) { - RedisData data = (RedisData) msg; + if (msg instanceof CommandData) { + CommandData data = (CommandData) msg; if (data.getSended().get()) { super.write(ctx, msg, promise); } else { @@ -57,7 +57,7 @@ public class RedisCommandsQueue extends ChannelDuplexHandler { } private void sendData(ChannelHandlerContext ctx) throws Exception { - RedisData data = queue.peek(); + CommandData data = queue.peek(); if (data != null && data.getSended().compareAndSet(false, true)) { ctx.channel().attr(REPLAY).set(data); ctx.channel().writeAndFlush(data); diff --git a/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java new file mode 100644 index 000000000..595d509be --- /dev/null +++ b/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -0,0 +1,91 @@ +package org.redisson.client.handler; + +import java.util.concurrent.TimeUnit; + +import org.redisson.client.RedisConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.util.concurrent.GenericFutureListener; + +public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private Bootstrap bootstrap; + private ChannelGroup channels; + private static final int BACKOFF_CAP = 12; + + public ConnectionWatchdog(Bootstrap bootstrap, ChannelGroup channels) { + this.bootstrap = bootstrap; + this.channels = channels; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channels.add(ctx.channel()); + ctx.fireChannelActive(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + RedisConnection connection = ctx.channel().attr(RedisConnection.CONNECTION).get(); + if (!connection.isClosed()) { + EventLoopGroup group = ctx.channel().eventLoop().parent(); + reconnect(group, connection); + } + ctx.fireChannelInactive(); + } + + private void reconnect(final EventLoopGroup group, final RedisConnection connection){ + group.schedule(new Runnable() { + @Override + public void run() { + doReConnect(group, connection, 1); + } + }, 100, TimeUnit.MILLISECONDS); + } + + private void doReConnect(final EventLoopGroup group, final RedisConnection connection, final int attempts) { + if (connection.isClosed()) { + return; + } + + log.debug("reconnecting connection {} to {} ", connection, connection.getRedisClient().getAddr(), connection); + + bootstrap.connect().addListener(new GenericFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (connection.isClosed()) { + return; + } + + if (future.isSuccess()) { + log.debug("connection {} connected to {}", connection, connection.getRedisClient().getAddr()); + connection.updateChannel(future.channel()); + return; + } + + int timeout = 2 << attempts; + group.schedule(new Runnable() { + @Override + public void run() { + doReConnect(group, connection, Math.min(BACKOFF_CAP, attempts + 1)); + } + }, timeout, TimeUnit.MILLISECONDS); + } + }); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.channel().close(); + } + +} diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index c538f5eb8..cce9e533d 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -43,7 +43,7 @@ import org.redisson.client.RedisMovedException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisTimeoutException; -import org.redisson.client.handler.RedisData; +import org.redisson.client.handler.CommandData; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.StringCodec; @@ -417,7 +417,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { connection = connectionWriteOp(slot); } log.debug("readAsync for slot {} using {}", slot, connection.getRedisClient().getAddr()); - connection.send(new RedisData(attemptPromise, messageDecoder, codec, command, params)); + connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); ex.set(new RedisTimeoutException()); Timeout timeout = timer.newTimeout(timerTask, config.getTimeout(), TimeUnit.MILLISECONDS);