|
|
|
@ -115,25 +115,18 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
decode(ctx, in, data);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
in.markReaderIndex();
|
|
|
|
|
if (data instanceof CommandsData) {
|
|
|
|
|
CommandsData cmd = (CommandsData) data;
|
|
|
|
|
if (!cmd.isSkipResult()) {
|
|
|
|
|
for (int j = 0; j < cmd.getCommands().size(); j++) {
|
|
|
|
|
skipCommand(in);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if (!(data instanceof CommandsData)) {
|
|
|
|
|
in.markReaderIndex();
|
|
|
|
|
skipCommand(in);
|
|
|
|
|
in.resetReaderIndex();
|
|
|
|
|
}
|
|
|
|
|
in.resetReaderIndex();
|
|
|
|
|
|
|
|
|
|
decode(ctx, in, data);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void decode(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception {
|
|
|
|
|
if (decodeInExecutor) {
|
|
|
|
|
if (decodeInExecutor && !(data instanceof CommandsData)) {
|
|
|
|
|
ByteBuf copy = in.copy(in.readerIndex(), in.writerIndex() - in.readerIndex());
|
|
|
|
|
in.skipBytes(in.writerIndex() - in.readerIndex());
|
|
|
|
|
executor.execute(() -> {
|
|
|
|
@ -251,7 +244,13 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
while (in.writerIndex() > in.readerIndex()) {
|
|
|
|
|
CommandData<Object, Object> commandData = null;
|
|
|
|
|
try {
|
|
|
|
|
checkpoint();
|
|
|
|
|
state.get().setBatchIndex(i);
|
|
|
|
|
|
|
|
|
|
in.markReaderIndex();
|
|
|
|
|
skipCommand(in);
|
|
|
|
|
in.resetReaderIndex();
|
|
|
|
|
|
|
|
|
|
RedisCommand<?> cmd = commandBatch.getCommands().get(i).getCommand();
|
|
|
|
|
boolean skipConvertor = commandBatch.isQueued();
|
|
|
|
|
List<CommandData<?, ?>> commandsData = null;
|
|
|
|
@ -317,6 +316,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
|
|
|
|
|
sendNext(channel);
|
|
|
|
|
} else {
|
|
|
|
|
checkpoint();
|
|
|
|
|
state.get().setBatchIndex(i);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|