diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index eb518b494..4b1ed1302 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -50,7 +50,7 @@ import io.netty.util.CharsetUtil; * @author Nikita Koksharov * */ -public class CommandDecoder extends ReplayingDecoder { +public class CommandDecoder extends ReplayingDecoder { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -68,7 +68,7 @@ public class CommandDecoder extends ReplayingDecoder { if (data == null) { currentDecoder = new Decoder() { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { return buf.toString(CharsetUtil.UTF_8); } }; @@ -78,12 +78,14 @@ public class CommandDecoder extends ReplayingDecoder { log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); } + if (state() == null) { + state(new State()); + } + state().setDecoderState(null); + if (data == null) { decode(in, null, null, ctx.channel(), currentDecoder); } else if (data instanceof CommandData) { -// if (state() == null) { -// state(new DecoderState()); -// } CommandData cmd = (CommandData)data; try { // if (state().getSize() > 0) { @@ -97,12 +99,7 @@ public class CommandDecoder extends ReplayingDecoder { } else if (data instanceof CommandsData) { CommandsData commands = (CommandsData)data; - int i = 0; - if (state() != null) { - i = state().getIndex(); - } else { - state(new DecoderState()); - } + int i = state().getIndex(); while (in.writerIndex() > in.readerIndex()) { CommandData cmd = null; @@ -155,7 +152,7 @@ public class CommandDecoder extends ReplayingDecoder { ByteBuf buf = readBytes(in); Object result = null; if (buf != null) { - result = decoder(data, parts, currentDecoder).decode(buf); + result = decoder(data, parts, currentDecoder).decode(buf, state()); } handleResult(data, parts, result, false); } else if (code == '*') { @@ -178,7 +175,7 @@ public class CommandDecoder extends ReplayingDecoder { decode(in, data, respParts, channel, currentDecoder); } - Object result = messageDecoder(data, respParts).decode(respParts); + Object result = messageDecoder(data, respParts).decode(respParts, state()); if (result instanceof PubSubStatusMessage) { if (parts == null) { parts = new ArrayList(); @@ -265,7 +262,7 @@ public class CommandDecoder extends ReplayingDecoder { Decoder decoder = data.getCommand().getReplayDecoder(); if (parts != null) { MultiDecoder multiDecoder = data.getCommand().getReplayMultiDecoder(); - if (multiDecoder.isApplicable(parts.size())) { + if (multiDecoder.isApplicable(parts.size(), state())) { decoder = multiDecoder; } } diff --git a/src/main/java/org/redisson/client/handler/DecoderState.java b/src/main/java/org/redisson/client/handler/State.java similarity index 70% rename from src/main/java/org/redisson/client/handler/DecoderState.java rename to src/main/java/org/redisson/client/handler/State.java index 2bc20d8a1..ebf810cec 100644 --- a/src/main/java/org/redisson/client/handler/DecoderState.java +++ b/src/main/java/org/redisson/client/handler/State.java @@ -2,14 +2,15 @@ package org.redisson.client.handler; import java.util.List; -public class DecoderState { +public class State { private int index; + private Object decoderState; private long size; private List respParts; - public DecoderState() { + public State() { super(); } @@ -34,5 +35,11 @@ public class DecoderState { return index; } + public T getDecoderState() { + return (T)decoderState; + } + public void setDecoderState(Object decoderState) { + this.decoderState = decoderState; + } } diff --git a/src/main/java/org/redisson/client/protocol/Decoder.java b/src/main/java/org/redisson/client/protocol/Decoder.java index 41aff8bd6..76e44d596 100644 --- a/src/main/java/org/redisson/client/protocol/Decoder.java +++ b/src/main/java/org/redisson/client/protocol/Decoder.java @@ -17,10 +17,12 @@ package org.redisson.client.protocol; import java.io.IOException; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; public interface Decoder { - R decode(ByteBuf buf) throws IOException; + R decode(ByteBuf buf, State state) throws IOException; } diff --git a/src/main/java/org/redisson/client/protocol/LongCodec.java b/src/main/java/org/redisson/client/protocol/LongCodec.java index b8478973c..f90e98c9b 100644 --- a/src/main/java/org/redisson/client/protocol/LongCodec.java +++ b/src/main/java/org/redisson/client/protocol/LongCodec.java @@ -15,6 +15,8 @@ */ package org.redisson.client.protocol; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; @@ -26,7 +28,7 @@ public class LongCodec extends StringCodec { public Decoder getValueDecoder() { return new Decoder() { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { return Long.valueOf(buf.toString(CharsetUtil.UTF_8)); } }; diff --git a/src/main/java/org/redisson/client/protocol/StringCodec.java b/src/main/java/org/redisson/client/protocol/StringCodec.java index 0fbde3643..a6a038a94 100644 --- a/src/main/java/org/redisson/client/protocol/StringCodec.java +++ b/src/main/java/org/redisson/client/protocol/StringCodec.java @@ -17,6 +17,8 @@ package org.redisson.client.protocol; import java.io.IOException; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; @@ -28,7 +30,7 @@ public class StringCodec implements Codec { public Decoder getValueDecoder() { return new Decoder() { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { return buf.toString(CharsetUtil.UTF_8); } }; diff --git a/src/main/java/org/redisson/client/protocol/decoder/KeyValueObjectDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/KeyValueObjectDecoder.java index 36df2ca78..614e9b8df 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/KeyValueObjectDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/KeyValueObjectDecoder.java @@ -17,20 +17,22 @@ package org.redisson.client.protocol.decoder; import java.util.List; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; public class KeyValueObjectDecoder implements MultiDecoder { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { String status = buf.toString(CharsetUtil.UTF_8); buf.skipBytes(2); return status; } @Override - public Object decode(List parts) { + public Object decode(List parts, State state) { if (parts.isEmpty()) { return null; } @@ -38,7 +40,7 @@ public class KeyValueObjectDecoder implements MultiDecoder { } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return paramNum == 0; } diff --git a/src/main/java/org/redisson/client/protocol/decoder/ListScanResultReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ListScanResultReplayDecoder.java index f32512859..d78e19c64 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/ListScanResultReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ListScanResultReplayDecoder.java @@ -17,23 +17,25 @@ package org.redisson.client.protocol.decoder; import java.util.List; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; public class ListScanResultReplayDecoder implements MultiDecoder> { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { return Long.valueOf(buf.toString(CharsetUtil.UTF_8)); } @Override - public ListScanResult decode(List parts) { + public ListScanResult decode(List parts, State state) { return new ListScanResult((Long)parts.get(0), (List)parts.get(1)); } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return paramNum == 0; } diff --git a/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java index e9779d154..3a0185ffb 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java @@ -18,23 +18,25 @@ package org.redisson.client.protocol.decoder; import java.util.List; import java.util.Map; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; public class MapScanResultReplayDecoder implements MultiDecoder> { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { return Long.valueOf(buf.toString(CharsetUtil.UTF_8)); } @Override - public MapScanResult decode(List parts) { + public MapScanResult decode(List parts, State state) { return new MapScanResult((Long)parts.get(0), (Map)parts.get(1)); } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return paramNum == 0; } diff --git a/src/main/java/org/redisson/client/protocol/decoder/MultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/MultiDecoder.java index 846596d02..162947f55 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/MultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/MultiDecoder.java @@ -17,12 +17,13 @@ package org.redisson.client.protocol.decoder; import java.util.List; +import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; public interface MultiDecoder extends Decoder { - boolean isApplicable(int paramNum); + boolean isApplicable(int paramNum, State state); - T decode(List parts); + T decode(List parts, State state); } diff --git a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java index 668e57121..f8ac24ff1 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java @@ -21,24 +21,36 @@ import java.util.Arrays; import java.util.Deque; import java.util.List; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; public class NestedMultiDecoder implements MultiDecoder { - private final MultiDecoder firstDecoder; - private final MultiDecoder secondDecoder; + public static class DecoderState { + + Deque> decoders; + + Deque> flipDecoders; + + public DecoderState(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { + super(); + this.decoders = new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder)); + this.flipDecoders = new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder)); + } + + public Deque> getDecoders() { + return decoders; + } - private ThreadLocal>> decoders = new ThreadLocal>>() { - protected Deque> initialValue() { - return new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder)); - }; - }; + public Deque> getFlipDecoders() { + return flipDecoders; + } + + } - private ThreadLocal>> flipDecoders = new ThreadLocal>>() { - protected Deque> initialValue() { - return new ArrayDeque>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder)); - }; - }; + private final MultiDecoder firstDecoder; + private final MultiDecoder secondDecoder; public NestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder) { this.firstDecoder = firstDecoder; @@ -46,35 +58,33 @@ public class NestedMultiDecoder implements MultiDecoder { } @Override - public Object decode(ByteBuf buf) throws IOException { - return flipDecoders.get().peek().decode(buf); + public Object decode(ByteBuf buf, State state) throws IOException { + DecoderState ds = getDecoder(state); + return ds.getFlipDecoders().peek().decode(buf, state); } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { + DecoderState ds = getDecoder(state); if (paramNum == 0) { - flipDecoders.get().poll(); - // in case of incoming buffer tail - // state should be reseted - if (flipDecoders.get().isEmpty()) { - flipDecoders.remove(); - decoders.remove(); - - flipDecoders.get().poll(); - } + ds.getFlipDecoders().poll(); } - return flipDecoders.get().peek().isApplicable(paramNum); + return ds.getFlipDecoders().peek().isApplicable(paramNum, state); } - @Override - public Object decode(List parts) { - Object result = decoders.get().poll().decode(parts); - // clear state on last decoding - if (decoders.get().isEmpty()) { - flipDecoders.remove(); - decoders.remove(); + private DecoderState getDecoder(State state) { + DecoderState ds = state.getDecoderState(); + if (ds == null) { + ds = new DecoderState(firstDecoder, secondDecoder); + state.setDecoderState(ds); } - return result; + return ds; + } + + @Override + public Object decode(List parts, State state) { + DecoderState ds = getDecoder(state); + return ds.getDecoders().poll().decode(parts, state); } } diff --git a/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java index 762c4e7be..1e029fa0f 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java @@ -17,22 +17,24 @@ package org.redisson.client.protocol.decoder; import java.util.List; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; public class ObjectListReplayDecoder implements MultiDecoder> { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { throw new UnsupportedOperationException(); } @Override - public List decode(List parts) { + public List decode(List parts, State state) { return parts; } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return false; } diff --git a/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java index 3f6da5735..b5ac7c493 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java @@ -19,17 +19,19 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; public class ObjectMapReplayDecoder implements MultiDecoder> { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { throw new UnsupportedOperationException(); } @Override - public Map decode(List parts) { + public Map decode(List parts, State state) { Map result = new HashMap(parts.size()/2); for (int i = 0; i < parts.size(); i++) { if (i % 2 != 0) { @@ -40,7 +42,7 @@ public class ObjectMapReplayDecoder implements MultiDecoder> } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return false; } diff --git a/src/main/java/org/redisson/client/protocol/decoder/ObjectSetReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ObjectSetReplayDecoder.java index 589b90014..e6b17b62b 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/ObjectSetReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/ObjectSetReplayDecoder.java @@ -19,22 +19,24 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; public class ObjectSetReplayDecoder implements MultiDecoder> { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { throw new UnsupportedOperationException(); } @Override - public Set decode(List parts) { + public Set decode(List parts, State state) { return new HashSet(parts); } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return false; } diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringDataDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringDataDecoder.java index 8407adc53..d849a5f38 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/StringDataDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringDataDecoder.java @@ -15,6 +15,7 @@ */ package org.redisson.client.protocol.decoder; +import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import io.netty.buffer.ByteBuf; @@ -23,7 +24,7 @@ import io.netty.util.CharsetUtil; public class StringDataDecoder implements Decoder { @Override - public String decode(ByteBuf buf) { + public String decode(ByteBuf buf, State state) { return buf.toString(CharsetUtil.UTF_8); } diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java index 128ce6ac3..2415299e7 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java @@ -18,23 +18,25 @@ package org.redisson.client.protocol.decoder; import java.util.Arrays; import java.util.List; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; public class StringListReplayDecoder implements MultiDecoder> { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { return buf.toString(CharsetUtil.UTF_8); } @Override - public List decode(List parts) { + public List decode(List parts, State state) { return Arrays.asList(Arrays.copyOf(parts.toArray(), parts.size(), String[].class)); } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return true; } diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java index 95d54e99e..ec9d8b5cb 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java @@ -21,18 +21,20 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.redisson.client.handler.State; + import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; public class StringMapReplayDecoder implements MultiDecoder>> { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { return buf.toString(CharsetUtil.UTF_8); } @Override - public List> decode(List parts) { + public List> decode(List parts, State state) { // TODO refactor if (!parts.isEmpty()) { if (parts.get(0) instanceof List) { @@ -55,7 +57,7 @@ public class StringMapReplayDecoder implements MultiDecoder { @Override - public String decode(ByteBuf buf) { + public String decode(ByteBuf buf, State state) { String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); buf.skipBytes(2); return status; diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringReplayDecoder.java index 938aff8c2..499c3f343 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/StringReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringReplayDecoder.java @@ -15,6 +15,7 @@ */ package org.redisson.client.protocol.decoder; +import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import io.netty.buffer.ByteBuf; @@ -23,7 +24,7 @@ import io.netty.util.CharsetUtil; public class StringReplayDecoder implements Decoder { @Override - public String decode(ByteBuf buf) { + public String decode(ByteBuf buf, State state) { String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); buf.skipBytes(2); return status; diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java index 312474a16..75323f935 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java @@ -18,6 +18,7 @@ package org.redisson.client.protocol.pubsub; import java.io.IOException; import java.util.List; +import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.decoder.MultiDecoder; @@ -33,17 +34,17 @@ public class PubSubMessageDecoder implements MultiDecoder { } @Override - public Object decode(ByteBuf buf) throws IOException { - return decoder.decode(buf); + public Object decode(ByteBuf buf, State state) throws IOException { + return decoder.decode(buf, null); } @Override - public PubSubMessage decode(List parts) { + public PubSubMessage decode(List parts, State state) { return new PubSubMessage(parts.get(1).toString(), parts.get(2)); } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return true; } diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java index a6bf4bf4b..956517736 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java @@ -18,6 +18,7 @@ package org.redisson.client.protocol.pubsub; import java.io.IOException; import java.util.List; +import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.decoder.MultiDecoder; @@ -33,17 +34,17 @@ public class PubSubPatternMessageDecoder implements MultiDecoder { } @Override - public Object decode(ByteBuf buf) throws IOException { - return decoder.decode(buf); + public Object decode(ByteBuf buf, State state) throws IOException { + return decoder.decode(buf, null); } @Override - public PubSubPatternMessage decode(List parts) { + public PubSubPatternMessage decode(List parts, State state) { return new PubSubPatternMessage(parts.get(1).toString(), parts.get(2).toString(), parts.get(3)); } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return true; } diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java index 81adecf92..8d75d73c7 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java @@ -17,6 +17,7 @@ package org.redisson.client.protocol.pubsub; import java.util.List; +import org.redisson.client.handler.State; import org.redisson.client.protocol.decoder.MultiDecoder; import io.netty.buffer.ByteBuf; @@ -25,19 +26,19 @@ import io.netty.util.CharsetUtil; public class PubSubStatusDecoder implements MultiDecoder { @Override - public Object decode(ByteBuf buf) { + public Object decode(ByteBuf buf, State state) { String status = buf.toString(CharsetUtil.UTF_8); buf.skipBytes(2); return status; } @Override - public PubSubStatusMessage decode(List parts) { + public PubSubStatusMessage decode(List parts, State state) { return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString()); } @Override - public boolean isApplicable(int paramNum) { + public boolean isApplicable(int paramNum, State state) { return true; } diff --git a/src/main/java/org/redisson/codec/JsonJacksonCodec.java b/src/main/java/org/redisson/codec/JsonJacksonCodec.java index 9a71b64b9..54b53345c 100755 --- a/src/main/java/org/redisson/codec/JsonJacksonCodec.java +++ b/src/main/java/org/redisson/codec/JsonJacksonCodec.java @@ -17,6 +17,7 @@ package org.redisson.codec; import java.io.IOException; +import org.redisson.client.handler.State; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; @@ -96,7 +97,7 @@ public class JsonJacksonCodec implements Codec { return new Decoder() { @Override - public Object decode(ByteBuf buf) throws IOException { + public Object decode(ByteBuf buf, State state) throws IOException { return mapObjectMapper.readValue(new ByteBufInputStream(buf), Object.class); } }; diff --git a/src/main/java/org/redisson/codec/KryoCodec.java b/src/main/java/org/redisson/codec/KryoCodec.java index 633db6599..5ba7ee820 100755 --- a/src/main/java/org/redisson/codec/KryoCodec.java +++ b/src/main/java/org/redisson/codec/KryoCodec.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import org.redisson.client.handler.State; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; @@ -130,7 +131,7 @@ public class KryoCodec implements Codec { return new Decoder() { @Override - public Object decode(ByteBuf buf) throws IOException { + public Object decode(ByteBuf buf, State state) throws IOException { Kryo kryo = null; try { kryo = kryoPool.get(); diff --git a/src/main/java/org/redisson/codec/SerializationCodec.java b/src/main/java/org/redisson/codec/SerializationCodec.java index 34965f5dd..45b5a6859 100644 --- a/src/main/java/org/redisson/codec/SerializationCodec.java +++ b/src/main/java/org/redisson/codec/SerializationCodec.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import org.redisson.client.handler.State; import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; @@ -58,7 +59,7 @@ public class SerializationCodec implements Codec { public Decoder getValueDecoder() { return new Decoder() { @Override - public Object decode(ByteBuf buf) throws IOException { + public Object decode(ByteBuf buf, State state) throws IOException { try { ObjectInputStream inputStream = new ObjectInputStream(new ByteBufInputStream(buf)); return inputStream.readObject();