From 120fd686a26a8a39ebbcb9b0505ca16dbe15d277 Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 15 Sep 2016 12:19:56 +0300 Subject: [PATCH] Log unexpected errors in netty handlers. #617 --- .../client/handler/CommandsQueue.java | 20 ++++++++++++++++++- .../client/handler/ConnectionWatchdog.java | 2 +- .../client/protocol/BatchCommandData.java | 7 +++++++ .../redisson/client/protocol/CommandData.java | 7 +++++++ .../client/protocol/CommandsData.java | 10 ++++++++++ .../client/protocol/QueueCommand.java | 7 +++++++ 6 files changed, 51 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java index 4aa4efae2..626382ebb 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -21,6 +21,8 @@ import java.util.Queue; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.QueueCommandHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -39,6 +41,8 @@ import io.netty.util.internal.PlatformDependent; */ public class CommandsQueue extends ChannelOutboundHandlerAdapter { + private static final Logger log = LoggerFactory.getLogger(CommandsQueue.class); + public static final AttributeKey CURRENT_COMMAND = AttributeKey.valueOf("promise"); private final Queue queue = PlatformDependent.newMpscQueue(); @@ -53,7 +57,7 @@ public class CommandsQueue extends ChannelOutboundHandlerAdapter { }; public void sendNextCommand(Channel channel) { - channel.attr(CommandsQueue.CURRENT_COMMAND).remove(); + channel.attr(CommandsQueue.CURRENT_COMMAND).set(null); queue.poll(); sendData(channel); } @@ -94,4 +98,18 @@ public class CommandsQueue extends ChannelOutboundHandlerAdapter { } } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + QueueCommand command = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get(); + if (command != null) { + if (!command.tryFailure(cause)) { + log.error("Exception occured. Channel: " + ctx.channel() + " Command: " + command, cause); + } + sendNextCommand(ctx.channel()); + return; + } + + log.error("Exception occured. Channel: " + ctx.channel(), cause); + } + } diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 897b96ee9..d9bb84fca 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -40,7 +40,6 @@ import io.netty.util.Timer; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ImmediateEventExecutor; public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { @@ -156,6 +155,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); + super.exceptionCaught(ctx, cause); } private void refresh(RedisConnection connection, Channel channel) { diff --git a/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java b/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java index 3173169f9..23f2ff17b 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/BatchCommandData.java @@ -21,6 +21,13 @@ import org.redisson.client.RedisRedirectException; import org.redisson.client.codec.Codec; import org.redisson.misc.RPromise; +/** + * + * @author Nikita Koksharov + * + * @param input type + * @param output type + */ public class BatchCommandData extends CommandData implements Comparable> { private final int index; diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandData.java index 9f470c549..6d51b4ee6 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandData.java @@ -23,6 +23,13 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.misc.RPromise; +/** + * + * @author Nikita Koksharov + * + * @param input type + * @param output type + */ public class CommandData implements QueueCommand { final RPromise promise; diff --git a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java index b30556f09..06dce5992 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java +++ b/redisson/src/main/java/org/redisson/client/protocol/CommandsData.java @@ -20,6 +20,11 @@ import java.util.List; import org.redisson.misc.RPromise; +/** + * + * @author Nikita Koksharov + * + */ public class CommandsData implements QueueCommand { private final List> commands; @@ -50,4 +55,9 @@ public class CommandsData implements QueueCommand { return result; } + @Override + public boolean tryFailure(Throwable cause) { + return promise.tryFailure(cause); + } + } diff --git a/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java b/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java index aca015b01..153ad7170 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java +++ b/redisson/src/main/java/org/redisson/client/protocol/QueueCommand.java @@ -20,6 +20,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +/** + * + * @author Nikita Koksharov + * + */ public interface QueueCommand { Set PUBSUB_COMMANDS = new HashSet(Arrays.asList("PSUBSCRIBE", "SUBSCRIBE", "PUNSUBSCRIBE", "UNSUBSCRIBE")); @@ -29,4 +34,6 @@ public interface QueueCommand { List> getPubSubOperations(); + boolean tryFailure(Throwable cause); + }