Fixed - CommandDecoder throws IndexOutOfBoundsException if pingConnectionInterval param is used #1497

Fixed - Unable to send command! error if pingConnectionInterval param is used #1632
pull/1827/head
Nikita Koksharov 6 years ago
parent bdb5e1cf43
commit 13abe16511

@ -34,6 +34,7 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -166,7 +167,8 @@ public class RedisPubSubConnection extends RedisConnection {
} }
private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) { private <T, R> ChannelFuture async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
return channel.writeAndFlush(new CommandData<T, R>(null, messageDecoder, null, command, params)); RPromise<R> promise = new RedissonPromise<R>();
return channel.writeAndFlush(new CommandData<T, R>(promise, messageDecoder, null, command, params));
} }
public Map<ChannelName, Codec> getChannels() { public Map<ChannelName, Codec> getChannels() {

@ -51,9 +51,7 @@ import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.SlotsDecoder;
import org.redisson.misc.LogHelper; import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -87,19 +85,23 @@ public class CommandDecoder extends ReplayingDecoder<State> {
log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data); log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data);
} }
if (state() == null) { if (state() == null) {
boolean makeCheckpoint = data != null; boolean makeCheckpoint = false;
if (data != null) { // commented out due to https://github.com/redisson/redisson/issues/1632. Reproduced with RedissonMapCacheTest
if (data instanceof CommandsData) { //
makeCheckpoint = false; // boolean makeCheckpoint = data != null;
} else { // if (data != null) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; // if (data instanceof CommandsData) {
if (cmd.getCommand().getReplayMultiDecoder() != null // makeCheckpoint = false;
&& (SlotsDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()) // } else {
|| ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) { // CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
makeCheckpoint = false; // MultiDecoder<Object> decoder = cmd.getCommand().getReplayMultiDecoder();
} // if (decoder != null
} // && (decoder instanceof SlotsDecoder
} // || decoder instanceof ListMultiDecoder)) {
// makeCheckpoint = false;
// }
// }
// }
state(new State(makeCheckpoint)); state(new State(makeCheckpoint));
} }
@ -122,10 +124,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (data instanceof CommandData) { if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try { try {
if (state().getLevels().size() > 0) { if (state().isMakeCheckpoint()) {
decodeFromCheckpoint(ctx, in, data, cmd); decodeFromCheckpoint(ctx, in, data, cmd);
} else { } else {
decode(in, cmd, null, ctx.channel(), false); decode(in, cmd, null, ctx, false);
} }
sendNext(ctx, data); sendNext(ctx, data);
} catch (Exception e) { } catch (Exception e) {
@ -146,7 +148,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} else { } else {
try { try {
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel(), false); decode(in, null, null, ctx, false);
} }
sendNext(ctx); sendNext(ctx);
} catch (Exception e) { } catch (Exception e) {
@ -164,54 +166,18 @@ public class CommandDecoder extends ReplayingDecoder<State> {
protected void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, protected void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
CommandData<Object, Object> cmd) throws IOException { CommandData<Object, Object> cmd) throws IOException {
if (state().getLevels().size() == 2) { StateLevel level = state().getLastLevel();
StateLevel secondLevel = state().getLevels().get(1);
if (secondLevel.getParts().isEmpty()) { List<Object> prevParts = null;
state().getLevels().remove(1); if (state().getLevels().size() > 1) {
} StateLevel prevLevel = state().getLevels().get(state().getLevel() - 1);
prevParts = prevLevel.getParts();
} }
if (state().getLevels().size() == 2) { decodeList(in, cmd, prevParts, ctx, level.getSize(), level.getParts(), false);
StateLevel firstLevel = state().getLevels().get(0);
StateLevel secondLevel = state().getLevels().get(1);
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts(), false); if (state().getLastLevel() == level) {
state().removeLastLevel();
MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts());
if (decoder != null) {
Object result = decoder.decode(firstLevel.getParts(), state());
if (data != null) {
handleResult(cmd, null, result, true, ctx.channel());
}
}
}
if (state().getLevels().size() == 1) {
StateLevel firstLevel = state().getLevels().get(0);
if (firstLevel.getParts().isEmpty() && firstLevel.getLastList() == null) {
state().resetLevel();
decode(in, cmd, null, ctx.channel(), false);
} else {
if (firstLevel.getLastList() != null) {
if (firstLevel.getLastList().isEmpty()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel(), false);
} else {
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), firstLevel.getLastListSize(), firstLevel.getLastList(), false);
}
firstLevel.setLastList(null);
firstLevel.setLastListSize(0);
while (in.isReadable() && firstLevel.getParts().size() < firstLevel.getSize()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel(), false);
}
decodeList(in, cmd, null, ctx.channel(), 0, firstLevel.getParts(), false);
} else {
while (firstLevel.getSize() == firstLevel.getParts().size() && in.isReadable()) {
decode(in, cmd, firstLevel.getParts(), ctx.channel(), false);
}
decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts(), false);
}
}
} }
} }
@ -244,7 +210,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
try { try {
decode(in, commandData, null, ctx.channel(), skipConvertor); decode(in, commandData, null, ctx, skipConvertor);
} finally { } finally {
if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) { if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())) {
commandsData.remove(); commandsData.remove();
@ -302,8 +268,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
} }
protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor) throws IOException { protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, ChannelHandlerContext ctx, boolean skipConvertor) throws IOException {
int code = in.readByte(); int code = in.readByte();
Channel channel = ctx.channel();
if (code == '+') { if (code == '+') {
ByteBuf rb = in.readBytes(in.bytesBefore((byte) '\r')); ByteBuf rb = in.readBytes(in.bytesBefore((byte) '\r'));
try { try {
@ -365,33 +332,22 @@ public class CommandDecoder extends ReplayingDecoder<State> {
handleResult(data, parts, result, false, channel); handleResult(data, parts, result, false, channel);
} else if (code == '*') { } else if (code == '*') {
long size = readLong(in); long size = readLong(in);
List<Object> respParts; final List<Object> respParts = new ArrayList<Object>();
StateLevel lastLevel = state().getLastLevel(); StateLevel lastLevel = null;
if (lastLevel != null && lastLevel.getSize() != lastLevel.getParts().size()) {
respParts = new ArrayList<Object>();
lastLevel.setLastListSize(size);
lastLevel.setLastList(respParts);
} else {
int level = state().incLevel();
if (state().getLevels().size()-1 >= level) {
StateLevel stateLevel = state().getLevels().get(level);
respParts = stateLevel.getParts();
size = stateLevel.getSize();
} else {
respParts = new ArrayList<Object>();
if (state().isMakeCheckpoint()) { if (state().isMakeCheckpoint()) {
state().addLevel(new StateLevel(size, respParts)); lastLevel = new StateLevel(size, respParts);
} state().addLevel(lastLevel);
}
} }
decodeList(in, data, parts, channel, size, respParts, skipConvertor); decodeList(in, data, parts, ctx, size, respParts, skipConvertor);
if (lastLevel != null && lastLevel.getLastList() != null) { if (state().isMakeCheckpoint()) {
lastLevel.setLastList(null); if (lastLevel == state().getLastLevel() && lastLevel.isFull()) {
lastLevel.setLastListSize(0); state().removeLastLevel();
}
} }
} else { } else {
String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8); String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8);
throw new IllegalStateException("Can't decode replay: " + dataStr); throw new IllegalStateException("Can't decode replay: " + dataStr);
@ -400,7 +356,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, long size, List<Object> respParts, boolean skipConvertor) ChannelHandlerContext ctx, long size, List<Object> respParts, boolean skipConvertor)
throws IOException { throws IOException {
if (parts == null && commandsData.get() != null) { if (parts == null && commandsData.get() != null) {
List<CommandData<?, ?>> commands = commandsData.get(); List<CommandData<?, ?>> commands = commandsData.get();
@ -410,7 +366,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
suffix = 1; suffix = 1;
} }
CommandData<Object, Object> commandData = (CommandData<Object, Object>) commands.get(i+suffix); CommandData<Object, Object> commandData = (CommandData<Object, Object>) commands.get(i+suffix);
decode(in, commandData, respParts, channel, skipConvertor); decode(in, commandData, respParts, ctx, skipConvertor);
if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) { if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) {
data.tryFailure(commandData.cause()); data.tryFailure(commandData.cause());
} }
@ -421,7 +377,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
} else { } else {
for (int i = respParts.size(); i < size; i++) { for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel, skipConvertor); decode(in, data, respParts, ctx, skipConvertor);
if (state().isMakeCheckpoint()) { if (state().isMakeCheckpoint()) {
checkpoint(); checkpoint();
} }
@ -434,13 +390,13 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
Object result = decoder.decode(respParts, state()); Object result = decoder.decode(respParts, state());
decodeResult(data, parts, channel, result); decodeResult(data, parts, ctx, result);
} }
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel, protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, ChannelHandlerContext ctx,
Object result) throws IOException { Object result) throws IOException {
if (data != null) { if (data != null) {
handleResult(data, parts, result, true, channel); handleResult(data, parts, result, true, ctx.channel());
} }
} }

