From c362ea2f9e7b8efcab5ac5764f099e351fa087c2 Mon Sep 17 00:00:00 2001 From: Marko Stankovic Date: Mon, 1 Jun 2015 00:13:05 +0200 Subject: [PATCH 1/2] Add test to prove that calls don't hang on command errors #169 Test that proves that connection won't hang if exception is thrown while processing command in CommandHandler. This test currently fails, until bug is fixed. --- .../java/org/redisson/RedissonMapTest.java | 52 ++++++++++++++++--- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/src/test/java/org/redisson/RedissonMapTest.java b/src/test/java/org/redisson/RedissonMapTest.java index 906117496..3a81350df 100644 --- a/src/test/java/org/redisson/RedissonMapTest.java +++ b/src/test/java/org/redisson/RedissonMapTest.java @@ -1,5 +1,7 @@ package org.redisson; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.lambdaworks.redis.RedisException; import io.netty.util.concurrent.Future; import java.io.Serializable; @@ -12,6 +14,7 @@ import java.util.concurrent.ExecutionException; import org.junit.Assert; import org.junit.Test; +import org.redisson.codec.JsonJacksonCodec; import org.redisson.core.Predicate; import org.redisson.core.RMap; @@ -415,12 +418,12 @@ public class RedissonMapTest extends BaseTest { Future future = map.putAsync(2, 3); Assert.assertNull(future.get()); - Assert.assertEquals((Integer)3, map.get(2)); + Assert.assertEquals((Integer) 3, map.get(2)); Future future1 = map.putAsync(2, 4); - Assert.assertEquals((Integer)3, future1.get()); + Assert.assertEquals((Integer) 3, future1.get()); - Assert.assertEquals((Integer)4, map.get(2)); + Assert.assertEquals((Integer) 4, map.get(2)); } @Test @@ -430,10 +433,10 @@ public class RedissonMapTest extends BaseTest { map.put(3, 5); map.put(7, 8); - Assert.assertEquals((Integer)3, map.removeAsync(1).get()); - Assert.assertEquals((Integer)5, map.removeAsync(3).get()); + Assert.assertEquals((Integer) 3, map.removeAsync(1).get()); + Assert.assertEquals((Integer) 5, map.removeAsync(3).get()); Assert.assertNull(map.removeAsync(10).get()); - Assert.assertEquals((Integer)8, map.removeAsync(7).get()); + Assert.assertEquals((Integer) 8, map.removeAsync(7).get()); } @Test @@ -444,7 +447,7 @@ public class RedissonMapTest extends BaseTest { map.put(4, 6); map.put(7, 8); - Assert.assertEquals((Long)3L, map.fastRemoveAsync(1, 3, 7).get()); + Assert.assertEquals((Long) 3L, map.fastRemoveAsync(1, 3, 7).get()); Thread.sleep(1); Assert.assertEquals(1, map.size()); } @@ -456,4 +459,39 @@ public class RedissonMapTest extends BaseTest { Assert.assertEquals(0, map.fastRemove()); Assert.assertEquals(1, map.size()); } + + @Test(timeout = 5000) + public void testDeserializationErrorReturnsErrorImmediately() throws Exception { + redisson.getConfig().setCodec(new JsonJacksonCodec()); + + RMap map = redisson.getMap("deserializationFailure"); + SimpleObjectWithoutDefaultConstructor object = new SimpleObjectWithoutDefaultConstructor("test-val"); + + Assert.assertEquals("test-val", object.getTestField()); + map.put("test-key", object); + + try { + map.get("test-key"); + Assert.fail("Expected exception from map.get() call"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static class SimpleObjectWithoutDefaultConstructor { + + private String testField; + + SimpleObjectWithoutDefaultConstructor(String testField) { + this.testField = testField; + } + + public String getTestField() { + return testField; + } + + public void setTestField(String testField) { + this.testField = testField; + } + } } From ddc5684d1e5bf04a11f51fdc693c7edc8d3f9fe0 Mon Sep 17 00:00:00 2001 From: Marko Stankovic Date: Mon, 1 Jun 2015 00:14:09 +0200 Subject: [PATCH 2/2] Propagate Command processing exceptions to ConnectionManager #169 --- .../lambdaworks/redis/protocol/Command.java | 4 ++- .../redis/protocol/CommandOutput.java | 28 ++++++++++++++++ .../redis/protocol/RedisStateMachine.java | 32 ++++++++++++++++--- .../MasterSlaveConnectionManager.java | 4 ++- 4 files changed, 62 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/protocol/Command.java b/src/main/java/com/lambdaworks/redis/protocol/Command.java index e8dbac516..4fc740e0f 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/Command.java +++ b/src/main/java/com/lambdaworks/redis/protocol/Command.java @@ -67,7 +67,7 @@ public class Command { } if (res instanceof RedisException) { promise.setFailure((Exception)res); - } if (output.hasError()) { + } else if (output.hasError()) { if (output.getError().startsWith("MOVED")) { String[] parts = output.getError().split(" "); int slot = Integer.valueOf(parts[1]); @@ -75,6 +75,8 @@ public class Command { } else { promise.setFailure(new RedisException(output.getError())); } + } else if (output.hasException()) { + promise.setFailure(output.getException()); } else { promise.setSuccess((T)res); } diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandOutput.java b/src/main/java/com/lambdaworks/redis/protocol/CommandOutput.java index 32cd9b42a..0559d1e64 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandOutput.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandOutput.java @@ -17,6 +17,7 @@ public abstract class CommandOutput { protected RedisCodec codec; protected T output; protected String error; + protected Throwable exception; /** * Initialize a new instance that encodes and decodes keys and @@ -97,6 +98,33 @@ public abstract class CommandOutput { return error; } + /** + * Set exception that was caught while processing result in command output. + * + * @param exception Exception caught while processing command result. + */ + public void setException(Throwable exception) { + this.exception = exception; + } + + /** + * Check if the processing command result resulted in an exception. + * + * @return true if processing of command result resulted in an exception. + */ + public boolean hasException() { + return this.exception != null; + } + + /** + * Get the exception that occurred while processing command result. + * + * @return The exception. + */ + public Throwable getException () { + return exception; + } + /** * Mark the command output complete. * diff --git a/src/main/java/com/lambdaworks/redis/protocol/RedisStateMachine.java b/src/main/java/com/lambdaworks/redis/protocol/RedisStateMachine.java index 4cd60bd90..82a5633e9 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/RedisStateMachine.java +++ b/src/main/java/com/lambdaworks/redis/protocol/RedisStateMachine.java @@ -71,7 +71,7 @@ public class RedisStateMachine { case SINGLE: if ((bytes = readLine(buffer)) == null) break loop; if (!QUEUED.equals(bytes)) { - output.set(bytes); + setCommandOutputSafely(output, bytes); } break; case ERROR: @@ -80,13 +80,13 @@ public class RedisStateMachine { break; case INTEGER: if ((end = findLineEnd(buffer)) == -1) break loop; - output.set(readLong(buffer, buffer.readerIndex(), end)); + setCommandOutputSafely(output, readLong(buffer, buffer.readerIndex(), end)); break; case BULK: if ((end = findLineEnd(buffer)) == -1) break loop; length = (int) readLong(buffer, buffer.readerIndex(), end); if (length == -1) { - output.set(null); + setCommandOutputSafely(output, null); } else { state.type = BYTES; state.count = length + 2; @@ -109,7 +109,7 @@ public class RedisStateMachine { continue loop; case BYTES: if ((bytes = readBytes(buffer, state.count)) == null) break loop; - output.set(bytes); + setCommandOutputSafely(output, bytes); } buffer.markReaderIndex(); @@ -171,4 +171,28 @@ public class RedisStateMachine { } return bytes; } + + + private boolean setCommandOutputSafely(CommandOutput output, ByteBuffer bytes) { + boolean success = false; + try { + output.set(bytes); + success = true; + } catch (Throwable t) { + output.setException(t); + } + return success; + } + + private boolean setCommandOutputSafely(CommandOutput output, long value) { + boolean success = false; + try { + output.set(value); + success = true; + } catch (Throwable t) { + output.setException(t); + } + return success; + } + } diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index f9f3c6ef5..856e5ff87 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -410,7 +410,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (future.isSuccess()) { return future.getNow(); } - throw ((RedisException)future.cause()); + throw future.cause() instanceof RedisException ? + (RedisException) future.cause() : + new RedisException("Unexpected exception while processing command", future.cause()); } public T read(String key, AsyncOperation asyncOperation) {