DecoderState added

pull/243/head
Nikita 10 years ago
parent df90d9be93
commit 6b76cd8c7c

@ -155,8 +155,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override @Override
public int size() { public int size() {
Long size = connectionManager.read(getName(), RedisCommands.LLEN, getName()); return connectionManager.read(getName(), RedisCommands.LLEN, getName());
return size.intValue();
} }
private int size(RedisConnection connection) { private int size(RedisConnection connection) {

@ -78,23 +78,37 @@ public class CommandDecoder extends ReplayingDecoder<DecoderState> {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
} }
if (data instanceof CommandData) { if (data == null) {
decode(in, null, null, ctx.channel(), currentDecoder);
} else if (data instanceof CommandData) {
// if (state() == null) {
// state(new DecoderState());
// }
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try { try {
decode(in, cmd, null, ctx.channel(), currentDecoder); // if (state().getSize() > 0) {
// decodeMulti(in, cmd, null, ctx.channel(), currentDecoder, state().getSize(), state().getRespParts());
// } else {
decode(in, cmd, null, ctx.channel(), currentDecoder);
// }
} catch (IOException e) { } catch (IOException e) {
cmd.getPromise().setFailure(e); cmd.getPromise().setFailure(e);
} }
} else if (data instanceof CommandsData) { } else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data; CommandsData commands = (CommandsData)data;
int i = 0; int i = 0;
if (state() != null) { if (state() != null) {
i = state().getIndex(); i = state().getIndex();
} else {
state(new DecoderState());
} }
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> cmd = null; CommandData<Object, Object> cmd = null;
try { try {
checkpoint(new DecoderState(i)); checkpoint();
state().setIndex(i);
cmd = (CommandData<Object, Object>) commands.getCommands().get(i); cmd = (CommandData<Object, Object>) commands.getCommands().get(i);
decode(in, cmd, null, ctx.channel(), currentDecoder); decode(in, cmd, null, ctx.channel(), currentDecoder);
i++; i++;
@ -106,6 +120,8 @@ public class CommandDecoder extends ReplayingDecoder<DecoderState> {
ctx.channel().attr(CommandsQueue.REPLAY).remove(); ctx.channel().attr(CommandsQueue.REPLAY).remove();
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
state(null);
} }
private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, Decoder<Object> currentDecoder) throws IOException { private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, Decoder<Object> currentDecoder) throws IOException {
@ -144,28 +160,38 @@ public class CommandDecoder extends ReplayingDecoder<DecoderState> {
handleResult(data, parts, result, false); handleResult(data, parts, result, false);
} else if (code == '*') { } else if (code == '*') {
long size = readLong(in); long size = readLong(in);
// state().setSize(size);
List<Object> respParts = new ArrayList<Object>(); List<Object> respParts = new ArrayList<Object>();
for (int i = 0; i < size; i++) { // state().setRespParts(respParts);
decode(in, data, respParts, channel, currentDecoder);
}
Object result = messageDecoder(data, respParts).decode(respParts); decodeMulti(in, data, parts, channel, currentDecoder, size, respParts);
if (result instanceof PubSubStatusMessage) { } else {
if (parts == null) { throw new IllegalStateException("Can't decode replay " + (char)code);
parts = new ArrayList<Object>(); }
} }
parts.add(result);
// has next status messages private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
if (in.writerIndex() > in.readerIndex()) { Channel channel, Decoder<Object> currentDecoder, long size, List<Object> respParts)
decode(in, data, parts, channel, currentDecoder); throws IOException {
} else { for (int i = respParts.size(); i < size; i++) {
handleMultiResult(data, null, channel, parts); decode(in, data, respParts, channel, currentDecoder);
} }
Object result = messageDecoder(data, respParts).decode(respParts);
if (result instanceof PubSubStatusMessage) {
if (parts == null) {
parts = new ArrayList<Object>();
}
parts.add(result);
// has next status messages
if (in.writerIndex() > in.readerIndex()) {
decode(in, data, parts, channel, currentDecoder);
} else { } else {
handleMultiResult(data, parts, channel, result); handleMultiResult(data, null, channel, parts);
} }
} else { } else {
throw new IllegalStateException("Can't decode replay " + (char)code); handleMultiResult(data, parts, channel, result);
} }
} }

@ -1,14 +1,35 @@
package org.redisson.client.handler; package org.redisson.client.handler;
import java.util.List;
public class DecoderState { public class DecoderState {
private int index; private int index;
public DecoderState(int index) { private long size;
private List<Object> respParts;
public DecoderState() {
super(); super();
this.index = index;
} }
public void setSize(long size) {
this.size = size;
}
public long getSize() {
return size;
}
public void setRespParts(List<Object> respParts) {
this.respParts = respParts;
}
public List<Object> getRespParts() {
return respParts;
}
public void setIndex(int index) {
this.index = index;
}
public int getIndex() { public int getIndex() {
return index; return index;
} }

@ -2,6 +2,7 @@ package org.redisson;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -38,6 +39,8 @@ public class RedisClientTest {
pool.awaitTermination(1, TimeUnit.HOURS); pool.awaitTermination(1, TimeUnit.HOURS);
Assert.assertEquals(100000L, conn.sync(LongCodec.INSTANCE, RedisCommands.GET, "test")); Assert.assertEquals(100000L, conn.sync(LongCodec.INSTANCE, RedisCommands.GET, "test"));
conn.sync(RedisCommands.FLUSHDB);
} }
@Test @Test
@ -63,14 +66,29 @@ public class RedisClientTest {
Assert.assertEquals(1, (long)cmd2.getPromise().get()); Assert.assertEquals(1, (long)cmd2.getPromise().get());
Assert.assertEquals(2, (long)cmd3.getPromise().get()); Assert.assertEquals(2, (long)cmd3.getPromise().get());
Assert.assertEquals("PONG", cmd4.getPromise().get()); Assert.assertEquals("PONG", cmd4.getPromise().get());
conn.sync(RedisCommands.FLUSHDB);
} }
@Test @Test
public void testPipelineBigRequest() throws InterruptedException, ExecutionException { public void testBigRequest() throws InterruptedException, ExecutionException {
RedisClient c = new RedisClient("localhost", 6379); RedisClient c = new RedisClient("localhost", 6379);
RedisConnection conn = c.connect(); RedisConnection conn = c.connect();
conn.sync(StringCodec.INSTANCE, RedisCommands.SET, "test", 0); for (int i = 0; i < 50; i++) {
conn.sync(StringCodec.INSTANCE, RedisCommands.HSET, "testmap", i, "2");
}
Map<Object, Object> res = conn.sync(StringCodec.INSTANCE, RedisCommands.HGETALL, "testmap");
Assert.assertEquals(50, res.size());
conn.sync(RedisCommands.FLUSHDB);
}
@Test
public void testPipelineBigResponse() throws InterruptedException, ExecutionException {
RedisClient c = new RedisClient("localhost", 6379);
RedisConnection conn = c.connect();
List<CommandData<?, ?>> commands = new ArrayList<CommandData<?, ?>>(); List<CommandData<?, ?>> commands = new ArrayList<CommandData<?, ?>>();
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
@ -83,6 +101,8 @@ public class RedisClientTest {
for (CommandData<?, ?> commandData : commands) { for (CommandData<?, ?> commandData : commands) {
commandData.getPromise().get(); commandData.getPromise().get();
} }
conn.sync(RedisCommands.FLUSHDB);
} }
} }

Loading…
Cancel
Save