@ -39,7 +39,6 @@ import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -75,7 +74,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (data == null) { if (data == null) {
try { try {
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel(), false); decode(in, null, null, ctx, false);
} }
sendNext(ctx); sendNext(ctx);
} catch (Exception e) { } catch (Exception e) {
@ -86,11 +85,11 @@ public class CommandPubSubDecoder extends CommandDecoder {
} else if (data instanceof CommandData) { } else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try { try {
if (state().getLevels().size() > 0) { if (state().isMakeCheckpoint()) {
decodeFromCheckpoint(ctx, in, data, cmd); decodeFromCheckpoint(ctx, in, data, cmd);
} else { } else {
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {
decode(in, cmd, null, ctx.channel(), false); decode(in, cmd, null, ctx, false);
} }
} }
sendNext(ctx, data); sendNext(ctx, data);
@ -104,7 +103,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
} }
@Override @Override
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel, protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, ChannelHandlerContext ctx,
final Object result) throws IOException { final Object result) throws IOException {
if (executor.isShutdown()) { if (executor.isShutdown()) {
return; return;
@ -113,7 +112,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (result instanceof Message) { if (result instanceof Message) {
checkpoint(); checkpoint();
final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); final RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(ctx.channel());
ChannelName channelName = ((Message) result).getChannel(); ChannelName channelName = ((Message) result).getChannel();
if (result instanceof PubSubStatusMessage) { if (result instanceof PubSubStatusMessage) {
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase(); String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
@ -161,7 +160,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
} }
} else { } else {
if (data != null && data.getCommand().getName().equals("PING")) { if (data != null && data.getCommand().getName().equals("PING")) {
super.decodeResult(data, parts, channel, result); super.decodeResult(data, parts, ctx, result);
} }
} }
} }

