MultiDecoder refactoring

pull/243/head
Nikita 10 years ago
parent 16741637e5
commit 0b7e3c70bf

@ -113,7 +113,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
decode(in, data, respParts, pubSubConnection, currentDecoder);
}
Object result = messageDecoder(data, respParts).get().decode(respParts);
Object result = messageDecoder(data, respParts).decode(respParts);
handleMultiResult(data, parts, pubSubConnection, result);
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);

@ -22,7 +22,7 @@ import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.BooleanStatusReplayDecoder;
import org.redisson.client.protocol.decoder.KeyValueObjectDecoder;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder2;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
@ -80,7 +80,7 @@ public interface RedisCommands {
RedisCommand<Boolean> HSET = new RedisCommand<Boolean>("HSET", new BooleanReplayConvertor(), 2, ValueType.MAP);
RedisStrictCommand<String> HINCRBYFLOAT = new RedisStrictCommand<String>("HINCRBYFLOAT");
RedisCommand<MapScanResult<Object, Object>> HSCAN = new RedisCommand<MapScanResult<Object, Object>>("HSCAN", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder2()), ValueType.MAP);
RedisCommand<MapScanResult<Object, Object>> HSCAN = new RedisCommand<MapScanResult<Object, Object>>("HSCAN", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
RedisCommand<Map<Object, Object>> HGETALL = new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP);
RedisCommand<List<Object>> HVALS = new RedisCommand<List<Object>>("HVALS", new ObjectListReplayDecoder(), ValueType.MAP_VALUE);
RedisCommand<Boolean> HEXISTS = new RedisCommand<Boolean>("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY);

@ -7,10 +7,6 @@ import io.netty.util.CharsetUtil;
public class KeyValueObjectDecoder implements MultiDecoder<Object> {
public MultiDecoder<Object> get() {
return (MultiDecoder<Object>) this;
}
@Override
public Object decode(ByteBuf buf) {
String status = buf.toString(CharsetUtil.UTF_8);

@ -10,21 +10,8 @@ import io.netty.util.CharsetUtil;
public class MapScanResultReplayDecoder implements MultiDecoder<MapScanResult<Object, Object>> {
ThreadLocal<MultiDecoder<?>> currentMultiDecoder = new ThreadLocal<MultiDecoder<?>>();
ThreadLocal<Boolean> posParsed = new ThreadLocal<Boolean>();
ObjectMapReplayDecoder nextDecoder = new ObjectMapReplayDecoder();
public MultiDecoder<?> get() {
if (currentMultiDecoder.get() == null) {
currentMultiDecoder.set(nextDecoder);
return nextDecoder;
}
return (MultiDecoder<?>) this;
}
@Override
public Object decode(ByteBuf buf) {
posParsed.set(true);
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
@ -35,7 +22,7 @@ public class MapScanResultReplayDecoder implements MultiDecoder<MapScanResult<Ob
@Override
public boolean isApplicable(int paramNum) {
return paramNum == 0 && posParsed.get() == null;
return paramNum == 0;
}
}

@ -1,32 +0,0 @@
package org.redisson.client.protocol.decoder;
import java.util.List;
import java.util.Map;
import com.lambdaworks.redis.output.MapScanResult;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class MapScanResultReplayDecoder2 implements MultiDecoder<MapScanResult<Object, Object>> {
public MultiDecoder<?> get() {
return (MultiDecoder<?>) this;
}
@Override
public Object decode(ByteBuf buf) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
@Override
public MapScanResult<Object, Object> decode(List<Object> parts) {
return new MapScanResult<Object, Object>((Long)parts.get(0), (Map<Object, Object>)parts.get(1));
}
@Override
public boolean isApplicable(int paramNum) {
return paramNum == 0;
}
}

@ -21,8 +21,6 @@ import org.redisson.client.protocol.Decoder;
public interface MultiDecoder<T> extends Decoder<Object> {
MultiDecoder<?> get();
boolean isApplicable(int paramNum);
T decode(List<Object> parts);

@ -13,46 +13,53 @@ public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
private final MultiDecoder<Object> firstDecoder;
private final MultiDecoder<Object> secondDecoder;
private Deque<MultiDecoder<?>> iterator;
private Deque<MultiDecoder<?>> flipIterator;
private ThreadLocal<Deque<MultiDecoder<?>>> decoders = new ThreadLocal<Deque<MultiDecoder<?>>>() {
protected Deque<MultiDecoder<?>> initialValue() {
return new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder));
};
};
private ThreadLocal<Deque<MultiDecoder<?>>> flipDecoders = new ThreadLocal<Deque<MultiDecoder<?>>>() {
protected Deque<MultiDecoder<?>> initialValue() {
return new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder));
};
};
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
this.firstDecoder = firstDecoder;
this.secondDecoder = secondDecoder;
init(firstDecoder, secondDecoder);
}
private void init(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
iterator = new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder));
flipIterator = new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder));
}
@Override
public Object decode(ByteBuf buf) throws IOException {
return flipIterator.peek().decode(buf);
}
@Override
public MultiDecoder<?> get() {
return this;
return flipDecoders.get().peek().decode(buf);
}
@Override
public boolean isApplicable(int paramNum) {
if (paramNum == 0) {
flipIterator.poll();
if (flipIterator.isEmpty()) {
init(firstDecoder, secondDecoder);
flipIterator.poll();
flipDecoders.get().poll();
// in case of incoming buffer tail
// state should be reseted
if (flipDecoders.get().isEmpty()) {
flipDecoders.remove();
decoders.remove();
flipDecoders.get().poll();
}
}
return flipIterator.peek().isApplicable(paramNum);
return flipDecoders.get().peek().isApplicable(paramNum);
}
@Override
public Object decode(List<Object> parts) {
return iterator.poll().decode(parts);
Object result = decoders.get().poll().decode(parts);
// clear state on last decoding
if (decoders.get().isEmpty()) {
flipDecoders.remove();
decoders.remove();
}
return result;
}
}

@ -21,9 +21,4 @@ public class ObjectListReplayDecoder implements MultiDecoder<List<Object>> {
return false;
}
@Override
public MultiDecoder<?> get() {
return this;
}
}

@ -8,11 +8,6 @@ import io.netty.buffer.ByteBuf;
public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
@Override
public MultiDecoder<?> get() {
return this;
}
@Override
public Object decode(ByteBuf buf) {
throw new UnsupportedOperationException();

@ -8,11 +8,6 @@ import io.netty.util.CharsetUtil;
public class StringListReplayDecoder implements MultiDecoder<List<String>> {
@Override
public MultiDecoder<?> get() {
return this;
}
@Override
public Object decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);

@ -11,11 +11,6 @@ import io.netty.util.CharsetUtil;
public class StringMapReplayDecoder implements MultiDecoder<List<Map<String, String>>> {
@Override
public MultiDecoder<?> get() {
return this;
}
@Override
public Object decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);

@ -41,9 +41,4 @@ public class PubSubMessageDecoder implements MultiDecoder<Object> {
return true;
}
@Override
public MultiDecoder<?> get() {
return this;
}
}

@ -41,9 +41,4 @@ public class PubSubPatternMessageDecoder implements MultiDecoder<Object> {
return true;
}
@Override
public MultiDecoder<?> get() {
return this;
}
}

@ -46,9 +46,4 @@ public class PubSubStatusDecoder implements MultiDecoder<PubSubStatusMessage> {
return true;
}
@Override
public MultiDecoder<?> get() {
return this;
}
}

Loading…
Cancel
Save