new commands

pull/243/head
Nikita 10 years ago
parent 8082f9f8fa
commit a0522a7b0c

@ -20,7 +20,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;

@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.util.concurrent.Promise;

@ -28,7 +28,7 @@ import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.handler.RedisCommandsQueue.QueueCommands;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
@ -62,7 +62,11 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
System.out.println("message " + in.writerIndex() + "-" + in.readerIndex() + " in: " + in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
decode(in, data, null, pubSubConnection, currentDecoder);
try {
decode(in, data, null, pubSubConnection, currentDecoder);
} catch (Exception e) {
data.getPromise().setFailure(e);
}
ctx.channel().attr(RedisCommandsQueue.REPLAY).remove();
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
@ -103,7 +107,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
decode(in, data, respParts, pubSubConnection, currentDecoder);
}
Object result = messageDecoder(data, respParts).decode(respParts);
Object result = messageDecoder(data, respParts).get().decode(respParts);
handleMultiResult(data, parts, pubSubConnection, result);
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);

@ -46,10 +46,17 @@ public class RedisEncoder extends MessageToByteEncoder<RedisData<Object, Object>
int i = 1;
for (Object param : msg.getParams()) {
Encoder encoder = msg.getCommand().getParamsEncoder();
if (msg.getCommand().getInParamIndex() == i && msg.getCommand().getInParamType() == ValueType.OBJECT) {
encoder = msg.getCodec().getValueEncoder();
} else if (msg.getCommand().getInParamIndex() <= i && msg.getCommand().getInParamType() != ValueType.OBJECT) {
encoder = encoder(msg, i - msg.getCommand().getInParamIndex());
if (msg.getCommand().getInParamType().size() == 1) {
if (msg.getCommand().getInParamIndex() == i && msg.getCommand().getInParamType().get(0) == ValueType.OBJECT) {
encoder = msg.getCodec().getValueEncoder();
} else if (msg.getCommand().getInParamIndex() <= i && msg.getCommand().getInParamType().get(0) != ValueType.OBJECT) {
encoder = encoder(msg, i - msg.getCommand().getInParamIndex());
}
} else {
int paramNum = i - msg.getCommand().getInParamIndex();
if (msg.getCommand().getInParamIndex() <= i) {
encoder = encoder(msg, paramNum);
}
}
writeArgument(out, encoder.encode(i, param));
@ -62,17 +69,21 @@ public class RedisEncoder extends MessageToByteEncoder<RedisData<Object, Object>
}
private Encoder encoder(RedisData<Object, Object> msg, int param) {
if (msg.getCommand().getInParamType() == ValueType.MAP) {
int typeIndex = 0;
if (msg.getCommand().getInParamType().size() > 1) {
typeIndex = param;
}
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP) {
if (param % 2 != 0) {
return msg.getCodec().getMapValueEncoder();
} else {
return msg.getCodec().getMapKeyEncoder();
}
}
if (msg.getCommand().getInParamType() == ValueType.MAP_KEY) {
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_KEY) {
return msg.getCodec().getMapKeyEncoder();
}
if (msg.getCommand().getInParamType() == ValueType.MAP_KEY) {
if (msg.getCommand().getInParamType().get(typeIndex) == ValueType.MAP_VALUE) {
return msg.getCodec().getMapValueEncoder();
}
throw new IllegalStateException();

@ -15,14 +15,17 @@
*/
package org.redisson.client.protocol;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import java.util.Arrays;
import java.util.List;
import org.redisson.client.protocol.decoder.MultiDecoder;
public class RedisCommand<R> {
public enum ValueType {OBJECT, MAP_VALUE, MAP_KEY, MAP}
private ValueType outParamType = ValueType.OBJECT;
private ValueType inParamType = ValueType.OBJECT;
private List<ValueType> inParamType = Arrays.asList(ValueType.OBJECT);
private final int inParamIndex;
private final String name;
@ -42,6 +45,17 @@ public class RedisCommand<R> {
this.outParamType = outParamType;
}
public RedisCommand(String name, int objectParamIndex, ValueType inParamType) {
this(name, null, null, null, objectParamIndex);
this.inParamType = Arrays.asList(inParamType);
}
public RedisCommand(String name, ValueType inParamType, ValueType outParamType) {
this(name, (String)null);
this.inParamType = Arrays.asList(inParamType);
this.outParamType = outParamType;
}
public RedisCommand(String name, String subName) {
this(name, subName, null, null, -1);
}
@ -56,15 +70,31 @@ public class RedisCommand<R> {
public RedisCommand(String name, int encodeParamIndex, ValueType inParamType, ValueType outParamType) {
this(name, null, null, null, encodeParamIndex);
this.inParamType = Arrays.asList(inParamType);
this.outParamType = outParamType;
}
public RedisCommand(String name, int encodeParamIndex, List<ValueType> inParamType, ValueType outParamType) {
this(name, null, null, null, encodeParamIndex);
this.inParamType = inParamType;
this.outParamType = outParamType;
}
public RedisCommand(String name, Decoder<R> reponseDecoder, int encodeParamIndex, List<ValueType> inParamType, ValueType outParamType) {
this(name, null, null, reponseDecoder, encodeParamIndex);
this.inParamType = inParamType;
this.outParamType = outParamType;
}
public RedisCommand(String name, Decoder<R> reponseDecoder, int encodeParamIndex, List<ValueType> inParamType) {
this(name, null, null, reponseDecoder, encodeParamIndex);
this.inParamType = inParamType;
}
public RedisCommand(String name, Convertor<R> convertor, int encodeParamIndex, ValueType inParamType) {
this(name, null, null, null, encodeParamIndex);
this.convertor = convertor;
this.inParamType = inParamType;
this.inParamType = Arrays.asList(inParamType);
}
public RedisCommand(String name, Convertor<R> convertor, int encodeParamIndex) {
@ -78,7 +108,7 @@ public class RedisCommand<R> {
public RedisCommand(String name, Decoder<R> reponseDecoder, int objectParamIndex, ValueType inParamType) {
this(name, null, null, reponseDecoder, objectParamIndex);
this.inParamType = inParamType;
this.inParamType = Arrays.asList(inParamType);
}
public RedisCommand(String name, Decoder<R> reponseDecoder, int objectParamIndex) {
@ -93,7 +123,7 @@ public class RedisCommand<R> {
public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder, int objectParamIndex, ValueType inParamType, ValueType outParamType) {
this(name, replayMultiDecoder, objectParamIndex);
this.outParamType = outParamType;
this.inParamType = inParamType;
this.inParamType = Arrays.asList(inParamType);
}
public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder) {
@ -146,7 +176,7 @@ public class RedisCommand<R> {
return paramsEncoder;
}
public ValueType getInParamType() {
public List<ValueType> getInParamType() {
return inParamType;
}

@ -19,8 +19,9 @@ import java.util.List;
import java.util.Map;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.BooleanReplayDecoder;
import org.redisson.client.protocol.decoder.BooleanStatusReplayDecoder;
import org.redisson.client.protocol.decoder.KeyValueObjectDecoder;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder;
@ -30,6 +31,8 @@ import org.redisson.client.protocol.decoder.StringReplayDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import com.lambdaworks.redis.output.MapScanResult;
public interface RedisCommands {
RedisCommand<Object> LPOP = new RedisCommand<Object>("LPOP");
@ -50,7 +53,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> EVAL_INTEGER = new RedisStrictCommand<Long>("EVAL");
RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder());
RedisCommand<Object> EVAL_OBJECT = new RedisCommand<Object>("EVAL");
RedisCommand<Object> EVAL_MAP_VALUE = new RedisCommand<Object>("EVAL", ValueType.MAP_VALUE);
RedisCommand<Object> EVAL_MAP_VALUE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);
RedisStrictCommand<Long> INCR = new RedisStrictCommand<Long>("INCR");
RedisStrictCommand<Long> INCRBY = new RedisStrictCommand<Long>("INCRBY");
@ -64,24 +67,27 @@ public interface RedisCommands {
RedisStrictCommand<List<String>> KEYS = new RedisStrictCommand<List<String>>("KEYS", new StringListReplayDecoder());
RedisStrictCommand<String> HINCRBYFLOAT = new RedisStrictCommand<String>("HINCRBYFLOAT");
RedisCommand<MapScanResult<Object, Object>> HSCAN = new RedisCommand<MapScanResult<Object, Object>>("HSCAN", new MapScanResultReplayDecoder(), ValueType.MAP);
RedisCommand<Map<Object, Object>> HGETALL = new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP);
RedisCommand<List<Object>> HVALS = new RedisCommand<List<Object>>("HVALS", new ObjectListReplayDecoder(), ValueType.MAP_VALUE);
RedisCommand<Boolean> HEXISTS = new RedisCommand<Boolean>("HEXISTS", new BooleanReplayConvertor(), 1, ValueType.MAP_KEY);
RedisCommand<Boolean> HEXISTS = new RedisCommand<Boolean>("HEXISTS", new BooleanReplayConvertor(), 2, ValueType.MAP_KEY);
RedisStrictCommand<Long> HLEN = new RedisStrictCommand<Long>("HLEN");
RedisCommand<List<Object>> HKEYS = new RedisCommand<List<Object>>("HKEYS", new ObjectListReplayDecoder(), ValueType.MAP_KEY);
RedisCommand<String> HMSET = new RedisCommand<String>("HMSET", new StringReplayDecoder(), 1, ValueType.MAP);
RedisCommand<List<Object>> HMGET = new RedisCommand<List<Object>>("HMGET", new ObjectListReplayDecoder(), 1, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<Object> HGET = new RedisCommand<Object>("HGET", 1, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<List<Object>> HMGET = new RedisCommand<List<Object>>("HMGET", new ObjectListReplayDecoder(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<Object> HGET = new RedisCommand<Object>("HGET", 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<Long> HDEL = new RedisStrictCommand<Long>("HDEL", 2, ValueType.MAP_KEY);
RedisStrictCommand<Boolean> DEL_BOOLEAN = new RedisStrictCommand<Boolean>("DEL", new BooleanReplayConvertor());
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisCommand<String> SET = new RedisCommand<String>("SET", new StringReplayDecoder(), 1);
RedisCommand<String> SET = new RedisCommand<String>("SET", new StringReplayDecoder(), 2);
RedisCommand<String> SETEX = new RedisCommand<String>("SETEX", new StringReplayDecoder(), 2);
RedisStrictCommand<Boolean> EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> RENAMENX = new RedisStrictCommand<Boolean>("RENAMENX", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> RENAME = new RedisStrictCommand<Boolean>("RENAME", new BooleanReplayDecoder());
RedisStrictCommand<Boolean> RENAME = new RedisStrictCommand<Boolean>("RENAME", new BooleanStatusReplayDecoder());
RedisCommand<Long> PUBLISH = new RedisCommand<Long>("PUBLISH", 1);

@ -15,10 +15,14 @@
*/
package org.redisson.client.protocol;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
public class RedisStrictCommand<T> extends RedisCommand<T> {
public RedisStrictCommand(String name, int objectParamIndex, ValueType inParamType) {
super(name, (Decoder<T>)null, objectParamIndex, inParamType);
}
public RedisStrictCommand(String name, MultiDecoder<T> replayMultiDecoder) {
super(name, replayMultiDecoder);
}

@ -0,0 +1,18 @@
package org.redisson.client.protocol.decoder;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class BooleanReplayDecoder2 implements Decoder<Boolean> {
@Override
public Boolean decode(ByteBuf buf) {
if (buf == null) {
return false;
}
return "OK".equals(buf.toString(CharsetUtil.UTF_8));
}
}

@ -5,7 +5,7 @@ import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class BooleanReplayDecoder implements Decoder<Boolean> {
public class BooleanStatusReplayDecoder implements Decoder<Boolean> {
@Override
public Boolean decode(ByteBuf buf) {

@ -2,13 +2,15 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class KeyValueObjectDecoder implements MultiDecoder<Object> {
public MultiDecoder<Object> get() {
return (MultiDecoder<Object>) this;
}
@Override
public Object decode(ByteBuf buf) {
String status = buf.toString(CharsetUtil.UTF_8);

@ -0,0 +1,18 @@
package org.redisson.client.protocol.decoder;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class LongReplayDecoder implements Decoder<Long> {
@Override
public Long decode(ByteBuf buf) {
if (buf == null) {
return 0L;
}
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
}

@ -0,0 +1,41 @@
package org.redisson.client.protocol.decoder;
import java.util.List;
import java.util.Map;
import com.lambdaworks.redis.output.MapScanResult;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class MapScanResultReplayDecoder implements MultiDecoder<MapScanResult<Object, Object>> {
ThreadLocal<MultiDecoder<?>> currentMultiDecoder = new ThreadLocal<MultiDecoder<?>>();
ThreadLocal<Boolean> posParsed = new ThreadLocal<Boolean>();
ObjectMapReplayDecoder nextDecoder = new ObjectMapReplayDecoder();
public MultiDecoder<?> get() {
if (currentMultiDecoder.get() == null) {
currentMultiDecoder.set(nextDecoder);
return nextDecoder;
}
return (MultiDecoder<?>) this;
}
@Override
public Object decode(ByteBuf buf) {
posParsed.set(true);
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
@Override
public MapScanResult<Object, Object> decode(List<Object> parts) {
return new MapScanResult<Object, Object>((Long)parts.get(0), (Map<Object, Object>)parts.get(1));
}
@Override
public boolean isApplicable(int paramNum) {
return paramNum == 0 && posParsed.get() == null;
}
}

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.pubsub;
package org.redisson.client.protocol.decoder;
import java.util.List;
@ -21,6 +21,8 @@ import org.redisson.client.protocol.Decoder;
public interface MultiDecoder<T> extends Decoder<Object> {
MultiDecoder<?> get();
boolean isApplicable(int paramNum);
T decode(List<Object> parts);

@ -2,8 +2,6 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import io.netty.buffer.ByteBuf;
public class ObjectListReplayDecoder implements MultiDecoder<List<Object>> {
@ -23,4 +21,9 @@ public class ObjectListReplayDecoder implements MultiDecoder<List<Object>> {
return false;
}
@Override
public MultiDecoder<?> get() {
return this;
}
}

@ -4,12 +4,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import io.netty.buffer.ByteBuf;
public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
@Override
public MultiDecoder<?> get() {
return this;
}
@Override
public Object decode(ByteBuf buf) {
throw new UnsupportedOperationException();
@ -20,7 +23,7 @@ public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>>
Map<Object, Object> result = new HashMap<Object, Object>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
result.put(parts.get(i-1).toString(), parts.get(i).toString());
result.put(parts.get(i-1), parts.get(i));
}
}
return result;

@ -3,13 +3,16 @@ package org.redisson.client.protocol.decoder;
import java.util.Arrays;
import java.util.List;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class StringListReplayDecoder implements MultiDecoder<List<String>> {
@Override
public MultiDecoder<?> get() {
return this;
}
@Override
public Object decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);

@ -6,13 +6,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class StringMapReplayDecoder implements MultiDecoder<List<Map<String, String>>> {
@Override
public MultiDecoder<?> get() {
return this;
}
@Override
public Object decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);
@ -20,6 +23,7 @@ public class StringMapReplayDecoder implements MultiDecoder<List<Map<String, Str
@Override
public List<Map<String, String>> decode(List<Object> parts) {
// TODO refactor
if (!parts.isEmpty()) {
if (parts.get(0) instanceof List) {
List<Map<String, String>> result = new ArrayList<Map<String, String>>(parts.size());

@ -17,6 +17,8 @@ package org.redisson.client.protocol.pubsub;
import java.util.List;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -39,4 +41,9 @@ public class PubSubMessageDecoder implements MultiDecoder<Object> {
return true;
}
@Override
public MultiDecoder<?> get() {
return this;
}
}

@ -17,6 +17,8 @@ package org.redisson.client.protocol.pubsub;
import java.util.List;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -39,4 +41,9 @@ public class PubSubPatternMessageDecoder implements MultiDecoder<Object> {
return true;
}
@Override
public MultiDecoder<?> get() {
return this;
}
}

@ -18,6 +18,8 @@ package org.redisson.client.protocol.pubsub;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -44,4 +46,9 @@ public class PubSubStatusDecoder implements MultiDecoder<PubSubStatusMessage> {
return true;
}
@Override
public MultiDecoder<?> get() {
return this;
}
}

Loading…
Cancel
Save