diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/RedisDecoder.java index bcba87319..10e4671b7 100644 --- a/src/main/java/org/redisson/client/handler/RedisDecoder.java +++ b/src/main/java/org/redisson/client/handler/RedisDecoder.java @@ -172,7 +172,10 @@ public class RedisDecoder extends ReplayingDecoder { Decoder decoder = data.getCommand().getReplayDecoder(); if (parts != null) { - decoder = data.getCommand().getReplayMultiDecoder(); + MultiDecoder multiDecoder = data.getCommand().getReplayMultiDecoder(); + if (multiDecoder.isApplicable(parts.size())) { + decoder = multiDecoder; + } } if (decoder == null) { decoder = data.getCodec(); diff --git a/src/main/java/org/redisson/client/protocol/IntegerCodec.java b/src/main/java/org/redisson/client/protocol/IntegerCodec.java new file mode 100644 index 000000000..9044fe688 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/IntegerCodec.java @@ -0,0 +1,44 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol; + +import java.io.UnsupportedEncodingException; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class IntegerCodec implements Codec { + + public static final IntegerCodec INSTANCE = new IntegerCodec(); + + @Override + public byte[] encode(int paramIndex, Object in) { + try { + return in.toString().getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException(e); + } + } + + @Override + public Object decode(ByteBuf buf) { + if (buf == null) { + return null; + } + return Integer.valueOf(buf.toString(CharsetUtil.UTF_8)); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/JsonCodec.java b/src/main/java/org/redisson/client/protocol/JsonCodec.java new file mode 100644 index 000000000..5a381f748 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/JsonCodec.java @@ -0,0 +1,35 @@ +package org.redisson.client.protocol; + +import java.io.IOException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; + +public class JsonCodec implements Codec { + + public static final JsonCodec INSTANCE = new JsonCodec(); + + private final ObjectMapper mapper = new ObjectMapper(); + + @Override + public byte[] encode(int paramIndex, Object in) { + try { + return mapper.writeValueAsBytes(in); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e); + } + } + + @Override + public Object decode(ByteBuf buf) { + try { + return mapper.readValue(new ByteBufInputStream(buf), Object.class); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/src/main/java/org/redisson/client/protocol/KeyValueMessage.java b/src/main/java/org/redisson/client/protocol/KeyValueMessage.java new file mode 100644 index 000000000..be6312bf3 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/KeyValueMessage.java @@ -0,0 +1,22 @@ +package org.redisson.client.protocol; + +public class KeyValueMessage { + + private K key; + private V value; + + public KeyValueMessage(K key, V value) { + super(); + this.key = key; + this.value = value; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/KeyValueObjectDecoder.java b/src/main/java/org/redisson/client/protocol/KeyValueObjectDecoder.java new file mode 100644 index 000000000..768ad549b --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/KeyValueObjectDecoder.java @@ -0,0 +1,29 @@ +package org.redisson.client.protocol; + +import java.util.List; + +import org.redisson.client.protocol.pubsub.MultiDecoder; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class KeyValueObjectDecoder implements MultiDecoder { + + @Override + public Object decode(ByteBuf buf) { + String status = buf.toString(CharsetUtil.UTF_8); + buf.skipBytes(2); + return status; + } + + @Override + public Object decode(List parts) { + return new KeyValueMessage(parts.get(0), parts.get(1)); + } + + @Override + public boolean isApplicable(int paramNum) { + return paramNum == 0; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index d577be876..e32e92378 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -19,8 +19,9 @@ import java.util.List; import java.util.Map; import org.redisson.client.protocol.decoder.BooleanReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.StringDataDecoder; -import org.redisson.client.protocol.decoder.StringListObjectReplayDecoder; import org.redisson.client.protocol.decoder.StringListReplayDecoder; import org.redisson.client.protocol.decoder.StringMapReplayDecoder; import org.redisson.client.protocol.decoder.StringReplayDecoder; @@ -29,8 +30,24 @@ import org.redisson.client.protocol.pubsub.PubSubStatusMessage; public interface RedisCommands { + RedisCommand LPOP = new RedisCommand("LPOP"); + RedisCommand LINDEX = new RedisCommand("LINDEX"); + RedisStrictCommand LLEN = new RedisStrictCommand("LLEN"); + + RedisStrictCommand EXPIRE = new RedisStrictCommand("EXPIRE", new BooleanReplayConvertor()); + RedisStrictCommand EXPIREAT = new RedisStrictCommand("EXPIREAT", new BooleanReplayConvertor()); + RedisStrictCommand PERSIST = new RedisStrictCommand("PERSIST", new BooleanReplayConvertor()); + RedisStrictCommand TTL = new RedisStrictCommand("TTL"); + + RedisCommand BRPOPLPUSH = new RedisCommand("BRPOPLPUSH"); + RedisCommand BLPOP = new RedisCommand("BLPOP", new KeyValueObjectDecoder()); + + RedisCommand RPUSH = new RedisCommand("RPUSH"); + RedisStrictCommand EVAL_BOOLEAN = new RedisStrictCommand("EVAL", new BooleanReplayConvertor()); RedisStrictCommand EVAL_INTEGER = new RedisStrictCommand("EVAL"); + RedisCommand> EVAL_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); + RedisCommand EVAL_OBJECT = new RedisCommand("EVAL"); RedisStrictCommand INCR = new RedisStrictCommand("INCR"); RedisStrictCommand INCRBY = new RedisStrictCommand("INCRBY"); @@ -44,10 +61,16 @@ public interface RedisCommands { RedisStrictCommand> KEYS = new RedisStrictCommand>("KEYS", new StringListReplayDecoder()); + RedisCommand> HGETALL = new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()); + RedisCommand> HVALS = new RedisCommand>("HVALS", new ObjectListReplayDecoder()); + RedisCommand HEXISTS = new RedisCommand("HEXISTS", new BooleanReplayConvertor(), 1); + RedisStrictCommand HLEN = new RedisStrictCommand("HLEN"); + RedisCommand> HKEYS = new RedisCommand>("HKEYS", new ObjectListReplayDecoder()); RedisCommand HMSET = new RedisCommand("HMSET", new StringReplayDecoder(), 2, 3); - RedisCommand HMGET = new RedisCommand("HMGET", new StringListObjectReplayDecoder(), 2, 3); + RedisCommand> HMGET = new RedisCommand>("HMGET", new ObjectListReplayDecoder()); + RedisCommand HGET = new RedisCommand("HMGET", 2); - RedisStrictCommand DEL_ONE = new RedisStrictCommand("DEL", new BooleanReplayConvertor()); + RedisStrictCommand DEL_BOOLEAN = new RedisStrictCommand("DEL", new BooleanReplayConvertor()); RedisCommand GET = new RedisCommand("GET"); RedisCommand SET = new RedisCommand("SET", new StringReplayDecoder(), 1); diff --git a/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java new file mode 100644 index 000000000..d052c1ac1 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/ObjectListReplayDecoder.java @@ -0,0 +1,26 @@ +package org.redisson.client.protocol.decoder; + +import java.util.List; + +import org.redisson.client.protocol.pubsub.MultiDecoder; + +import io.netty.buffer.ByteBuf; + +public class ObjectListReplayDecoder implements MultiDecoder> { + + @Override + public Object decode(ByteBuf buf) { + throw new UnsupportedOperationException(); + } + + @Override + public List decode(List parts) { + return parts; + } + + @Override + public boolean isApplicable(int paramNum) { + return false; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java new file mode 100644 index 000000000..cea6185bd --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder.java @@ -0,0 +1,34 @@ +package org.redisson.client.protocol.decoder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.client.protocol.pubsub.MultiDecoder; + +import io.netty.buffer.ByteBuf; + +public class ObjectMapReplayDecoder implements MultiDecoder> { + + @Override + public Object decode(ByteBuf buf) { + throw new UnsupportedOperationException(); + } + + @Override + public Map decode(List parts) { + Map result = new HashMap(parts.size()/2); + for (int i = 0; i < parts.size(); i++) { + if (i % 2 != 0) { + result.put(parts.get(i-1).toString(), parts.get(i).toString()); + } + } + return result; + } + + @Override + public boolean isApplicable(int paramNum) { + return false; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringListObjectReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringListObjectReplayDecoder.java deleted file mode 100644 index 3989a427b..000000000 --- a/src/main/java/org/redisson/client/protocol/decoder/StringListObjectReplayDecoder.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.redisson.client.protocol.decoder; - -import java.util.List; - -import org.redisson.client.protocol.pubsub.MultiDecoder; - -import io.netty.buffer.ByteBuf; -import io.netty.util.CharsetUtil; - -public class StringListObjectReplayDecoder implements MultiDecoder { - - @Override - public Object decode(ByteBuf buf) { - return buf.toString(CharsetUtil.UTF_8); - } - - @Override - public Object decode(List parts) { - return parts; - } - -} diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java index 5f0941a72..bcaed6f46 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java @@ -20,4 +20,9 @@ public class StringListReplayDecoder implements MultiDecoder> { return Arrays.asList(Arrays.copyOf(parts.toArray(), parts.size(), String[].class)); } + @Override + public boolean isApplicable(int paramNum) { + return true; + } + } diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java index f99641b43..8058d868d 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java @@ -40,4 +40,9 @@ public class StringMapReplayDecoder implements MultiDecoder { + + @Override + public String decode(ByteBuf buf) { + String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); + buf.skipBytes(2); + return status; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/pubsub/MultiDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/MultiDecoder.java index 0c095f26a..2b5900316 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/MultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/MultiDecoder.java @@ -21,6 +21,8 @@ import org.redisson.client.protocol.Decoder; public interface MultiDecoder extends Decoder { + boolean isApplicable(int paramNum); + T decode(List parts); } diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessage.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessage.java index 2ac65e850..7912747ae 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessage.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessage.java @@ -15,12 +15,12 @@ */ package org.redisson.client.protocol.pubsub; -public class PubSubMessage { +public class PubSubMessage { private final String channel; - private final Object value; + private final V value; - public PubSubMessage(String channel, Object value) { + public PubSubMessage(String channel, V value) { super(); this.channel = channel; this.value = value; @@ -30,13 +30,13 @@ public class PubSubMessage { return channel; } - public Object getValue() { + public V getValue() { return value; } @Override public String toString() { - return "PubSubMessage [channel=" + channel + ", value=" + value + "]"; + return "Message [channel=" + channel + ", value=" + value + "]"; } } diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java index 628efe051..03eabe831 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubMessageDecoder.java @@ -34,4 +34,9 @@ public class PubSubMessageDecoder implements MultiDecoder { return new PubSubMessage(parts.get(1).toString(), parts.get(2).toString()); } + @Override + public boolean isApplicable(int paramNum) { + return true; + } + } diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java index f3143be9c..4440c5456 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubPatternMessageDecoder.java @@ -34,4 +34,9 @@ public class PubSubPatternMessageDecoder implements MultiDecoder { return new PubSubPatternMessage(parts.get(1).toString(), parts.get(2).toString(), parts.get(3)); } + @Override + public boolean isApplicable(int paramNum) { + return true; + } + } diff --git a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java index f5a8df92b..b700a6806 100644 --- a/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java +++ b/src/main/java/org/redisson/client/protocol/pubsub/PubSubStatusDecoder.java @@ -39,4 +39,9 @@ public class PubSubStatusDecoder implements MultiDecoder { return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), channels); } + @Override + public boolean isApplicable(int paramNum) { + return true; + } + }