diff --git a/src/main/java/org/redisson/RedissonSortedSet.java b/src/main/java/org/redisson/RedissonSortedSet.java index 72be69e9f..273f54b1b 100644 --- a/src/main/java/org/redisson/RedissonSortedSet.java +++ b/src/main/java/org/redisson/RedissonSortedSet.java @@ -155,8 +155,7 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet { log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); } - if (data instanceof CommandData) { + if (data == null) { + decode(in, null, null, ctx.channel(), currentDecoder); + } else if (data instanceof CommandData) { +// if (state() == null) { +// state(new DecoderState()); +// } CommandData cmd = (CommandData)data; try { - decode(in, cmd, null, ctx.channel(), currentDecoder); +// if (state().getSize() > 0) { +// decodeMulti(in, cmd, null, ctx.channel(), currentDecoder, state().getSize(), state().getRespParts()); +// } else { + decode(in, cmd, null, ctx.channel(), currentDecoder); +// } } catch (IOException e) { cmd.getPromise().setFailure(e); } } else if (data instanceof CommandsData) { CommandsData commands = (CommandsData)data; + int i = 0; if (state() != null) { i = state().getIndex(); + } else { + state(new DecoderState()); } + while (in.writerIndex() > in.readerIndex()) { CommandData cmd = null; try { - checkpoint(new DecoderState(i)); + checkpoint(); + state().setIndex(i); cmd = (CommandData) commands.getCommands().get(i); decode(in, cmd, null, ctx.channel(), currentDecoder); i++; @@ -106,6 +120,8 @@ public class CommandDecoder extends ReplayingDecoder { ctx.channel().attr(CommandsQueue.REPLAY).remove(); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); + + state(null); } private void decode(ByteBuf in, CommandData data, List parts, Channel channel, Decoder currentDecoder) throws IOException { @@ -144,28 +160,38 @@ public class CommandDecoder extends ReplayingDecoder { handleResult(data, parts, result, false); } else if (code == '*') { long size = readLong(in); +// state().setSize(size); + List respParts = new ArrayList(); - for (int i = 0; i < size; i++) { - decode(in, data, respParts, channel, currentDecoder); - } +// state().setRespParts(respParts); - Object result = messageDecoder(data, respParts).decode(respParts); - if (result instanceof PubSubStatusMessage) { - if (parts == null) { - parts = new ArrayList(); - } - parts.add(result); - // has next status messages - if (in.writerIndex() > in.readerIndex()) { - decode(in, data, parts, channel, currentDecoder); - } else { - handleMultiResult(data, null, channel, parts); - } + decodeMulti(in, data, parts, channel, currentDecoder, size, respParts); + } else { + throw new IllegalStateException("Can't decode replay " + (char)code); + } + } + + private void decodeMulti(ByteBuf in, CommandData data, List parts, + Channel channel, Decoder currentDecoder, long size, List respParts) + throws IOException { + for (int i = respParts.size(); i < size; i++) { + decode(in, data, respParts, channel, currentDecoder); + } + + Object result = messageDecoder(data, respParts).decode(respParts); + if (result instanceof PubSubStatusMessage) { + if (parts == null) { + parts = new ArrayList(); + } + parts.add(result); + // has next status messages + if (in.writerIndex() > in.readerIndex()) { + decode(in, data, parts, channel, currentDecoder); } else { - handleMultiResult(data, parts, channel, result); + handleMultiResult(data, null, channel, parts); } } else { - throw new IllegalStateException("Can't decode replay " + (char)code); + handleMultiResult(data, parts, channel, result); } } diff --git a/src/main/java/org/redisson/client/handler/DecoderState.java b/src/main/java/org/redisson/client/handler/DecoderState.java index 18c0fe49e..2bc20d8a1 100644 --- a/src/main/java/org/redisson/client/handler/DecoderState.java +++ b/src/main/java/org/redisson/client/handler/DecoderState.java @@ -1,14 +1,35 @@ package org.redisson.client.handler; +import java.util.List; + public class DecoderState { private int index; - public DecoderState(int index) { + private long size; + private List respParts; + + public DecoderState() { super(); - this.index = index; } + public void setSize(long size) { + this.size = size; + } + public long getSize() { + return size; + } + + public void setRespParts(List respParts) { + this.respParts = respParts; + } + public List getRespParts() { + return respParts; + } + + public void setIndex(int index) { + this.index = index; + } public int getIndex() { return index; } diff --git a/src/test/java/org/redisson/RedisClientTest.java b/src/test/java/org/redisson/RedisClientTest.java index 1a473d319..c573ff35c 100644 --- a/src/test/java/org/redisson/RedisClientTest.java +++ b/src/test/java/org/redisson/RedisClientTest.java @@ -2,6 +2,7 @@ package org.redisson; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -38,6 +39,8 @@ public class RedisClientTest { pool.awaitTermination(1, TimeUnit.HOURS); Assert.assertEquals(100000L, conn.sync(LongCodec.INSTANCE, RedisCommands.GET, "test")); + + conn.sync(RedisCommands.FLUSHDB); } @Test @@ -63,14 +66,29 @@ public class RedisClientTest { Assert.assertEquals(1, (long)cmd2.getPromise().get()); Assert.assertEquals(2, (long)cmd3.getPromise().get()); Assert.assertEquals("PONG", cmd4.getPromise().get()); + + conn.sync(RedisCommands.FLUSHDB); } @Test - public void testPipelineBigRequest() throws InterruptedException, ExecutionException { + public void testBigRequest() throws InterruptedException, ExecutionException { RedisClient c = new RedisClient("localhost", 6379); RedisConnection conn = c.connect(); - conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0); + for (int i = 0; i < 50; i++) { + conn.sync(StringCodec.INSTANCE, RedisCommands.HSET, "testmap", i, "2"); + } + + Map res = conn.sync(StringCodec.INSTANCE, RedisCommands.HGETALL, "testmap"); + Assert.assertEquals(50, res.size()); + + conn.sync(RedisCommands.FLUSHDB); + } + + @Test + public void testPipelineBigResponse() throws InterruptedException, ExecutionException { + RedisClient c = new RedisClient("localhost", 6379); + RedisConnection conn = c.connect(); List> commands = new ArrayList>(); for (int i = 0; i < 1000; i++) { @@ -83,6 +101,8 @@ public class RedisClientTest { for (CommandData commandData : commands) { commandData.getPromise().get(); } + + conn.sync(RedisCommands.FLUSHDB); } }