diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 5213d2606..82571db55 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -91,7 +91,7 @@ public class CommandDecoder extends ReplayingDecoder { int endIndex = skipCommand(in); try { - decode(ctx, in, data); + decode(ctx, in, data, 0); } catch (Exception e) { in.readerIndex(endIndex); throw e; @@ -101,25 +101,40 @@ public class CommandDecoder extends ReplayingDecoder { int endIndex = 0; if (!(data instanceof CommandsData)) { endIndex = skipCommand(in); + } else { + endIndex = skipBatchCommand(in, (CommandsData) data); } - - try { - decode(ctx, in, data); - } catch (Exception e) { - if (!(data instanceof CommandsData)) { - in.readerIndex(endIndex); - } - throw e; + if (data.isExecuted()) { + in.readerIndex(endIndex); + sendNext(ctx.channel()); + return; } + + decode(ctx, in, data, endIndex); + } + } + + private int skipBatchCommand(ByteBuf in, CommandsData data) throws IOException { + int commandsAmount = 1; + if (!data.isSkipResult()) { + commandsAmount = data.getCommands().size(); + } + + in.markReaderIndex(); + for (int i = 0; i < commandsAmount; i++) { + skipDecode(in); } + int endIndex = in.readerIndex(); + in.resetReaderIndex(); + return endIndex; } - private void decode(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception { + private void decode(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, int endIndex) throws Exception { if (log.isTraceEnabled()) { log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data); } - decodeCommand(ctx.channel(), in, data); + decodeCommand(ctx.channel(), in, data, endIndex); } protected void sendNext(Channel channel, QueueCommand data) { @@ -157,7 +172,7 @@ public class CommandDecoder extends ReplayingDecoder { } } } - + private void skipBytes(ByteBuf is) throws IOException { long l = readLong(is); if (l > Integer.MAX_VALUE) { @@ -176,7 +191,7 @@ public class CommandDecoder extends ReplayingDecoder { in.skipBytes(len + 2); } - protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception { + protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data, int endIndex) throws Exception { if (data instanceof CommandData) { CommandData cmd = (CommandData) data; try { @@ -184,8 +199,9 @@ public class CommandDecoder extends ReplayingDecoder { sendNext(channel, data); } catch (Exception e) { log.error("Unable to decode data. channel: " + channel + ", reply: " + LogHelper.toString(in) + ", command: " + LogHelper.toString(data), e); - cmd.tryFailure(e); + in.readerIndex(endIndex); sendNext(channel); + cmd.tryFailure(e); throw e; } } else if (data instanceof CommandsData) { @@ -193,8 +209,9 @@ public class CommandDecoder extends ReplayingDecoder { try { decodeCommandBatch(channel, in, commands); } catch (Exception e) { - commands.getPromise().tryFailure(e); + in.readerIndex(endIndex); sendNext(channel); + commands.getPromise().tryFailure(e); throw e; } } else { diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index 091ef5ba4..b515fcbca 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -77,7 +77,7 @@ public class CommandPubSubDecoder extends CommandDecoder { } @Override - protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception { + protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data, int endIndex) throws Exception { if (data == null) { try { while (in.writerIndex() > in.readerIndex()) {