@ -65,8 +65,19 @@ public class CommandsQueue extends ChannelDuplexHandler {
}; };
public void sendNextCommand(Channel channel) { public void sendNextCommand(Channel channel) {
channel.attr(CommandsQueue.CURRENT_COMMAND).set(null); QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).getAndSet(null);
if (command != null) {
queue.poll(); queue.poll();
} else {
QueueCommandHolder c = queue.peek();
if (c != null) {
QueueCommand data = c.getCommand();
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
queue.poll();
}
}
}
sendData(channel); sendData(channel);
} }

@ -21,6 +21,11 @@ import java.util.List;
import org.redisson.client.protocol.decoder.DecoderState; import org.redisson.client.protocol.decoder.DecoderState;
/**
*
* @author Nikita Koksharov
*
*/
public class State { public class State {
private int batchIndex; private int batchIndex;
@ -38,15 +43,8 @@ public class State {
return makeCheckpoint; return makeCheckpoint;
} }
public void resetLevel() { public int getLevel() {
level = -1; return level;
levels.clear();
}
public int decLevel() {
return --level;
}
public int incLevel() {
return ++level;
} }
public StateLevel getLastLevel() { public StateLevel getLastLevel() {
@ -61,7 +59,13 @@ public class State {
levels = new ArrayList<StateLevel>(2); levels = new ArrayList<StateLevel>(2);
} }
levels.add(stateLevel); levels.add(stateLevel);
level++;
} }
public void removeLastLevel() {
levels.remove(level);
level--;
}
public List<StateLevel> getLevels() { public List<StateLevel> getLevels() {
if (levels == null) { if (levels == null) {
return Collections.emptyList(); return Collections.emptyList();

@ -17,12 +17,15 @@ package org.redisson.client.handler;
import java.util.List; import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public class StateLevel { public class StateLevel {
private long size; private final long size;
private List<Object> parts; private final List<Object> parts;
private long lastListSize;
private List<Object> lastList;
public StateLevel(long size, List<Object> parts) { public StateLevel(long size, List<Object> parts) {
super(); super();
@ -30,18 +33,8 @@ public class StateLevel {
this.parts = parts; this.parts = parts;
} }
public long getLastListSize() { public boolean isFull() {
return lastListSize; return size == parts.size();
}
public void setLastListSize(long lastListSize) {
this.lastListSize = lastListSize;
}
public List<Object> getLastList() {
return lastList;
}
public void setLastList(List<Object> lastList) {
this.lastList = lastList;
} }
public long getSize() { public long getSize() {

Loading…
Cancel
Save