Improvement - RBatch decoding optimization.

pull/3848/head
Nikita Koksharov 4 years ago
parent 28033956fc
commit 5bd4ea9776

@ -91,7 +91,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
int endIndex = skipCommand(in); int endIndex = skipCommand(in);
try { try {
decode(ctx, in, data); decode(ctx, in, data, 0);
} catch (Exception e) { } catch (Exception e) {
in.readerIndex(endIndex); in.readerIndex(endIndex);
throw e; throw e;
@ -101,25 +101,40 @@ public class CommandDecoder extends ReplayingDecoder<State> {
int endIndex = 0; int endIndex = 0;
if (!(data instanceof CommandsData)) { if (!(data instanceof CommandsData)) {
endIndex = skipCommand(in); endIndex = skipCommand(in);
} else {
endIndex = skipBatchCommand(in, (CommandsData) data);
} }
if (data.isExecuted()) {
try { in.readerIndex(endIndex);
decode(ctx, in, data); sendNext(ctx.channel());
} catch (Exception e) { return;
if (!(data instanceof CommandsData)) {
in.readerIndex(endIndex);
}
throw e;
} }
decode(ctx, in, data, endIndex);
}
}
private int skipBatchCommand(ByteBuf in, CommandsData data) throws IOException {
int commandsAmount = 1;
if (!data.isSkipResult()) {
commandsAmount = data.getCommands().size();
}
in.markReaderIndex();
for (int i = 0; i < commandsAmount; i++) {
skipDecode(in);
} }
int endIndex = in.readerIndex();
in.resetReaderIndex();
return endIndex;
} }
private void decode(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data) throws Exception { private void decode(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, int endIndex) throws Exception {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data); log.trace("reply: {}, channel: {}, command: {}", in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), ctx.channel(), data);
} }
decodeCommand(ctx.channel(), in, data); decodeCommand(ctx.channel(), in, data, endIndex);
} }
protected void sendNext(Channel channel, QueueCommand data) { protected void sendNext(Channel channel, QueueCommand data) {
@ -157,7 +172,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} }
} }
} }
private void skipBytes(ByteBuf is) throws IOException { private void skipBytes(ByteBuf is) throws IOException {
long l = readLong(is); long l = readLong(is);
if (l > Integer.MAX_VALUE) { if (l > Integer.MAX_VALUE) {
@ -176,7 +191,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
in.skipBytes(len + 2); in.skipBytes(len + 2);
} }
protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception { protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data, int endIndex) throws Exception {
if (data instanceof CommandData) { if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>) data; CommandData<Object, Object> cmd = (CommandData<Object, Object>) data;
try { try {
@ -184,8 +199,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
sendNext(channel, data); sendNext(channel, data);
} catch (Exception e) { } catch (Exception e) {
log.error("Unable to decode data. channel: " + channel + ", reply: " + LogHelper.toString(in) + ", command: " + LogHelper.toString(data), e); log.error("Unable to decode data. channel: " + channel + ", reply: " + LogHelper.toString(in) + ", command: " + LogHelper.toString(data), e);
cmd.tryFailure(e); in.readerIndex(endIndex);
sendNext(channel); sendNext(channel);
cmd.tryFailure(e);
throw e; throw e;
} }
} else if (data instanceof CommandsData) { } else if (data instanceof CommandsData) {
@ -193,8 +209,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
try { try {
decodeCommandBatch(channel, in, commands); decodeCommandBatch(channel, in, commands);
} catch (Exception e) { } catch (Exception e) {
commands.getPromise().tryFailure(e); in.readerIndex(endIndex);
sendNext(channel); sendNext(channel);
commands.getPromise().tryFailure(e);
throw e; throw e;
} }
} else { } else {

@ -77,7 +77,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
} }
@Override @Override
protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data) throws Exception { protected void decodeCommand(Channel channel, ByteBuf in, QueueCommand data, int endIndex) throws Exception {
if (data == null) { if (data == null) {
try { try {
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {

Loading…
Cancel
Save