diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 308eb89e9..68e238f08 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -95,7 +95,7 @@ public class CommandDecoder extends ReplayingDecoder { state().setDecoderState(null); if (data == null) { - decode(in, null, null, ctx.channel(), currentDecoder); + decode(in, null, null, ctx.channel(), currentDecoder); } else if (data instanceof CommandData) { CommandData cmd = (CommandData)data; try { @@ -110,40 +110,45 @@ public class CommandDecoder extends ReplayingDecoder { } else if (data instanceof CommandsData) { CommandsData commands = (CommandsData)data; - int i = state().getIndex(); + handleCommandsDataResponse(ctx, in, data, currentDecoder, commands); + return; + } - while (in.writerIndex() > in.readerIndex()) { - CommandData cmd = null; - try { - checkpoint(); - state().setIndex(i); - cmd = (CommandData) commands.getCommands().get(i); - decode(in, cmd, null, ctx.channel(), currentDecoder); - i++; - } catch (IOException e) { - cmd.getPromise().setFailure(e); - } - } + ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); - if (i == commands.getCommands().size()) { - Promise promise = commands.getPromise(); - if (!promise.trySuccess(null)) { - log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); - } + state(null); + } - ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); + private void handleCommandsDataResponse(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, + Decoder currentDecoder, CommandsData commands) throws Exception { + int i = state().getIndex(); - state(null); - } else { + while (in.writerIndex() > in.readerIndex()) { + CommandData cmd = null; + try { checkpoint(); state().setIndex(i); + cmd = (CommandData) commands.getCommands().get(i); + decode(in, cmd, null, ctx.channel(), currentDecoder); + i++; + } catch (IOException e) { + cmd.getPromise().setFailure(e); } - return; } - ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); + if (i == commands.getCommands().size()) { + Promise promise = commands.getPromise(); + if (!promise.trySuccess(null)) { + log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); + } - state(null); + ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx); + + state(null); + } else { + checkpoint(); + state().setIndex(i); + } } private void decode(ByteBuf in, CommandData data, List parts, Channel channel, Decoder currentDecoder) throws IOException { @@ -206,6 +211,9 @@ public class CommandDecoder extends ReplayingDecoder { Object result = decoder.decode(respParts, state()); + // store current message index + checkpoint(); + if (result instanceof Message) { handleMultiResult(data, null, channel, result); // has next messages? diff --git a/src/test/java/org/redisson/RedissonTopicTest.java b/src/test/java/org/redisson/RedissonTopicTest.java index 23158db91..5bb1f2981 100644 --- a/src/test/java/org/redisson/RedissonTopicTest.java +++ b/src/test/java/org/redisson/RedissonTopicTest.java @@ -203,6 +203,8 @@ public class RedissonTopicTest { redisson2.shutdown(); } + volatile long counter; + @Test public void testHeavyLoad() throws InterruptedException { final CountDownLatch messageRecieved = new CountDownLatch(1000); @@ -214,6 +216,7 @@ public class RedissonTopicTest { public void onMessage(String channel, Message msg) { Assert.assertEquals(new Message("123"), msg); messageRecieved.countDown(); + counter++; } }); @@ -233,11 +236,13 @@ public class RedissonTopicTest { messageRecieved.await(); + Thread.sleep(1000); + + Assert.assertEquals(500, counter); + redisson1.shutdown(); redisson2.shutdown(); } - - @Test public void testListenerRemove() throws InterruptedException { Redisson redisson1 = BaseTest.createInstance();