Merge pull request #170 from marko-stankovic/issue_169

Propagate response processing exceptions to ConnectionManager
pull/115/merge
Nikita Koksharov 10 years ago
commit 196e0449c8

@ -67,7 +67,7 @@ public class Command<K, V, T> {
}
if (res instanceof RedisException) {
promise.setFailure((Exception)res);
} if (output.hasError()) {
} else if (output.hasError()) {
if (output.getError().startsWith("MOVED")) {
String[] parts = output.getError().split(" ");
int slot = Integer.valueOf(parts[1]);
@ -79,6 +79,8 @@ public class Command<K, V, T> {
} else {
promise.setFailure(new RedisException(output.getError()));
}
} else if (output.hasException()) {
promise.setFailure(output.getException());
} else {
promise.setSuccess((T)res);
}

@ -17,6 +17,7 @@ public abstract class CommandOutput<K, V, T> {
protected RedisCodec<K, V> codec;
protected T output;
protected String error;
protected Throwable exception;
/**
* Initialize a new instance that encodes and decodes keys and
@ -97,6 +98,33 @@ public abstract class CommandOutput<K, V, T> {
return error;
}
/**
* Set exception that was caught while processing result in command output.
*
* @param exception Exception caught while processing command result.
*/
public void setException(Throwable exception) {
this.exception = exception;
}
/**
* Check if the processing command result resulted in an exception.
*
* @return true if processing of command result resulted in an exception.
*/
public boolean hasException() {
return this.exception != null;
}
/**
* Get the exception that occurred while processing command result.
*
* @return The exception.
*/
public Throwable getException () {
return exception;
}
/**
* Mark the command output complete.
*

@ -71,7 +71,7 @@ public class RedisStateMachine<K, V> {
case SINGLE:
if ((bytes = readLine(buffer)) == null) break loop;
if (!QUEUED.equals(bytes)) {
output.set(bytes);
setCommandOutputSafely(output, bytes);
}
break;
case ERROR:
@ -80,13 +80,13 @@ public class RedisStateMachine<K, V> {
break;
case INTEGER:
if ((end = findLineEnd(buffer)) == -1) break loop;
output.set(readLong(buffer, buffer.readerIndex(), end));
setCommandOutputSafely(output, readLong(buffer, buffer.readerIndex(), end));
break;
case BULK:
if ((end = findLineEnd(buffer)) == -1) break loop;
length = (int) readLong(buffer, buffer.readerIndex(), end);
if (length == -1) {
output.set(null);
setCommandOutputSafely(output, null);
} else {
state.type = BYTES;
state.count = length + 2;
@ -109,7 +109,7 @@ public class RedisStateMachine<K, V> {
continue loop;
case BYTES:
if ((bytes = readBytes(buffer, state.count)) == null) break loop;
output.set(bytes);
setCommandOutputSafely(output, bytes);
}
buffer.markReaderIndex();
@ -171,4 +171,28 @@ public class RedisStateMachine<K, V> {
}
return bytes;
}
private boolean setCommandOutputSafely(CommandOutput output, ByteBuffer bytes) {
boolean success = false;
try {
output.set(bytes);
success = true;
} catch (Throwable t) {
output.setException(t);
}
return success;
}
private boolean setCommandOutputSafely(CommandOutput output, long value) {
boolean success = false;
try {
output.set(value);
success = true;
} catch (Throwable t) {
output.setException(t);
}
return success;
}
}

@ -410,7 +410,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (future.isSuccess()) {
return future.getNow();
}
throw ((RedisException)future.cause());
throw future.cause() instanceof RedisException ?
(RedisException) future.cause() :
new RedisException("Unexpected exception while processing command", future.cause());
}
public <V, T> T read(String key, AsyncOperation<V, T> asyncOperation) {

@ -1,5 +1,7 @@
package org.redisson;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.lambdaworks.redis.RedisException;
import io.netty.util.concurrent.Future;
import java.io.Serializable;
@ -12,6 +14,7 @@ import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.core.Predicate;
import org.redisson.core.RMap;
@ -415,12 +418,12 @@ public class RedissonMapTest extends BaseTest {
Future<Integer> future = map.putAsync(2, 3);
Assert.assertNull(future.get());
Assert.assertEquals((Integer)3, map.get(2));
Assert.assertEquals((Integer) 3, map.get(2));
Future<Integer> future1 = map.putAsync(2, 4);
Assert.assertEquals((Integer)3, future1.get());
Assert.assertEquals((Integer) 3, future1.get());
Assert.assertEquals((Integer)4, map.get(2));
Assert.assertEquals((Integer) 4, map.get(2));
}
@Test
@ -430,10 +433,10 @@ public class RedissonMapTest extends BaseTest {
map.put(3, 5);
map.put(7, 8);
Assert.assertEquals((Integer)3, map.removeAsync(1).get());
Assert.assertEquals((Integer)5, map.removeAsync(3).get());
Assert.assertEquals((Integer) 3, map.removeAsync(1).get());
Assert.assertEquals((Integer) 5, map.removeAsync(3).get());
Assert.assertNull(map.removeAsync(10).get());
Assert.assertEquals((Integer)8, map.removeAsync(7).get());
Assert.assertEquals((Integer) 8, map.removeAsync(7).get());
}
@Test
@ -444,7 +447,7 @@ public class RedissonMapTest extends BaseTest {
map.put(4, 6);
map.put(7, 8);
Assert.assertEquals((Long)3L, map.fastRemoveAsync(1, 3, 7).get());
Assert.assertEquals((Long) 3L, map.fastRemoveAsync(1, 3, 7).get());
Thread.sleep(1);
Assert.assertEquals(1, map.size());
}
@ -456,4 +459,39 @@ public class RedissonMapTest extends BaseTest {
Assert.assertEquals(0, map.fastRemove());
Assert.assertEquals(1, map.size());
}
@Test(timeout = 5000)
public void testDeserializationErrorReturnsErrorImmediately() throws Exception {
redisson.getConfig().setCodec(new JsonJacksonCodec());
RMap<String, SimpleObjectWithoutDefaultConstructor> map = redisson.getMap("deserializationFailure");
SimpleObjectWithoutDefaultConstructor object = new SimpleObjectWithoutDefaultConstructor("test-val");
Assert.assertEquals("test-val", object.getTestField());
map.put("test-key", object);
try {
map.get("test-key");
Assert.fail("Expected exception from map.get() call");
} catch (Exception e) {
e.printStackTrace();
}
}
public static class SimpleObjectWithoutDefaultConstructor {
private String testField;
SimpleObjectWithoutDefaultConstructor(String testField) {
this.testField = testField;
}
public String getTestField() {
return testField;
}
public void setTestField(String testField) {
this.testField = testField;
}
}
}

Loading…
Cancel
Save