Propagate Command processing exceptions to ConnectionManager #169

pull/170/head
Marko Stankovic 10 years ago
parent c362ea2f9e
commit ddc5684d1e

@ -67,7 +67,7 @@ public class Command<K, V, T> {
}
if (res instanceof RedisException) {
promise.setFailure((Exception)res);
} if (output.hasError()) {
} else if (output.hasError()) {
if (output.getError().startsWith("MOVED")) {
String[] parts = output.getError().split(" ");
int slot = Integer.valueOf(parts[1]);
@ -75,6 +75,8 @@ public class Command<K, V, T> {
} else {
promise.setFailure(new RedisException(output.getError()));
}
} else if (output.hasException()) {
promise.setFailure(output.getException());
} else {
promise.setSuccess((T)res);
}

@ -17,6 +17,7 @@ public abstract class CommandOutput<K, V, T> {
protected RedisCodec<K, V> codec;
protected T output;
protected String error;
protected Throwable exception;
/**
* Initialize a new instance that encodes and decodes keys and
@ -97,6 +98,33 @@ public abstract class CommandOutput<K, V, T> {
return error;
}
/**
* Set exception that was caught while processing result in command output.
*
* @param exception Exception caught while processing command result.
*/
public void setException(Throwable exception) {
this.exception = exception;
}
/**
* Check if the processing command result resulted in an exception.
*
* @return true if processing of command result resulted in an exception.
*/
public boolean hasException() {
return this.exception != null;
}
/**
* Get the exception that occurred while processing command result.
*
* @return The exception.
*/
public Throwable getException () {
return exception;
}
/**
* Mark the command output complete.
*

@ -71,7 +71,7 @@ public class RedisStateMachine<K, V> {
case SINGLE:
if ((bytes = readLine(buffer)) == null) break loop;
if (!QUEUED.equals(bytes)) {
output.set(bytes);
setCommandOutputSafely(output, bytes);
}
break;
case ERROR:
@ -80,13 +80,13 @@ public class RedisStateMachine<K, V> {
break;
case INTEGER:
if ((end = findLineEnd(buffer)) == -1) break loop;
output.set(readLong(buffer, buffer.readerIndex(), end));
setCommandOutputSafely(output, readLong(buffer, buffer.readerIndex(), end));
break;
case BULK:
if ((end = findLineEnd(buffer)) == -1) break loop;
length = (int) readLong(buffer, buffer.readerIndex(), end);
if (length == -1) {
output.set(null);
setCommandOutputSafely(output, null);
} else {
state.type = BYTES;
state.count = length + 2;
@ -109,7 +109,7 @@ public class RedisStateMachine<K, V> {
continue loop;
case BYTES:
if ((bytes = readBytes(buffer, state.count)) == null) break loop;
output.set(bytes);
setCommandOutputSafely(output, bytes);
}
buffer.markReaderIndex();
@ -171,4 +171,28 @@ public class RedisStateMachine<K, V> {
}
return bytes;
}
private boolean setCommandOutputSafely(CommandOutput output, ByteBuffer bytes) {
boolean success = false;
try {
output.set(bytes);
success = true;
} catch (Throwable t) {
output.setException(t);
}
return success;
}
private boolean setCommandOutputSafely(CommandOutput output, long value) {
boolean success = false;
try {
output.set(value);
success = true;
} catch (Throwable t) {
output.setException(t);
}
return success;
}
}

@ -410,7 +410,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (future.isSuccess()) {
return future.getNow();
}
throw ((RedisException)future.cause());
throw future.cause() instanceof RedisException ?
(RedisException) future.cause() :
new RedisException("Unexpected exception while processing command", future.cause());
}
public <V, T> T read(String key, AsyncOperation<V, T> asyncOperation) {

Loading…
Cancel
Save