Decoder state introduced. #183

pull/243/head
Nikita 10 years ago
parent 6b76cd8c7c
commit d06b9ba964

@ -50,7 +50,7 @@ import io.netty.util.CharsetUtil;
* @author Nikita Koksharov
*
*/
public class CommandDecoder extends ReplayingDecoder<DecoderState> {
public class CommandDecoder extends ReplayingDecoder<State> {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -68,7 +68,7 @@ public class CommandDecoder extends ReplayingDecoder<DecoderState> {
if (data == null) {
currentDecoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}
};
@ -78,12 +78,14 @@ public class CommandDecoder extends ReplayingDecoder<DecoderState> {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
}
if (state() == null) {
state(new State());
}
state().setDecoderState(null);
if (data == null) {
decode(in, null, null, ctx.channel(), currentDecoder);
} else if (data instanceof CommandData) {
// if (state() == null) {
// state(new DecoderState());
// }
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try {
// if (state().getSize() > 0) {
@ -97,12 +99,7 @@ public class CommandDecoder extends ReplayingDecoder<DecoderState> {
} else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data;
int i = 0;
if (state() != null) {
i = state().getIndex();
} else {
state(new DecoderState());
}
int i = state().getIndex();
while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> cmd = null;
@ -155,7 +152,7 @@ public class CommandDecoder extends ReplayingDecoder<DecoderState> {
ByteBuf buf = readBytes(in);
Object result = null;
if (buf != null) {
result = decoder(data, parts, currentDecoder).decode(buf);
result = decoder(data, parts, currentDecoder).decode(buf, state());
}
handleResult(data, parts, result, false);
} else if (code == '*') {
@ -178,7 +175,7 @@ public class CommandDecoder extends ReplayingDecoder<DecoderState> {
decode(in, data, respParts, channel, currentDecoder);
}
Object result = messageDecoder(data, respParts).decode(respParts);
Object result = messageDecoder(data, respParts).decode(respParts, state());
if (result instanceof PubSubStatusMessage) {
if (parts == null) {
parts = new ArrayList<Object>();
@ -265,7 +262,7 @@ public class CommandDecoder extends ReplayingDecoder<DecoderState> {
Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (parts != null) {
MultiDecoder<Object> multiDecoder = data.getCommand().getReplayMultiDecoder();
if (multiDecoder.isApplicable(parts.size())) {
if (multiDecoder.isApplicable(parts.size(), state())) {
decoder = multiDecoder;
}
}

@ -2,14 +2,15 @@ package org.redisson.client.handler;
import java.util.List;
public class DecoderState {
public class State {
private int index;
private Object decoderState;
private long size;
private List<Object> respParts;
public DecoderState() {
public State() {
super();
}
@ -34,5 +35,11 @@ public class DecoderState {
return index;
}
public <T> T getDecoderState() {
return (T)decoderState;
}
public void setDecoderState(Object decoderState) {
this.decoderState = decoderState;
}
}

@ -17,10 +17,12 @@ package org.redisson.client.protocol;
import java.io.IOException;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public interface Decoder<R> {
R decode(ByteBuf buf) throws IOException;
R decode(ByteBuf buf, State state) throws IOException;
}

@ -15,6 +15,8 @@
*/
package org.redisson.client.protocol;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -26,7 +28,7 @@ public class LongCodec extends StringCodec {
public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
};

@ -17,6 +17,8 @@ package org.redisson.client.protocol;
import java.io.IOException;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -28,7 +30,7 @@ public class StringCodec implements Codec {
public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}
};

@ -17,20 +17,22 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class KeyValueObjectDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
}
@Override
public Object decode(List<Object> parts) {
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
@ -38,7 +40,7 @@ public class KeyValueObjectDecoder implements MultiDecoder<Object> {
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}

@ -17,23 +17,25 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class ListScanResultReplayDecoder implements MultiDecoder<ListScanResult<Object>> {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
@Override
public ListScanResult<Object> decode(List<Object> parts) {
public ListScanResult<Object> decode(List<Object> parts, State state) {
return new ListScanResult<Object>((Long)parts.get(0), (List<Object>)parts.get(1));
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}

@ -18,23 +18,25 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class MapScanResultReplayDecoder implements MultiDecoder<MapScanResult<Object, Object>> {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
@Override
public MapScanResult<Object, Object> decode(List<Object> parts) {
public MapScanResult<Object, Object> decode(List<Object> parts, State state) {
return new MapScanResult<Object, Object>((Long)parts.get(0), (Map<Object, Object>)parts.get(1));
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}

@ -17,12 +17,13 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
public interface MultiDecoder<T> extends Decoder<Object> {
boolean isApplicable(int paramNum);
boolean isApplicable(int paramNum, State state);
T decode(List<Object> parts);
T decode(List<Object> parts, State state);
}

@ -21,24 +21,36 @@ import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
private final MultiDecoder<Object> firstDecoder;
private final MultiDecoder<Object> secondDecoder;
public static class DecoderState {
Deque<MultiDecoder<?>> decoders;
Deque<MultiDecoder<?>> flipDecoders;
public DecoderState(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
super();
this.decoders = new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder));
this.flipDecoders = new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder));
}
public Deque<MultiDecoder<?>> getDecoders() {
return decoders;
}
private ThreadLocal<Deque<MultiDecoder<?>>> decoders = new ThreadLocal<Deque<MultiDecoder<?>>>() {
protected Deque<MultiDecoder<?>> initialValue() {
return new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder));
};
};
public Deque<MultiDecoder<?>> getFlipDecoders() {
return flipDecoders;
}
}
private ThreadLocal<Deque<MultiDecoder<?>>> flipDecoders = new ThreadLocal<Deque<MultiDecoder<?>>>() {
protected Deque<MultiDecoder<?>> initialValue() {
return new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder));
};
};
private final MultiDecoder<Object> firstDecoder;
private final MultiDecoder<Object> secondDecoder;
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
this.firstDecoder = firstDecoder;
@ -46,35 +58,33 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
}
@Override
public Object decode(ByteBuf buf) throws IOException {
return flipDecoders.get().peek().decode(buf);
public Object decode(ByteBuf buf, State state) throws IOException {
DecoderState ds = getDecoder(state);
return ds.getFlipDecoders().peek().decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
DecoderState ds = getDecoder(state);
if (paramNum == 0) {
flipDecoders.get().poll();
// in case of incoming buffer tail
// state should be reseted
if (flipDecoders.get().isEmpty()) {
flipDecoders.remove();
decoders.remove();
flipDecoders.get().poll();
}
ds.getFlipDecoders().poll();
}
return flipDecoders.get().peek().isApplicable(paramNum);
return ds.getFlipDecoders().peek().isApplicable(paramNum, state);
}
@Override
public Object decode(List<Object> parts) {
Object result = decoders.get().poll().decode(parts);
// clear state on last decoding
if (decoders.get().isEmpty()) {
flipDecoders.remove();
decoders.remove();
private DecoderState getDecoder(State state) {
DecoderState ds = state.getDecoderState();
if (ds == null) {
ds = new DecoderState(firstDecoder, secondDecoder);
state.setDecoderState(ds);
}
return result;
return ds;
}
@Override
public Object decode(List<Object> parts, State state) {
DecoderState ds = getDecoder(state);
return ds.getDecoders().poll().decode(parts, state);
}
}

@ -17,22 +17,24 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class ObjectListReplayDecoder implements MultiDecoder<List<Object>> {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public List<Object> decode(List<Object> parts) {
public List<Object> decode(List<Object> parts, State state) {
return parts;
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return false;
}

@ -19,17 +19,19 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public Map<Object, Object> decode(List<Object> parts) {
public Map<Object, Object> decode(List<Object> parts, State state) {
Map<Object, Object> result = new HashMap<Object, Object>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
@ -40,7 +42,7 @@ public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>>
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return false;
}

@ -19,22 +19,24 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class ObjectSetReplayDecoder implements MultiDecoder<Set<Object>> {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public Set<Object> decode(List<Object> parts) {
public Set<Object> decode(List<Object> parts, State state) {
return new HashSet<Object>(parts);
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return false;
}

@ -15,6 +15,7 @@
*/
package org.redisson.client.protocol.decoder;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
@ -23,7 +24,7 @@ import io.netty.util.CharsetUtil;
public class StringDataDecoder implements Decoder<String> {
@Override
public String decode(ByteBuf buf) {
public String decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}

@ -18,23 +18,25 @@ package org.redisson.client.protocol.decoder;
import java.util.Arrays;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class StringListReplayDecoder implements MultiDecoder<List<String>> {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}
@Override
public List<String> decode(List<Object> parts) {
public List<String> decode(List<Object> parts, State state) {
return Arrays.asList(Arrays.copyOf(parts.toArray(), parts.size(), String[].class));
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return true;
}

@ -21,18 +21,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class StringMapReplayDecoder implements MultiDecoder<List<Map<String, String>>> {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}
@Override
public List<Map<String, String>> decode(List<Object> parts) {
public List<Map<String, String>> decode(List<Object> parts, State state) {
// TODO refactor
if (!parts.isEmpty()) {
if (parts.get(0) instanceof List) {
@ -55,7 +57,7 @@ public class StringMapReplayDecoder implements MultiDecoder<List<Map<String, Str
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return true;
}

@ -15,6 +15,7 @@
*/
package org.redisson.client.protocol.decoder;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
@ -23,7 +24,7 @@ import io.netty.util.CharsetUtil;
public class StringObjectDecoder implements Decoder<Object> {
@Override
public String decode(ByteBuf buf) {
public String decode(ByteBuf buf, State state) {
String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;

@ -15,6 +15,7 @@
*/
package org.redisson.client.protocol.decoder;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
@ -23,7 +24,7 @@ import io.netty.util.CharsetUtil;
public class StringReplayDecoder implements Decoder<String> {
@Override
public String decode(ByteBuf buf) {
public String decode(ByteBuf buf, State state) {
String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;

@ -18,6 +18,7 @@ package org.redisson.client.protocol.pubsub;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
@ -33,17 +34,17 @@ public class PubSubMessageDecoder implements MultiDecoder<Object> {
}
@Override
public Object decode(ByteBuf buf) throws IOException {
return decoder.decode(buf);
public Object decode(ByteBuf buf, State state) throws IOException {
return decoder.decode(buf, null);
}
@Override
public PubSubMessage<Object> decode(List<Object> parts) {
public PubSubMessage<Object> decode(List<Object> parts, State state) {
return new PubSubMessage<Object>(parts.get(1).toString(), parts.get(2));
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return true;
}

@ -18,6 +18,7 @@ package org.redisson.client.protocol.pubsub;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
@ -33,17 +34,17 @@ public class PubSubPatternMessageDecoder implements MultiDecoder<Object> {
}
@Override
public Object decode(ByteBuf buf) throws IOException {
return decoder.decode(buf);
public Object decode(ByteBuf buf, State state) throws IOException {
return decoder.decode(buf, null);
}
@Override
public PubSubPatternMessage decode(List<Object> parts) {
public PubSubPatternMessage decode(List<Object> parts, State state) {
return new PubSubPatternMessage(parts.get(1).toString(), parts.get(2).toString(), parts.get(3));
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return true;
}

@ -17,6 +17,7 @@ package org.redisson.client.protocol.pubsub;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
@ -25,19 +26,19 @@ import io.netty.util.CharsetUtil;
public class PubSubStatusDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
}
@Override
public PubSubStatusMessage decode(List<Object> parts) {
public PubSubStatusMessage decode(List<Object> parts, State state) {
return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString());
}
@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return true;
}

@ -17,6 +17,7 @@ package org.redisson.codec;
import java.io.IOException;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
@ -96,7 +97,7 @@ public class JsonJacksonCodec implements Codec {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) throws IOException {
public Object decode(ByteBuf buf, State state) throws IOException {
return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class);
}
};

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
@ -130,7 +131,7 @@ public class KryoCodec implements Codec {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) throws IOException {
public Object decode(ByteBuf buf, State state) throws IOException {
Kryo kryo = null;
try {
kryo = kryoPool.get();

@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
@ -58,7 +59,7 @@ public class SerializationCodec implements Codec {
public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) throws IOException {
public Object decode(ByteBuf buf, State state) throws IOException {
try {
ObjectInputStream inputStream = new ObjectInputStream(new ByteBufInputStream(buf));
return inputStream.readObject();

Loading…
Cancel
Save