|
|
|
@ -100,8 +100,10 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
try {
|
|
|
|
|
// if (state().getSize() > 0) {
|
|
|
|
|
// List<Object> respParts = new ArrayList<Object>();
|
|
|
|
|
// respParts.addAll(state().getRespParts());
|
|
|
|
|
// decodeMulti(in, cmd, null, ctx.channel(), currentDecoder, state().getSize(), respParts);
|
|
|
|
|
// if (state().getRespParts() != null) {
|
|
|
|
|
// respParts = state().getRespParts();
|
|
|
|
|
// }
|
|
|
|
|
// decodeMulti(in, cmd, null, ctx.channel(), currentDecoder, state().getSize(), respParts, true);
|
|
|
|
|
// } else {
|
|
|
|
|
decode(in, cmd, null, ctx.channel(), currentDecoder);
|
|
|
|
|
// }
|
|
|
|
@ -149,7 +151,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if (!promise.trySuccess(null) && promise.cause() instanceof RedisTimeoutException) {
|
|
|
|
|
// TODO try increase timeout
|
|
|
|
|
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -214,21 +215,27 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
handleResult(data, parts, result, false, channel);
|
|
|
|
|
} else if (code == '*') {
|
|
|
|
|
long size = readLong(in);
|
|
|
|
|
// state().setSizeOnce(size);
|
|
|
|
|
|
|
|
|
|
List<Object> respParts = new ArrayList<Object>();
|
|
|
|
|
boolean top = false;
|
|
|
|
|
// if (state().trySetSize(size)) {
|
|
|
|
|
// state().setRespParts(respParts);
|
|
|
|
|
// top = true;
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
decodeMulti(in, data, parts, channel, currentDecoder, size, respParts);
|
|
|
|
|
decodeMulti(in, data, parts, channel, currentDecoder, size, respParts, top);
|
|
|
|
|
} else {
|
|
|
|
|
throw new IllegalStateException("Can't decode replay " + (char)code);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
|
|
|
|
|
Channel channel, Decoder<Object> currentDecoder, long size, List<Object> respParts)
|
|
|
|
|
Channel channel, Decoder<Object> currentDecoder, long size, List<Object> respParts, boolean top)
|
|
|
|
|
throws IOException {
|
|
|
|
|
for (int i = respParts.size(); i < size; i++) {
|
|
|
|
|
decode(in, data, respParts, channel, currentDecoder);
|
|
|
|
|
// if (top) {
|
|
|
|
|
// checkpoint();
|
|
|
|
|
// }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MultiDecoder<Object> decoder = messageDecoder(data, respParts, channel);
|
|
|
|
@ -250,9 +257,6 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
handleMultiResult(data, parts, channel, result);
|
|
|
|
|
// if (parts != null && !decoder.isApplicable(parts.size(), state())) {
|
|
|
|
|
// state().setRespParts(parts);
|
|
|
|
|
// }
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|