Commands refactoring

pull/243/head
Nikita 10 years ago
parent 7a35bdd31e
commit 61d323c418

@ -105,15 +105,15 @@ public class RedisClient {
RedisConnection rc = c.connect();
RedisPubSubConnection rpsc = c.connectPubSub();
// String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333");
// System.out.println("res 12: " + res1);
// String res2 = rc.sync(RedisCommands.CLIENT_GETNAME);
// System.out.println("res name: " + res2);
String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333");
System.out.println("res 12: " + res1);
String res2 = rc.sync(RedisCommands.CLIENT_GETNAME);
System.out.println("res name: " + res2);
// Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33");
// System.out.println("res name 2: " + res3);
Future<Long> m = rpsc.publish("sss", "123");
System.out.println("out: " + m.get());
Long m = rc.sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "123");
System.out.println("out: " + m);
Future<PubSubStatusMessage> m1 = rpsc.psubscribe("ss*");
System.out.println("out: " + m1.get());
rpsc.addListener(new RedisPubSubListener<String>() {
@ -133,7 +133,7 @@ public class RedisClient {
final RedisClient c2 = new RedisClient("127.0.0.1", 6379);
Long res = c2.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444");
// System.out.println("published: " + res);
System.out.println("published: " + res);

@ -6,7 +6,7 @@ import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStringCommand;
import org.redisson.client.protocol.RedisStrictCommand;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -56,17 +56,11 @@ public class RedisConnection implements RedisCommands {
throw new RedisException("Unexpected exception while processing command", future.cause());
}
public String sync(RedisStringCommand command, Object ... params) {
Future<String> r = async(command, params);
public <T> T sync(RedisStrictCommand<T> command, Object ... params) {
Future<T> r = async(null, command, params);
return await(r);
}
public Future<String> async(RedisStringCommand command, Object ... params) {
Promise<String> promise = redisClient.getBootstrap().group().next().<String>newPromise();
channel.writeAndFlush(new RedisData<String, String>(promise, command.getCodec(), command, params));
return promise;
}
public <T, R> void send(RedisData<T, R> data) {
channel.writeAndFlush(data);
}

@ -5,6 +5,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.MultiDecoder;
import org.redisson.client.protocol.PubSubStatusMessage;
import org.redisson.client.protocol.PubSubStatusDecoder;
import org.redisson.client.protocol.PubSubMessage;
@ -54,30 +55,26 @@ public class RedisPubSubConnection {
}
public Future<PubSubStatusMessage> subscribe(String ... channel) {
return async(new PubSubStatusDecoder(), new PubSubMessageDecoder(), RedisCommands.SUBSCRIBE, channel);
return async(new PubSubMessageDecoder(), RedisCommands.SUBSCRIBE, channel);
}
public Future<PubSubStatusMessage> psubscribe(String ... channel) {
return async(new PubSubStatusDecoder(), new PubSubPatternMessageDecoder(), RedisCommands.PSUBSCRIBE, channel);
return async(new PubSubPatternMessageDecoder(), RedisCommands.PSUBSCRIBE, channel);
}
public Future<PubSubStatusMessage> unsubscribe(String ... channel) {
return async(new PubSubStatusDecoder(), RedisCommands.SUBSCRIBE, channel);
return async(null, RedisCommands.UNSUBSCRIBE, channel);
}
public Future<Long> publish(String channel, String msg) {
return async(new StringCodec(), RedisCommands.PUBLISH, channel, msg);
}
public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
return promise;
}
// public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
// Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
// channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
// return promise;
// }
public <T, R> Future<R> async(Codec encoder, Decoder<Object> nextDecoder, RedisCommand<T> command, Object ... params) {
public <T, R> Future<R> async(MultiDecoder<Object> nextDecoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, nextDecoder, encoder, command, params));
channel.writeAndFlush(new RedisData<T, R>(promise, nextDecoder, null, command, params));
return promise;
}

@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.MultiDecoder;
import org.redisson.client.protocol.RedisCommand;
import io.netty.util.concurrent.Promise;
@ -30,13 +31,13 @@ public class RedisData<T, R> {
final Object[] params;
final Codec codec;
final AtomicBoolean sended = new AtomicBoolean();
final Decoder<Object> nextDecoder;
final MultiDecoder<Object> nextDecoder;
public RedisData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
this(promise, null, codec, command, params);
}
public RedisData(Promise<R> promise, Decoder<Object> nextDecoder, Codec codec, RedisCommand<T> command, Object[] params) {
public RedisData(Promise<R> promise, MultiDecoder<Object> nextDecoder, Codec codec, RedisCommand<T> command, Object[] params) {
this.promise = promise;
this.command = command;
this.params = params;
@ -52,7 +53,7 @@ public class RedisData<T, R> {
return params;
}
public Decoder<Object> getNextDecoder() {
public MultiDecoder<Object> getNextDecoder() {
return nextDecoder;
}

@ -24,6 +24,7 @@ import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.handler.RedisCommandsQueue.QueueCommands;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.MultiDecoder;
import org.redisson.client.protocol.PubSubMessage;
import org.redisson.client.protocol.PubSubPatternMessage;
@ -38,7 +39,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
public static final char LF = '\n';
private static final char ZERO = '0';
private Decoder<Object> nextDecoder;
private MultiDecoder<Object> nextDecoder;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
@ -54,33 +55,19 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
int code = in.readByte();
// System.out.println("trying decode -- " + (char)code);
if (code == '+') {
Object result = data.getCommand().getReponseDecoder().decode(in);
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
Object result = data.getCommand().getReplayDecoder().decode(in);
handleResult(data, parts, result);
} else if (code == '-') {
Object result = data.getCommand().getReponseDecoder().decode(in);
Object result = data.getCommand().getReplayDecoder().decode(in);
data.getPromise().setFailure(new RedisException(result.toString()));
} else if (code == ':') {
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
Object result = Long.valueOf(status);
result = data.getCommand().getConvertor().convert(result);
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
handleResult(data, parts, result);
} else if (code == '$') {
Object result = decoder(data).decode(readBytes(in));
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
Object result = decoder(data, parts != null).decode(readBytes(in));
handleResult(data, parts, result);
} else if (code == '*') {
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>();
@ -88,7 +75,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
decode(in, data, respParts, pubSubConnection);
}
Object result = decoder(data).decode(respParts);
Object result = ((MultiDecoder<Object>)decoder(data, true)).decode(respParts);
if (data != null) {
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) {
nextDecoder = data.getNextDecoder();
@ -106,11 +93,25 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
}
}
private Decoder<Object> decoder(RedisData<Object, Object> data) {
private void handleResult(RedisData<Object, Object> data, List<Object> parts, Object result) {
if (data != null) {
result = data.getCommand().getConvertor().convert(result);
}
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
}
private Decoder<Object> decoder(RedisData<Object, Object> data, boolean isMulti) {
if (data == null) {
return nextDecoder;
}
Decoder<Object> decoder = data.getCommand().getReponseDecoder();
Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (isMulti) {
decoder = data.getCommand().getReplayMultiDecoder();
}
if (decoder == null) {
decoder = data.getCodec();
}

@ -20,7 +20,6 @@ import java.util.Arrays;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
public class RedisEncoder extends MessageToByteEncoder<RedisData<Object, Object>> {
@ -44,8 +43,10 @@ public class RedisEncoder extends MessageToByteEncoder<RedisData<Object, Object>
}
int i = 1;
for (Object param : msg.getParams()) {
if (Arrays.binarySearch(msg.getCommand().getEncodeParamIndexes(), i) != -1) {
if (Arrays.binarySearch(msg.getCommand().getObjectParamIndexes(), i) != -1) {
writeArgument(out, msg.getCodec().encode(i, param));
} else {
writeArgument(out, msg.getCommand().getParamsEncoder().encode(i, param));
}
i++;
}

@ -19,7 +19,7 @@ public class BooleanReplayConvertor implements Convertor<Boolean> {
@Override
public Boolean convert(Object obj) {
return "1".equals(obj);
return Long.valueOf(1).equals(obj);
}

@ -15,14 +15,10 @@
*/
package org.redisson.client.protocol;
import java.util.List;
import io.netty.buffer.ByteBuf;
public interface Decoder<R> {
R decode(ByteBuf buf);
R decode(List<Object> parts);
}

@ -0,0 +1,9 @@
package org.redisson.client.protocol;
import java.util.List;
public interface MultiDecoder<T> extends Decoder<Object> {
T decode(List<Object> parts);
}

@ -5,7 +5,7 @@ import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class PubSubMessageDecoder implements Decoder<Object> {
public class PubSubMessageDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf) {
@ -15,7 +15,7 @@ public class PubSubMessageDecoder implements Decoder<Object> {
}
@Override
public Object decode(List<Object> parts) {
public PubSubMessage decode(List<Object> parts) {
return new PubSubMessage(parts.get(1).toString(), parts.get(2).toString());
}

@ -5,7 +5,7 @@ import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class PubSubPatternMessageDecoder implements Decoder<Object> {
public class PubSubPatternMessageDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf) {
@ -15,7 +15,7 @@ public class PubSubPatternMessageDecoder implements Decoder<Object> {
}
@Override
public Object decode(List<Object> parts) {
public PubSubPatternMessage decode(List<Object> parts) {
return new PubSubPatternMessage(parts.get(1).toString(), parts.get(2).toString(), parts.get(3));
}

@ -15,17 +15,16 @@
*/
package org.redisson.client.protocol;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class PubSubStatusDecoder implements Codec {
public class PubSubStatusDecoder implements MultiDecoder<PubSubStatusMessage> {
@Override
public String decode(ByteBuf buf) {
public Object decode(ByteBuf buf) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
@ -40,13 +39,4 @@ public class PubSubStatusDecoder implements Codec {
return new PubSubStatusMessage(PubSubStatusMessage.Type.valueOf(parts.get(0).toString().toUpperCase()), channels);
}
@Override
public byte[] encode(int paramIndex, Object in) {
try {
return in.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e);
}
}
}

@ -19,8 +19,11 @@ public class RedisCommand<R> {
private final String name;
private final String subName;
private final int[] encodeParamIndexes;
private Decoder<R> reponseDecoder;
private final int[] objectParamIndexes;
private Encoder paramsEncoder = new StringParamsEncoder();
private MultiDecoder<R> replayMultiDecoder;
private Decoder<R> replayDecoder;
private Convertor<R> convertor = new EmptyConvertor<R>();
public RedisCommand(String name, String subName, int ... encodeParamIndexes) {
@ -34,7 +37,7 @@ public class RedisCommand<R> {
public RedisCommand(String name, Convertor<R> convertor, int ... encodeParamIndexes) {
this.name = name;
this.subName = null;
this.encodeParamIndexes = encodeParamIndexes;
this.objectParamIndexes = encodeParamIndexes;
this.convertor = convertor;
}
@ -42,12 +45,19 @@ public class RedisCommand<R> {
this(name, null, reponseDecoder, encodeParamIndexes);
}
public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder, int ... encodeParamIndexes) {
this.name = name;
this.subName = null;
this.objectParamIndexes = encodeParamIndexes;
this.replayMultiDecoder = replayMultiDecoder;
}
public RedisCommand(String name, String subName, Decoder<R> reponseDecoder, int ... encodeParamIndexes) {
super();
this.name = name;
this.subName = subName;
this.reponseDecoder = reponseDecoder;
this.encodeParamIndexes = encodeParamIndexes;
this.replayDecoder = reponseDecoder;
this.objectParamIndexes = encodeParamIndexes;
}
public String getSubName() {
@ -58,16 +68,24 @@ public class RedisCommand<R> {
return name;
}
public Decoder<R> getReponseDecoder() {
return reponseDecoder;
public Decoder<R> getReplayDecoder() {
return replayDecoder;
}
public int[] getEncodeParamIndexes() {
return encodeParamIndexes;
public int[] getObjectParamIndexes() {
return objectParamIndexes;
}
public MultiDecoder<R> getReplayMultiDecoder() {
return replayMultiDecoder;
}
public Convertor<R> getConvertor() {
return convertor;
}
public Encoder getParamsEncoder() {
return paramsEncoder;
}
}

@ -17,20 +17,21 @@ package org.redisson.client.protocol;
public interface RedisCommands {
RedisStringCommand AUTH = new RedisStringCommand("AUTH", new StringReplayDecoder());
RedisStringCommand SELECT = new RedisStringCommand("SELECT", new StringReplayDecoder());
RedisStringCommand CLIENT_SETNAME = new RedisStringCommand("CLIENT", "SETNAME", new StringReplayDecoder(), 1);
RedisStringCommand CLIENT_GETNAME = new RedisStringCommand("CLIENT", "GETNAME");
RedisStrictCommand<String> AUTH = new RedisStrictCommand<String>("AUTH", new StringReplayDecoder());
RedisStrictCommand<String> SELECT = new RedisStrictCommand<String>("SELECT", new StringReplayDecoder());
RedisStrictCommand<String> CLIENT_SETNAME = new RedisStrictCommand<String>("CLIENT", "SETNAME", new StringReplayDecoder());
RedisStrictCommand<String> CLIENT_GETNAME = new RedisStrictCommand<String>("CLIENT", "GETNAME", new StringDataDecoder());
RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisCommand<String> SET = new RedisCommand<String>("SET", new StringReplayDecoder(), 1);
RedisCommand<String> SETEX = new RedisCommand<String>("SETEX", new StringReplayDecoder(), 2);
RedisCommand<Boolean> EXISTS = new RedisCommand<Boolean>("EXISTS", new BooleanReplayConvertor(), 1);
RedisStrictCommand<Boolean> EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanReplayConvertor());
RedisCommand<Long> PUBLISH = new RedisCommand<Long>("PUBLISH", 1);
RedisCommand<PubSubStatusMessage> SUBSCRIBE = new RedisCommand<PubSubStatusMessage>("SUBSCRIBE", 1);
RedisCommand<PubSubStatusMessage> UNSUBSCRIBE = new RedisCommand<PubSubStatusMessage>("UNSUBSCRIBE", 1);
RedisCommand<PubSubStatusMessage> PSUBSCRIBE = new RedisCommand<PubSubStatusMessage>("PSUBSCRIBE", 1);
RedisStrictCommand<PubSubStatusMessage> SUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("SUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<PubSubStatusMessage> UNSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("UNSUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<PubSubStatusMessage> PSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("PSUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<PubSubStatusMessage> PUNSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("PUNSUBSCRIBE", new PubSubStatusDecoder());
}

@ -0,0 +1,30 @@
package org.redisson.client.protocol;
public class RedisStrictCommand<T> extends RedisCommand<T> {
public RedisStrictCommand(String name, MultiDecoder<T> replayMultiDecoder, int... encodeParamIndexes) {
super(name, replayMultiDecoder, encodeParamIndexes);
}
public RedisStrictCommand(String name, int... encodeParamIndexes) {
super(name, encodeParamIndexes);
}
public RedisStrictCommand(String name, Convertor<T> convertor, int ... encodeParamIndexes) {
super(name, convertor, encodeParamIndexes);
}
public RedisStrictCommand(String name, String subName, Decoder<T> reponseDecoder,
int... encodeParamIndexes) {
super(name, subName, reponseDecoder, encodeParamIndexes);
}
public RedisStrictCommand(String name, String subName, int... encodeParamIndexes) {
super(name, subName, encodeParamIndexes);
}
public RedisStrictCommand(String name, Decoder<T> reponseDecoder, int... encodeParamIndexes) {
super(name, reponseDecoder, encodeParamIndexes);
}
}

@ -1,28 +0,0 @@
package org.redisson.client.protocol;
public class RedisStringCommand extends RedisCommand<String> {
private Codec codec = new StringCodec();
public RedisStringCommand(String name, int... encodeParamIndexes) {
super(name, encodeParamIndexes);
}
public RedisStringCommand(String name, String subName, Decoder<String> reponseDecoder,
int... encodeParamIndexes) {
super(name, subName, reponseDecoder, encodeParamIndexes);
}
public RedisStringCommand(String name, String subName, int... encodeParamIndexes) {
super(name, subName, encodeParamIndexes);
}
public RedisStringCommand(String name, Decoder<String> reponseDecoder, int... encodeParamIndexes) {
super(name, reponseDecoder, encodeParamIndexes);
}
public Codec getCodec() {
return codec;
}
}

@ -1,7 +1,6 @@
package org.redisson.client.protocol;
import java.io.UnsupportedEncodingException;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -22,9 +21,4 @@ public class StringCodec implements Codec {
return buf.toString(CharsetUtil.UTF_8);
}
@Override
public Object decode(List<Object> parts) {
throw new IllegalStateException();
}
}

@ -0,0 +1,28 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class StringDataDecoder implements Decoder<String> {
@Override
public String decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);
}
}

@ -0,0 +1,16 @@
package org.redisson.client.protocol;
import java.io.UnsupportedEncodingException;
public class StringParamsEncoder implements Encoder {
@Override
public byte[] encode(int paramIndex, Object in) {
try {
return in.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e);
}
}
}

@ -15,8 +15,6 @@
*/
package org.redisson.client.protocol;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -29,9 +27,4 @@ public class StringReplayDecoder implements Decoder<String> {
return status;
}
@Override
public String decode(List<Object> parts) {
throw new IllegalStateException();
}
}

Loading…
Cancel
Save