|
|
|
@ -93,7 +93,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
|
|
|
|
|
if (data == null) {
|
|
|
|
|
while (in.writerIndex() > in.readerIndex()) {
|
|
|
|
|
decode(in, null, null, ctx.channel());
|
|
|
|
|
decode(in, null, null, ctx.channel());
|
|
|
|
|
}
|
|
|
|
|
} else if (data instanceof CommandData) {
|
|
|
|
|
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
|
|
|
|
@ -105,6 +105,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
cmd.tryFailure(e);
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
} else if (data instanceof CommandsData) {
|
|
|
|
|
CommandsData commands = (CommandsData)data;
|
|
|
|
@ -112,6 +113,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
decodeCommandBatch(ctx, in, data, commands);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
commands.getPromise().tryFailure(e);
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -172,7 +174,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
|
|
|
|
|
CommandsData commandBatch) {
|
|
|
|
|
CommandsData commandBatch) throws Exception {
|
|
|
|
|
int i = state().getBatchIndex();
|
|
|
|
|
|
|
|
|
|
Throwable error = null;
|
|
|
|
@ -211,6 +213,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
commandData.tryFailure(e);
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
i++;
|
|
|
|
|
if (commandData != null && !commandData.isSuccess()) {
|
|
|
|
|