diff --git a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java index bca6d64f3..182220b88 100644 --- a/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/redisson/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -98,6 +98,7 @@ public class CommandDecoder extends ReplayingDecoder { } } catch (Exception e) { log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e); + sendNext(ctx); throw e; } } else if (data instanceof CommandData) { @@ -111,6 +112,7 @@ public class CommandDecoder extends ReplayingDecoder { } catch (Exception e) { log.error("Unable to decode data. channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8), e); cmd.tryFailure(e); + sendNext(ctx); throw e; } } else if (data instanceof CommandsData) { @@ -119,13 +121,17 @@ public class CommandDecoder extends ReplayingDecoder { decodeCommandBatch(ctx, in, data, commands); } catch (Exception e) { commands.getPromise().tryFailure(e); + sendNext(ctx); throw e; } return; } - ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); + sendNext(ctx); + } + protected void sendNext(ChannelHandlerContext ctx) { + ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); state(null); } @@ -242,9 +248,7 @@ public class CommandDecoder extends ReplayingDecoder { } } - ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); - - state(null); + sendNext(ctx); } else { checkpoint(); state().setBatchIndex(i); diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index ef8a2ebbe..59f2d27c4 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -1,7 +1,7 @@ package org.redisson; -import static org.awaitility.Awaitility.*; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.redisson.BaseTest.createInstance; import java.io.IOException; @@ -31,31 +31,44 @@ import org.redisson.RedisRunner.RedisProcess; import org.redisson.api.ClusterNode; import org.redisson.api.Node; import org.redisson.api.Node.InfoSection; -import org.redisson.api.listener.MessageListener; -import org.redisson.api.listener.StatusListener; import org.redisson.api.NodesGroup; import org.redisson.api.RFuture; import org.redisson.api.RMap; -import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisOutOfMemoryException; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; +import org.redisson.codec.JsonJacksonCodec; import org.redisson.codec.SerializationCodec; import org.redisson.config.Config; import org.redisson.connection.ConnectionListener; import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.misc.HashValue; -import io.netty.buffer.Unpooled; - public class RedissonTest { protected RedissonClient redisson; protected static RedissonClient defaultRedisson; + @Test + public void testDecoderError() { + redisson.getBucket("testbucket", new StringCodec()).set("{INVALID JSON!}"); + + for (int i = 0; i < 256; i++) { + try { + redisson.getBucket("testbucket", new JsonJacksonCodec()).get(); + Assert.fail(); + } catch (Exception e) { + // skip + } + } + + redisson.getBucket("testbucket2").set("should work"); + } + @Test public void testSmallPool() throws InterruptedException { Config config = new Config();