diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index 873d5b375..c9ef09048 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -44,14 +44,11 @@ import org.redisson.client.RedisMovedException; import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.RedisTimeoutException; import org.redisson.client.RedisTryAgainException; -import org.redisson.client.codec.BaseCodec; -import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.Decoder; -import org.redisson.client.protocol.Encoder; import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -63,7 +60,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; @@ -78,30 +74,6 @@ import io.netty.util.concurrent.FastThreadLocal; */ public class CommandDecoder extends ReplayingDecoder { - public static class NullCodec extends BaseCodec { - - public static final NullCodec INSTANCE = new NullCodec(); - - private final Decoder decoder = new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) { - return new Object(); - } - }; - - @Override - public Decoder getValueDecoder() { - return decoder; - } - - @Override - public Encoder getValueEncoder() { - throw new UnsupportedOperationException(); - } - - } - - final Logger log = LoggerFactory.getLogger(getClass()); private static final char CR = '\r'; @@ -113,13 +85,6 @@ public class CommandDecoder extends ReplayingDecoder { final ExecutorService executor; private final boolean decodeInExecutor; - private final FastThreadLocal decoderStatus = new FastThreadLocal() { - @Override - protected Status initialValue() { - return Status.NORMAL; - }; - }; - private final FastThreadLocal state = new FastThreadLocal(); public CommandDecoder(ExecutorService executor, boolean decodeInExecutor) { @@ -128,57 +93,116 @@ public class CommandDecoder extends ReplayingDecoder { } @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - final QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get(); + protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get(); if (log.isTraceEnabled()) { log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data); } + if (state.get() == null) { state.set(new State()); } - + state.get().setDecoderState(null); - in.markReaderIndex(); - decodeCommand(ctx.channel(), in, data); - - if (decoderStatus.get() == Status.FILL_BUFFER) { + if (data == null) { + while (in.writerIndex() > in.readerIndex()) { + in.markReaderIndex(); + skipCommand(in); + in.resetReaderIndex(); + + decode(ctx, in, data); + } + } else { + in.markReaderIndex(); + if (data instanceof CommandsData) { + CommandsData cmd = (CommandsData) data; + if (!cmd.isSkipResult()) { + for (int j = 0; j < cmd.getCommands().size(); j++) { + skipCommand(in); + } + } + } else { + skipCommand(in); + } in.resetReaderIndex(); - final ByteBuf copy = ByteBufAllocator.DEFAULT.buffer(in.writerIndex()); - in.readBytes(copy); - state.set(null); - decoderStatus.set(Status.NORMAL); - final Channel channel = ctx.channel(); + decode(ctx, in, data); + } + } + + private void decode(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception { + if (decodeInExecutor) { + ByteBuf copy = in.copy(in.readerIndex(), in.writerIndex() - in.readerIndex()); + in.skipBytes(in.writerIndex() - in.readerIndex()); executor.execute(() -> { - decoderStatus.set(Status.DECODE_BUFFER); state.set(new State()); state.get().setDecoderState(null); + try { - decodeCommand(channel, copy, data); + decodeCommand(ctx.channel(), copy, data); } catch (Exception e) { log.error("Unable to decode data in separate thread: " + LogHelper.toString(data), e); } finally { copy.release(); - decoderStatus.remove(); - state.remove(); } }); + } else { + decodeCommand(ctx.channel(), in, data); } - } protected void sendNext(Channel channel, QueueCommand data) { if (data != null) { - if (decoderStatus.get() == Status.FILL_BUFFER || data.isExecuted()) { + if (data.isExecuted()) { sendNext(channel); } } else { sendNext(channel); } } - + + protected void skipCommand(ByteBuf in) throws Exception { + skipDecode(in); + } + + protected void skipDecode(ByteBuf in) throws IOException{ + int code = in.readByte(); + if (code == '+') { + skipString(in); + } else if (code == '-') { + skipString(in); + } else if (code == ':') { + skipString(in); + } else if (code == '$') { + skipBytes(in); + } else if (code == '*') { + long size = readLong(in); + for (int i = 0; i < size; i++) { + skipDecode(in); + } + } + } + + private void skipBytes(ByteBuf is) throws IOException { + long l = readLong(is); + if (l > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "Java only supports arrays up to " + Integer.MAX_VALUE + " in size"); + } + int size = (int) l; + if (size == -1) { + return; + } + is.skipBytes(size + 2); + } + + private void skipString(ByteBuf in) { + int len = in.bytesBefore((byte) '\r'); + in.skipBytes(len + 2); + } + protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception { if (data instanceof CommandData) { CommandData cmd = (CommandData) data; @@ -188,7 +212,6 @@ public class CommandDecoder extends ReplayingDecoder { } catch (Exception e) { log.error("Unable to decode data. channel: " + channel + ", reply: " + LogHelper.toString(in) + ", command: " + LogHelper.toString(data), e); cmd.tryFailure(e); - decoderStatus.set(Status.NORMAL); sendNext(channel); throw e; } @@ -198,7 +221,6 @@ public class CommandDecoder extends ReplayingDecoder { decodeCommandBatch(channel, in, data, commands); } catch (Exception e) { commands.getPromise().tryFailure(e); - decoderStatus.set(Status.NORMAL); sendNext(channel); throw e; } @@ -210,7 +232,6 @@ public class CommandDecoder extends ReplayingDecoder { sendNext(channel); } catch (Exception e) { log.error("Unable to decode data. channel: " + channel + ", reply: " + LogHelper.toString(in), e); - decoderStatus.set(Status.NORMAL); sendNext(channel); throw e; } @@ -218,27 +239,19 @@ public class CommandDecoder extends ReplayingDecoder { } protected void sendNext(Channel channel) { - if (decoderStatus.get() != Status.DECODE_BUFFER) { - channel.pipeline().get(CommandsQueue.class).sendNextCommand(channel); - state.set(null); - } + channel.pipeline().get(CommandsQueue.class).sendNextCommand(channel); + state.set(null); } private void decodeCommandBatch(Channel channel, ByteBuf in, QueueCommand data, CommandsData commandBatch) throws Exception { - int i = 0; - if (decoderStatus.get() != Status.DECODE_BUFFER) { - i = state.get().getBatchIndex(); - } + int i = state.get().getBatchIndex(); Throwable error = null; while (in.writerIndex() > in.readerIndex()) { CommandData commandData = null; try { - if (decoderStatus.get() != Status.DECODE_BUFFER) { - checkpoint(); - state.get().setBatchIndex(i); - } + state.get().setBatchIndex(i); RedisCommand cmd = commandBatch.getCommands().get(i).getCommand(); boolean skipConvertor = commandBatch.isQueued(); List> commandsData = null; @@ -291,25 +304,20 @@ public class CommandDecoder extends ReplayingDecoder { } if (commandBatch.isSkipResult() || i == commandBatch.getCommands().size()) { - if (decoderStatus.get() != Status.FILL_BUFFER) { - RPromise promise = commandBatch.getPromise(); - if (error != null) { - if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) { - log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data)); - } - } else { - if (!promise.trySuccess(null) && promise.cause() instanceof RedisTimeoutException) { - log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data)); - } + RPromise promise = commandBatch.getPromise(); + if (error != null) { + if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) { + log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data)); + } + } else { + if (!promise.trySuccess(null) && promise.cause() instanceof RedisTimeoutException) { + log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data)); } } sendNext(channel); } else { - if (decoderStatus.get() != Status.DECODE_BUFFER) { - checkpoint(); - state.get().setBatchIndex(i); - } + state.get().setBatchIndex(i); } } @@ -384,7 +392,7 @@ public class CommandDecoder extends ReplayingDecoder { in.skipBytes(len + 2); return result; } - + @SuppressWarnings("unchecked") private void decodeList(ByteBuf in, CommandData data, List parts, Channel channel, long size, List respParts, boolean skipConvertor, List> commandsData) @@ -412,10 +420,6 @@ public class CommandDecoder extends ReplayingDecoder { return; } - if (decoderStatus.get() == Status.FILL_BUFFER) { - return; - } - Object result = decoder.decode(respParts, state.get()); decodeResult(data, parts, channel, result); } @@ -428,7 +432,7 @@ public class CommandDecoder extends ReplayingDecoder { } private void handleResult(CommandData data, List parts, Object result, boolean skipConvertor, Channel channel) { - if (data != null && !skipConvertor && decoderStatus.get() != Status.FILL_BUFFER) { + if (data != null && !skipConvertor) { result = data.getCommand().getConvertor().convert(result); } if (parts != null) { @@ -439,10 +443,6 @@ public class CommandDecoder extends ReplayingDecoder { } protected void completeResponse(CommandData data, Object result, Channel channel) { - if (decoderStatus.get() == Status.FILL_BUFFER) { - return; - } - if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) { log.warn("response has been skipped due to timeout! channel: {}, command: {}, result: {}", channel, LogHelper.toString(data), LogHelper.toString(result)); } @@ -473,15 +473,6 @@ public class CommandDecoder extends ReplayingDecoder { } Codec codec = data.getCodec(); - if (decodeInExecutor && !(codec instanceof StringCodec || codec instanceof ByteArrayCodec)) { - if (decoderStatus.get() == Status.NORMAL) { - decoderStatus.set(Status.FILL_BUFFER); - codec = NullCodec.INSTANCE; - } else if (decoderStatus.get() == Status.FILL_BUFFER) { - codec = NullCodec.INSTANCE; - } - } - Decoder decoder = data.getCommand().getReplayDecoder(); if (decoder == null) { if (codec == null) {