|
|
|
@ -204,7 +204,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
} else if (data instanceof CommandsData) {
|
|
|
|
|
CommandsData commands = (CommandsData) data;
|
|
|
|
|
try {
|
|
|
|
|
decodeCommandBatch(channel, in, data, commands);
|
|
|
|
|
decodeCommandBatch(channel, in, commands);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
commands.getPromise().tryFailure(e);
|
|
|
|
|
sendNext(channel);
|
|
|
|
@ -232,8 +232,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
state(null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void decodeCommandBatch(Channel channel, ByteBuf in, QueueCommand data,
|
|
|
|
|
CommandsData commandBatch) throws Exception {
|
|
|
|
|
private void decodeCommandBatch(Channel channel, ByteBuf in, CommandsData commandBatch) throws Exception {
|
|
|
|
|
int i = state().getBatchIndex();
|
|
|
|
|
|
|
|
|
|
Throwable error = null;
|
|
|
|
@ -265,8 +264,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
|
|
|
|
|
decode(in, commandData, null, channel, skipConvertor, commandsData);
|
|
|
|
|
|
|
|
|
|
if (commandData != null && RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())
|
|
|
|
|
&& commandData.getPromise().isSuccess()) {
|
|
|
|
|
if (commandData != null
|
|
|
|
|
&& RedisCommands.EXEC.getName().equals(commandData.getCommand().getName())
|
|
|
|
|
&& commandData.getPromise().isSuccess()) {
|
|
|
|
|
List<Object> objects = (List<Object>) commandData.getPromise().getNow();
|
|
|
|
|
Iterator<Object> iter = objects.iterator();
|
|
|
|
|
boolean multiFound = false;
|
|
|
|
@ -277,7 +277,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
}
|
|
|
|
|
Object res = iter.next();
|
|
|
|
|
|
|
|
|
|
completeResponse((CommandData<Object, Object>) command, res, channel);
|
|
|
|
|
completeResponse((CommandData<Object, Object>) command, res);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (RedisCommands.MULTI.getName().equals(command.getCommand().getName())) {
|
|
|
|
@ -307,13 +307,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
if (commandBatch.isSkipResult() || i == commandBatch.getCommands().size()) {
|
|
|
|
|
RPromise<Void> promise = commandBatch.getPromise();
|
|
|
|
|
if (error != null) {
|
|
|
|
|
if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) {
|
|
|
|
|
log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data));
|
|
|
|
|
}
|
|
|
|
|
promise.tryFailure(error);
|
|
|
|
|
} else {
|
|
|
|
|
if (!promise.trySuccess(null) && promise.cause() instanceof RedisTimeoutException) {
|
|
|
|
|
log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data));
|
|
|
|
|
}
|
|
|
|
|
promise.trySuccess(null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sendNext(channel);
|
|
|
|
@ -328,7 +324,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
if (code == '+') {
|
|
|
|
|
String result = readString(in);
|
|
|
|
|
|
|
|
|
|
handleResult(data, parts, result, skipConvertor, channel);
|
|
|
|
|
handleResult(data, parts, result, skipConvertor);
|
|
|
|
|
} else if (code == '-') {
|
|
|
|
|
String error = readString(in);
|
|
|
|
|
|
|
|
|
@ -369,7 +365,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
}
|
|
|
|
|
} else if (code == ':') {
|
|
|
|
|
Long result = readLong(in);
|
|
|
|
|
handleResult(data, parts, result, false, channel);
|
|
|
|
|
handleResult(data, parts, result, false);
|
|
|
|
|
} else if (code == '$') {
|
|
|
|
|
ByteBuf buf = readBytes(in);
|
|
|
|
|
Object result = null;
|
|
|
|
@ -377,7 +373,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
Decoder<Object> decoder = selectDecoder(data, parts);
|
|
|
|
|
result = decoder.decode(buf, state());
|
|
|
|
|
}
|
|
|
|
|
handleResult(data, parts, result, false, channel);
|
|
|
|
|
handleResult(data, parts, result, false);
|
|
|
|
|
} else if (code == '*') {
|
|
|
|
|
long size = readLong(in);
|
|
|
|
|
List<Object> respParts = new ArrayList<Object>(Math.max((int) size, 0));
|
|
|
|
@ -435,24 +431,24 @@ public class CommandDecoder extends ReplayingDecoder<State> {
|
|
|
|
|
protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,
|
|
|
|
|
Object result) throws IOException {
|
|
|
|
|
if (data != null) {
|
|
|
|
|
handleResult(data, parts, result, true, channel);
|
|
|
|
|
handleResult(data, parts, result, true);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean skipConvertor, Channel channel) {
|
|
|
|
|
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean skipConvertor) {
|
|
|
|
|
if (data != null && !skipConvertor) {
|
|
|
|
|
result = data.getCommand().getConvertor().convert(result);
|
|
|
|
|
}
|
|
|
|
|
if (parts != null) {
|
|
|
|
|
parts.add(result);
|
|
|
|
|
} else {
|
|
|
|
|
completeResponse(data, result, channel);
|
|
|
|
|
completeResponse(data, result);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected void completeResponse(CommandData<Object, Object> data, Object result, Channel channel) {
|
|
|
|
|
if (data != null && !data.getPromise().trySuccess(result) && data.cause() instanceof RedisTimeoutException) {
|
|
|
|
|
log.warn("response has been skipped due to timeout! channel: {}, command: {}", channel, LogHelper.toString(data));
|
|
|
|
|
protected void completeResponse(CommandData<Object, Object> data, Object result) {
|
|
|
|
|
if (data != null) {
|
|
|
|
|
data.getPromise().trySuccess(result);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|