From 13abe16511bedfdbce4ca36e1df931c3a230dbba Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 24 Dec 2018 10:10:20 +0300 Subject: [PATCH] Fixed - CommandDecoder throws IndexOutOfBoundsException if pingConnectionInterval param is used #1497 Fixed - Unable to send command! error if pingConnectionInterval param is used #1632 --- .../client/RedisPubSubConnection.java | 4 +- .../client/handler/CommandDecoder.java | 144 ++++++------------ .../client/handler/CommandPubSubDecoder.java | 13 +- .../client/handler/CommandsQueue.java | 15 +- .../org/redisson/client/handler/State.java | 26 ++-- .../redisson/client/handler/StateLevel.java | 27 ++-- 6 files changed, 97 insertions(+), 132 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java index f5d5c1494..ff1a78de9 100644 --- a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -34,6 +34,7 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -166,7 +167,8 @@ public class RedisPubSubConnection extends RedisConnection { } private ChannelFuture async(MultiDecoder messageDecoder, RedisCommand command, Object ... params) { - return channel.writeAndFlush(new CommandData(null, messageDecoder, null, command, params)); + RPromise promise = new RedissonPromise(); + return channel.writeAndFlush(new CommandData(promise, messageDecoder, null, command, params)); } public Map getChannels() { diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index d825dc532..6286448e3 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -51,9 +51,7 @@ import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder; -import org.redisson.client.protocol.decoder.SlotsDecoder; import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; import org.slf4j.Logger; @@ -87,19 +85,23 @@ public class CommandDecoder extends ReplayingDecoder { log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data); } if (state() == null) { - boolean makeCheckpoint = data != null; - if (data != null) { - if (data instanceof CommandsData) { - makeCheckpoint = false; - } else { - CommandData cmd = (CommandData)data; - if (cmd.getCommand().getReplayMultiDecoder() != null - && (SlotsDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()) - || ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) { - makeCheckpoint = false; - } - } - } + boolean makeCheckpoint = false; +// commented out due to https://github.com/redisson/redisson/issues/1632. Reproduced with RedissonMapCacheTest +// +// boolean makeCheckpoint = data != null; +// if (data != null) { +// if (data instanceof CommandsData) { +// makeCheckpoint = false; +// } else { +// CommandData cmd = (CommandData)data; +// MultiDecoder decoder = cmd.getCommand().getReplayMultiDecoder(); +// if (decoder != null +// && (decoder instanceof SlotsDecoder +// || decoder instanceof ListMultiDecoder)) { +// makeCheckpoint = false; +// } +// } +// } state(new State(makeCheckpoint)); } @@ -122,10 +124,10 @@ public class CommandDecoder extends ReplayingDecoder { if (data instanceof CommandData) { CommandData cmd = (CommandData)data; try { - if (state().getLevels().size() > 0) { + if (state().isMakeCheckpoint()) { decodeFromCheckpoint(ctx, in, data, cmd); } else { - decode(in, cmd, null, ctx.channel(), false); + decode(in, cmd, null, ctx, false); } sendNext(ctx, data); } catch (Exception e) { @@ -146,7 +148,7 @@ public class CommandDecoder extends ReplayingDecoder { } else { try { while (in.writerIndex() > in.readerIndex()) { - decode(in, null, null, ctx.channel(), false); + decode(in, null, null, ctx, false); } sendNext(ctx); } catch (Exception e) { @@ -164,54 +166,18 @@ public class CommandDecoder extends ReplayingDecoder { protected void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, CommandData cmd) throws IOException { - if (state().getLevels().size() == 2) { - StateLevel secondLevel = state().getLevels().get(1); - - if (secondLevel.getParts().isEmpty()) { - state().getLevels().remove(1); - } - } + StateLevel level = state().getLastLevel(); - if (state().getLevels().size() == 2) { - StateLevel firstLevel = state().getLevels().get(0); - StateLevel secondLevel = state().getLevels().get(1); - - decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts(), false); - - MultiDecoder decoder = messageDecoder(cmd, firstLevel.getParts()); - if (decoder != null) { - Object result = decoder.decode(firstLevel.getParts(), state()); - if (data != null) { - handleResult(cmd, null, result, true, ctx.channel()); - } - } + List prevParts = null; + if (state().getLevels().size() > 1) { + StateLevel prevLevel = state().getLevels().get(state().getLevel() - 1); + prevParts = prevLevel.getParts(); } - if (state().getLevels().size() == 1) { - StateLevel firstLevel = state().getLevels().get(0); - if (firstLevel.getParts().isEmpty() && firstLevel.getLastList() == null) { - state().resetLevel(); - decode(in, cmd, null, ctx.channel(), false); - } else { - if (firstLevel.getLastList() != null) { - if (firstLevel.getLastList().isEmpty()) { - decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); - } else { - decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList(), false); - } - firstLevel.setLastList(null); - firstLevel.setLastListSize(0); - - while (in.isReadable() && firstLevel.getParts().size() < firstLevel.getSize()) { - decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); - } - decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false); - } else { - while (firstLevel.getSize() == firstLevel.getParts().size() && in.isReadable()) { - decode(in, cmd, firstLevel.getParts(), ctx.channel(), false); - } - decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false); - } - } + + decodeList(in, cmd, prevParts, ctx, level.getSize(), level.getParts(), false); + + if (state().getLastLevel() == level) { + state().removeLastLevel(); } } @@ -244,7 +210,7 @@ public class CommandDecoder extends ReplayingDecoder { } try { - decode(in, commandData, null, ctx.channel(), skipConvertor); + decode(in, commandData, null, ctx, skipConvertor); } finally { if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) { commandsData.remove(); @@ -302,8 +268,9 @@ public class CommandDecoder extends ReplayingDecoder { } } - protected void decode(ByteBuf in, CommandData data, List parts, Channel channel, boolean skipConvertor) throws IOException { + protected void decode(ByteBuf in, CommandData data, List parts, ChannelHandlerContext ctx, boolean skipConvertor) throws IOException { int code = in.readByte(); + Channel channel = ctx.channel(); if (code == '+') { ByteBuf rb = in.readBytes(in.bytesBefore((byte) '\r')); try { @@ -365,33 +332,22 @@ public class CommandDecoder extends ReplayingDecoder { handleResult(data, parts, result, false, channel); } else if (code == '*') { long size = readLong(in); - List respParts; + final List respParts = new ArrayList(); - StateLevel lastLevel = state().getLastLevel(); - if (lastLevel != null && lastLevel.getSize() != lastLevel.getParts().size()) { - respParts = new ArrayList(); - lastLevel.setLastListSize(size); - lastLevel.setLastList(respParts); - } else { - int level = state().incLevel(); - if (state().getLevels().size()-1 >= level) { - StateLevel stateLevel = state().getLevels().get(level); - respParts = stateLevel.getParts(); - size = stateLevel.getSize(); - } else { - respParts = new ArrayList(); - if (state().isMakeCheckpoint()) { - state().addLevel(new StateLevel(size, respParts)); - } - } + StateLevel lastLevel = null; + if (state().isMakeCheckpoint()) { + lastLevel = new StateLevel(size, respParts); + state().addLevel(lastLevel); } - decodeList(in, data, parts, channel, size, respParts, skipConvertor); + decodeList(in, data, parts, ctx, size, respParts, skipConvertor); - if (lastLevel != null && lastLevel.getLastList() != null) { - lastLevel.setLastList(null); - lastLevel.setLastListSize(0); + if (state().isMakeCheckpoint()) { + if (lastLevel == state().getLastLevel() && lastLevel.isFull()) { + state().removeLastLevel(); + } } + } else { String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8); throw new IllegalStateException("Can't decode replay: " + dataStr); @@ -400,7 +356,7 @@ public class CommandDecoder extends ReplayingDecoder { @SuppressWarnings("unchecked") private void decodeList(ByteBuf in, CommandData data, List parts, - Channel channel, long size, List respParts, boolean skipConvertor) + ChannelHandlerContext ctx, long size, List respParts, boolean skipConvertor) throws IOException { if (parts == null && commandsData.get() != null) { List> commands = commandsData.get(); @@ -410,7 +366,7 @@ public class CommandDecoder extends ReplayingDecoder { suffix = 1; } CommandData commandData = (CommandData) commands.get(i+suffix); - decode(in, commandData, respParts, channel, skipConvertor); + decode(in, commandData, respParts, ctx, skipConvertor); if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) { data.tryFailure(commandData.cause()); } @@ -421,7 +377,7 @@ public class CommandDecoder extends ReplayingDecoder { } } else { for (int i = respParts.size(); i < size; i++) { - decode(in, data, respParts, channel, skipConvertor); + decode(in, data, respParts, ctx, skipConvertor); if (state().isMakeCheckpoint()) { checkpoint(); } @@ -434,13 +390,13 @@ public class CommandDecoder extends ReplayingDecoder { } Object result = decoder.decode(respParts, state()); - decodeResult(data, parts, channel, result); + decodeResult(data, parts, ctx, result); } - protected void decodeResult(CommandData data, List parts, Channel channel, + protected void decodeResult(CommandData data, List parts, ChannelHandlerContext ctx, Object result) throws IOException { if (data != null) { - handleResult(data, parts, result, true, channel); + handleResult(data, parts, result, true, ctx.channel()); } } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java index b3ecfb034..df7d9a7f6 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandPubSubDecoder.java @@ -39,7 +39,6 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import io.netty.util.internal.PlatformDependent; @@ -75,7 +74,7 @@ public class CommandPubSubDecoder extends CommandDecoder { if (data == null) { try { while (in.writerIndex() > in.readerIndex()) { - decode(in, null, null, ctx.channel(), false); + decode(in, null, null, ctx, false); } sendNext(ctx); } catch (Exception e) { @@ -86,11 +85,11 @@ public class CommandPubSubDecoder extends CommandDecoder { } else if (data instanceof CommandData) { CommandData cmd = (CommandData)data; try { - if (state().getLevels().size() > 0) { + if (state().isMakeCheckpoint()) { decodeFromCheckpoint(ctx, in, data, cmd); } else { while (in.writerIndex() > in.readerIndex()) { - decode(in, cmd, null, ctx.channel(), false); + decode(in, cmd, null, ctx, false); } } sendNext(ctx, data); @@ -104,7 +103,7 @@ public class CommandPubSubDecoder extends CommandDecoder { } @Override - protected void decodeResult(CommandData data, List parts, Channel channel, + protected void decodeResult(CommandData data, List parts, ChannelHandlerContext ctx, final Object result) throws IOException { if (executor.isShutdown()) { return; @@ -113,7 +112,7 @@ public class CommandPubSubDecoder extends CommandDecoder { if (result instanceof Message) { checkpoint(); - final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); + final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(ctx.channel()); ChannelName channelName = ((Message) result).getChannel(); if (result instanceof PubSubStatusMessage) { String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase(); @@ -161,7 +160,7 @@ public class CommandPubSubDecoder extends CommandDecoder { } } else { if (data != null && data.getCommand().getName().equals("PING")) { - super.decodeResult(data, parts, channel, result); + super.decodeResult(data, parts, ctx, result); } } } diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java index 8d7f35e8e..756d3b38b 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandsQueue.java @@ -65,8 +65,19 @@ public class CommandsQueue extends ChannelDuplexHandler { }; public void sendNextCommand(Channel channel) { - channel.attr(CommandsQueue.CURRENT_COMMAND).set(null); - queue.poll(); + QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).getAndSet(null); + if (command != null) { + queue.poll(); + } else { + QueueCommandHolder c = queue.peek(); + if (c != null) { + QueueCommand data = c.getCommand(); + List> pubSubOps = data.getPubSubOperations(); + if (!pubSubOps.isEmpty()) { + queue.poll(); + } + } + } sendData(channel); } diff --git a/redisson/src/main/java/org/redisson/client/handler/State.java b/redisson/src/main/java/org/redisson/client/handler/State.java index 18706dc0b..57c794666 100644 --- a/redisson/src/main/java/org/redisson/client/handler/State.java +++ b/redisson/src/main/java/org/redisson/client/handler/State.java @@ -21,6 +21,11 @@ import java.util.List; import org.redisson.client.protocol.decoder.DecoderState; +/** + * + * @author Nikita Koksharov + * + */ public class State { private int batchIndex; @@ -37,18 +42,11 @@ public class State { public boolean isMakeCheckpoint() { return makeCheckpoint; } - - public void resetLevel() { - level = -1; - levels.clear(); - } - public int decLevel() { - return --level; - } - public int incLevel() { - return ++level; - } + public int getLevel() { + return level; + } + public StateLevel getLastLevel() { if (levels == null || levels.isEmpty()) { return null; @@ -61,7 +59,13 @@ public class State { levels = new ArrayList(2); } levels.add(stateLevel); + level++; } + public void removeLastLevel() { + levels.remove(level); + level--; + } + public List getLevels() { if (levels == null) { return Collections.emptyList(); diff --git a/redisson/src/main/java/org/redisson/client/handler/StateLevel.java b/redisson/src/main/java/org/redisson/client/handler/StateLevel.java index 210ecc19f..5c821ac6c 100644 --- a/redisson/src/main/java/org/redisson/client/handler/StateLevel.java +++ b/redisson/src/main/java/org/redisson/client/handler/StateLevel.java @@ -17,12 +17,15 @@ package org.redisson.client.handler; import java.util.List; +/** + * + * @author Nikita Koksharov + * + */ public class StateLevel { - private long size; - private List parts; - private long lastListSize; - private List lastList; + private final long size; + private final List parts; public StateLevel(long size, List parts) { super(); @@ -30,20 +33,10 @@ public class StateLevel { this.parts = parts; } - public long getLastListSize() { - return lastListSize; + public boolean isFull() { + return size == parts.size(); } - public void setLastListSize(long lastListSize) { - this.lastListSize = lastListSize; - } - - public List getLastList() { - return lastList; - } - public void setLastList(List lastList) { - this.lastList = lastList; - } - + public long getSize() { return size; }