diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/RedisDecoder.java index 725e3d727..d276275fd 100644 --- a/src/main/java/org/redisson/client/handler/RedisDecoder.java +++ b/src/main/java/org/redisson/client/handler/RedisDecoder.java @@ -113,7 +113,7 @@ public class RedisDecoder extends ReplayingDecoder { decode(in, data, respParts, pubSubConnection, currentDecoder); } - Object result = messageDecoder(data, respParts).get().decode(respParts); + Object result = messageDecoder(data, respParts).decode(respParts); handleMultiResult(data, parts, pubSubConnection, result); } else { throw new IllegalStateException("Can't decode replay " + (char)code); diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 6e73abf1d..1967f7991 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -22,7 +22,7 @@ import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.BooleanStatusReplayDecoder; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; -import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder2; +import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; @@ -80,7 +80,7 @@ public interface RedisCommands { RedisCommand HSET = new RedisCommand("HSET", new BooleanReplayConvertor(), 2, ValueType.MAP); RedisStrictCommand HINCRBYFLOAT = new RedisStrictCommand("HINCRBYFLOAT"); - RedisCommand> HSCAN = new RedisCommand>("HSCAN", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder2()), ValueType.MAP); + RedisCommand> HSCAN = new RedisCommand>("HSCAN", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP); RedisCommand> HGETALL = new RedisCommand>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP); RedisCommand> HVALS = new RedisCommand>("HVALS", new ObjectListReplayDecoder(), ValueType.MAP_VALUE); RedisCommand HEXISTS = new RedisCommand("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY); diff --git a/src/main/java/org/redisson/client/protocol/decoder/KeyValueObjectDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/KeyValueObjectDecoder.java index d6c1b4b56..4f8603457 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/KeyValueObjectDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/KeyValueObjectDecoder.java @@ -7,10 +7,6 @@ import io.netty.util.CharsetUtil; public class KeyValueObjectDecoder implements MultiDecoder { - public MultiDecoder get() { - return (MultiDecoder) this; - } - @Override public Object decode(ByteBuf buf) { String status = buf.toString(CharsetUtil.UTF_8); diff --git a/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java index 807202c7d..6018816c2 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java @@ -10,21 +10,8 @@ import io.netty.util.CharsetUtil; public class MapScanResultReplayDecoder implements MultiDecoder> { - ThreadLocal> currentMultiDecoder = new ThreadLocal>(); - ThreadLocal posParsed = new ThreadLocal(); - ObjectMapReplayDecoder nextDecoder = new ObjectMapReplayDecoder(); - - public MultiDecoder get() { - if (currentMultiDecoder.get() == null) { - currentMultiDecoder.set(nextDecoder); - return nextDecoder; - } - return (MultiDecoder) this; - } - @Override public Object decode(ByteBuf buf) { - posParsed.set(true); return Long.valueOf(buf.toString(CharsetUtil.UTF_8)); } @@ -35,7 +22,7 @@ public class MapScanResultReplayDecoder implements MultiDecoder> { - - public MultiDecoder get() { - return (MultiDecoder) this; - } - - @Override - public Object decode(ByteBuf buf) { - return Long.valueOf(buf.toString(CharsetUtil.UTF_8)); - } - - @Override - public MapScanResult decode(List parts) { - return new MapScanResult((Long)parts.get(0), (Map)parts.get(1)); - } - - @Override - public boolean isApplicable(int paramNum) { - return paramNum == 0; - } - -} diff --git a/src/main/java/org/redisson/client/protocol/decoder/MultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/MultiDecoder.java index 0ee02b2b1..846596d02 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/MultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/MultiDecoder.java @@ -21,8 +21,6 @@ import org.redisson.client.protocol.Decoder; public interface MultiDecoder extends Decoder { - MultiDecoder get(); - boolean isApplicable(int paramNum); T decode(List parts); diff --git a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java index ae0aaaaee..aca337880 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java @@ -13,46 +13,53 @@ public class NestedMultiDecoder implements MultiDecoder { private final MultiDecoder firstDecoder; private final MultiDecoder secondDecoder; - private Deque> iterator; - private Deque> flipIterator; + private ThreadLocal>> decoders = new ThreadLocal>>() { + protected Deque> initialValue() { + return new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder)); + }; + }; + + private ThreadLocal>> flipDecoders = new ThreadLocal>>() { + protected Deque> initialValue() { + return new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder)); + }; + }; public NestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { this.firstDecoder = firstDecoder; this.secondDecoder = secondDecoder; - - init(firstDecoder, secondDecoder); - } - - private void init(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { - iterator = new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder)); - flipIterator = new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder)); } @Override public Object decode(ByteBuf buf) throws IOException { - return flipIterator.peek().decode(buf); - } - - @Override - public MultiDecoder get() { - return this; + return flipDecoders.get().peek().decode(buf); } @Override public boolean isApplicable(int paramNum) { if (paramNum == 0) { - flipIterator.poll(); - if (flipIterator.isEmpty()) { - init(firstDecoder, secondDecoder); - flipIterator.poll(); + flipDecoders.get().poll(); + // in case of incoming buffer tail + // state should be reseted + if (flipDecoders.get().isEmpty()) { + flipDecoders.remove(); + decoders.remove(); + + flipDecoders.get().poll(); } } - return flipIterator.peek().isApplicable(paramNum); + return flipDecoders.get().peek().isApplicable(paramNum); } @Override public Object decode(List parts) { - return iterator.poll().decode(parts); + Object result = decoders.get().poll().decode(parts); + // clear state on last decoding + if (decoders.get().isEmpty()) { + flipDecoders.remove(); + decoders.remove(); + } + return result; } } diff --git a/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java index 80ab0caf9..9883929f8 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java @@ -21,9 +21,4 @@ public class ObjectListReplayDecoder implements MultiDecoder> { return false; } - @Override - public MultiDecoder get() { - return this; - } - } diff --git a/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java index 7788f3aa3..75fff9cde 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java @@ -8,11 +8,6 @@ import io.netty.buffer.ByteBuf; public class ObjectMapReplayDecoder implements MultiDecoder> { - @Override - public MultiDecoder get() { - return this; - } - @Override public Object decode(ByteBuf buf) { throw new UnsupportedOperationException(); diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java index ef852e38b..29d9e76d8 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java @@ -8,11 +8,6 @@ import io.netty.util.CharsetUtil; public class StringListReplayDecoder implements MultiDecoder> { - @Override - public MultiDecoder get() { - return this; - } - @Override public Object decode(ByteBuf buf) { return buf.toString(CharsetUtil.UTF_8); diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java index f39b96d93..cd5afca84 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java @@ -11,11 +11,6 @@ import io.netty.util.CharsetUtil; public class StringMapReplayDecoder implements MultiDecoder>> { - @Override - public MultiDecoder get() { - return this; - } - @Override public Object decode(ByteBuf buf) { return buf.toString(CharsetUtil.UTF_8); diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java index a6032b49e..512217a46 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java @@ -41,9 +41,4 @@ public class PubSubMessageDecoder implements MultiDecoder { return true; } - @Override - public MultiDecoder get() { - return this; - } - } diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java index 18787b997..02fb8aa50 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java @@ -41,9 +41,4 @@ public class PubSubPatternMessageDecoder implements MultiDecoder { return true; } - @Override - public MultiDecoder get() { - return this; - } - } diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java index 5d4b115a8..bdd78c90f 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java @@ -46,9 +46,4 @@ public class PubSubStatusDecoder implements MultiDecoder { return true; } - @Override - public MultiDecoder get() { - return this; - } - }