diff --git a/src/main/java/com/lambdaworks/redis/protocol/Command.java b/src/main/java/com/lambdaworks/redis/protocol/Command.java index e8dbac516..4fc740e0f 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/Command.java +++ b/src/main/java/com/lambdaworks/redis/protocol/Command.java @@ -67,7 +67,7 @@ public class Command { } if (res instanceof RedisException) { promise.setFailure((Exception)res); - } if (output.hasError()) { + } else if (output.hasError()) { if (output.getError().startsWith("MOVED")) { String[] parts = output.getError().split(" "); int slot = Integer.valueOf(parts[1]); @@ -75,6 +75,8 @@ public class Command { } else { promise.setFailure(new RedisException(output.getError())); } + } else if (output.hasException()) { + promise.setFailure(output.getException()); } else { promise.setSuccess((T)res); } diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandOutput.java b/src/main/java/com/lambdaworks/redis/protocol/CommandOutput.java index 32cd9b42a..0559d1e64 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandOutput.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandOutput.java @@ -17,6 +17,7 @@ public abstract class CommandOutput { protected RedisCodec codec; protected T output; protected String error; + protected Throwable exception; /** * Initialize a new instance that encodes and decodes keys and @@ -97,6 +98,33 @@ public abstract class CommandOutput { return error; } + /** + * Set exception that was caught while processing result in command output. + * + * @param exception Exception caught while processing command result. + */ + public void setException(Throwable exception) { + this.exception = exception; + } + + /** + * Check if the processing command result resulted in an exception. + * + * @return true if processing of command result resulted in an exception. + */ + public boolean hasException() { + return this.exception != null; + } + + /** + * Get the exception that occurred while processing command result. + * + * @return The exception. + */ + public Throwable getException () { + return exception; + } + /** * Mark the command output complete. * diff --git a/src/main/java/com/lambdaworks/redis/protocol/RedisStateMachine.java b/src/main/java/com/lambdaworks/redis/protocol/RedisStateMachine.java index 4cd60bd90..82a5633e9 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/RedisStateMachine.java +++ b/src/main/java/com/lambdaworks/redis/protocol/RedisStateMachine.java @@ -71,7 +71,7 @@ public class RedisStateMachine { case SINGLE: if ((bytes = readLine(buffer)) == null) break loop; if (!QUEUED.equals(bytes)) { - output.set(bytes); + setCommandOutputSafely(output, bytes); } break; case ERROR: @@ -80,13 +80,13 @@ public class RedisStateMachine { break; case INTEGER: if ((end = findLineEnd(buffer)) == -1) break loop; - output.set(readLong(buffer, buffer.readerIndex(), end)); + setCommandOutputSafely(output, readLong(buffer, buffer.readerIndex(), end)); break; case BULK: if ((end = findLineEnd(buffer)) == -1) break loop; length = (int) readLong(buffer, buffer.readerIndex(), end); if (length == -1) { - output.set(null); + setCommandOutputSafely(output, null); } else { state.type = BYTES; state.count = length + 2; @@ -109,7 +109,7 @@ public class RedisStateMachine { continue loop; case BYTES: if ((bytes = readBytes(buffer, state.count)) == null) break loop; - output.set(bytes); + setCommandOutputSafely(output, bytes); } buffer.markReaderIndex(); @@ -171,4 +171,28 @@ public class RedisStateMachine { } return bytes; } + + + private boolean setCommandOutputSafely(CommandOutput output, ByteBuffer bytes) { + boolean success = false; + try { + output.set(bytes); + success = true; + } catch (Throwable t) { + output.setException(t); + } + return success; + } + + private boolean setCommandOutputSafely(CommandOutput output, long value) { + boolean success = false; + try { + output.set(value); + success = true; + } catch (Throwable t) { + output.setException(t); + } + return success; + } + } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index f9f3c6ef5..856e5ff87 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -410,7 +410,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (future.isSuccess()) { return future.getNow(); } - throw ((RedisException)future.cause()); + throw future.cause() instanceof RedisException ? + (RedisException) future.cause() : + new RedisException("Unexpected exception while processing command", future.cause()); } public T read(String key, AsyncOperation asyncOperation) {