PubSub decoding improvements

pull/243/head
Nikita 10 years ago
parent 4783371ba4
commit 0e7c7c7edc

@ -16,6 +16,8 @@
package org.redisson.client; package org.redisson.client;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import org.redisson.client.handler.RedisCommandsQueue; import org.redisson.client.handler.RedisCommandsQueue;
@ -83,17 +85,29 @@ public class RedisClient {
} }
public RedisConnection connect() { public RedisConnection connect() {
try {
ChannelFuture future = bootstrap.connect(); ChannelFuture future = bootstrap.connect();
future.syncUninterruptibly(); future.syncUninterruptibly();
channels.add(future.channel()); channels.add(future.channel());
return new RedisConnection(this, future.channel()); return new RedisConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);
}
} }
public RedisPubSubConnection connectPubSub() { public RedisPubSubConnection connectPubSub() {
try {
ChannelFuture future = bootstrap.connect(); ChannelFuture future = bootstrap.connect();
future.syncUninterruptibly(); future.syncUninterruptibly();
channels.add(future.channel()); channels.add(future.channel());
return new RedisPubSubConnection(this, future.channel()); return new RedisPubSubConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);
}
}
public void shutdown() {
shutdownAsync().syncUninterruptibly();
} }
public ChannelGroupFuture shutdownAsync() { public ChannelGroupFuture shutdownAsync() {
@ -102,8 +116,23 @@ public class RedisClient {
public static void main(String[] args) throws InterruptedException, ExecutionException { public static void main(String[] args) throws InterruptedException, ExecutionException {
final RedisClient c = new RedisClient("127.0.0.1", 6379); final RedisClient c = new RedisClient("127.0.0.1", 6379);
RedisConnection rc = c.connect(); Object r = c.connect().sync(new StringCodec(), RedisCommands.GET, "test1");
RedisPubSubConnection rpsc = c.connectPubSub(); System.out.println(r);
// final RedisClient c = new RedisClient("127.0.0.1", 26379);
// RedisConnection rc = c.connect();
// List<String> res4 = rc.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, "mymaster");
// System.out.println("r: " + res4);
//
// List<Map<String, String>> res5 = rc.sync(RedisCommands.SENTINEL_SLAVES, "mymaster");
// System.out.println("r: " + res5);
/* RedisPubSubConnection rpsc = c.connectPubSub();
rc.sync(new StringCodec(), RedisCommands.HMSET, "test", "1", "2");
rc.sync(new StringCodec(), RedisCommands.HMSET, "test", "2", "3");
List<String> r = rc.sync(new StringCodec(), RedisCommands.HMGET, "test", "1", "2");
String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333"); String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333");
System.out.println("res 12: " + res1); System.out.println("res 12: " + res1);
@ -144,25 +173,6 @@ public class RedisClient {
Long res3 = c3.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444"); Long res3 = c3.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444");
System.out.println("published: " + res3); System.out.println("published: " + res3);
*/ }
/* Future<String> res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random());
res.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
// System.out.println("res 1: " + future.getNow());
}
});
Future<String> r = rc.execute(new StringCodec(), RedisCommands.GET, "test");
r.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
System.out.println("res 2: " + future.getNow());
}
});
*/// }
}
} }

@ -59,17 +59,17 @@ public class RedisConnection implements RedisCommands {
return cmd.getNow(); return cmd.getNow();
} }
public <V> V get(Future<V> future) { // public <V> V get(Future<V> future) {
future.awaitUninterruptibly(); // future.awaitUninterruptibly();
if (future.isSuccess()) { // if (future.isSuccess()) {
return future.getNow(); // return future.getNow();
} // }
//
if (future.cause() instanceof RedisException) { // if (future.cause() instanceof RedisException) {
throw (RedisException) future.cause(); // throw (RedisException) future.cause();
} // }
throw new RedisException("Unexpected exception while processing command", future.cause()); // throw new RedisException("Unexpected exception while processing command", future.cause());
} // }
public <T> T sync(RedisStrictCommand<T> command, Object ... params) { public <T> T sync(RedisStrictCommand<T> command, Object ... params) {
Future<T> r = async(null, command, params); Future<T> r = async(null, command, params);

@ -0,0 +1,15 @@
package org.redisson.client;
public class RedisConnectionException extends RedisException {
private static final long serialVersionUID = -4756928186967834601L;
public RedisConnectionException(String msg) {
super(msg);
}
public RedisConnectionException(String msg, Throwable e) {
super(msg, e);
}
}

@ -0,0 +1,17 @@
package org.redisson.client;
public class RedisMovedException extends RedisException {
private static final long serialVersionUID = -6969734163155547631L;
private int slot;
public RedisMovedException(int slot) {
this.slot = slot;
}
public int getSlot() {
return slot;
}
}

@ -18,37 +18,28 @@ package org.redisson.client;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import org.redisson.client.handler.RedisData; import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.pubsub.MultiDecoder; import org.redisson.client.protocol.pubsub.MultiDecoder;
import org.redisson.client.protocol.pubsub.PubSubMessage; import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubMessageDecoder; import org.redisson.client.protocol.pubsub.PubSubMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage; import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder; import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
public class RedisPubSubConnection { public class RedisPubSubConnection extends RedisConnection {
public static final AttributeKey<RedisPubSubConnection> CONNECTION = AttributeKey.valueOf("connection"); public static final AttributeKey<RedisPubSubConnection> CONNECTION = AttributeKey.valueOf("connection");
final ConcurrentLinkedQueue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>(); final ConcurrentLinkedQueue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();
final Channel channel;
final RedisClient redisClient;
public RedisPubSubConnection(RedisClient redisClient, Channel channel) { public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
this.redisClient = redisClient; super(redisClient, channel);
this.channel = channel;
channel.attr(CONNECTION).set(this); channel.attr(CONNECTION).set(this);
} }
@ -57,6 +48,10 @@ public class RedisPubSubConnection {
listeners.add(listener); listeners.add(listener);
} }
public void removeListener(RedisPubSubListener listener) {
listeners.remove(listener);
}
public void onMessage(PubSubMessage message) { public void onMessage(PubSubMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) { for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onMessage(message.getChannel(), message.getValue()); redisPubSubListener.onMessage(message.getChannel(), message.getValue());
@ -78,28 +73,17 @@ public class RedisPubSubConnection {
} }
public Future<PubSubStatusMessage> unsubscribe(String ... channel) { public Future<PubSubStatusMessage> unsubscribe(String ... channel) {
return async(null, RedisCommands.UNSUBSCRIBE, channel); return async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel);
} }
public Future<PubSubStatusMessage> punsubscribe(String ... channel) { public Future<PubSubStatusMessage> punsubscribe(String ... channel) {
return async(null, RedisCommands.PUNSUBSCRIBE, channel); return async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel);
} }
// public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> async(MultiDecoder<Object> messageDecoder, RedisCommand<T> command, Object ... params) {
// Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
// channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
// return promise;
// }
public <T, R> Future<R> async(MultiDecoder<Object> nextDecoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise(); Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, nextDecoder, null, command, params)); channel.writeAndFlush(new RedisData<T, R>(promise, messageDecoder, null, command, params));
return promise; return promise;
} }
public ChannelFuture closeAsync() {
return channel.close();
}
} }

@ -27,7 +27,7 @@ public class RedisCommandsQueue extends ChannelDuplexHandler {
public enum QueueCommands {NEXT_COMMAND} public enum QueueCommands {NEXT_COMMAND}
public static final AttributeKey<RedisData<Object, Object>> REPLAY_PROMISE = AttributeKey.valueOf("promise"); public static final AttributeKey<RedisData<Object, Object>> REPLAY = AttributeKey.valueOf("promise");
private final Queue<RedisData<Object, Object>> queue = PlatformDependent.newMpscQueue(); private final Queue<RedisData<Object, Object>> queue = PlatformDependent.newMpscQueue();
@ -59,7 +59,7 @@ public class RedisCommandsQueue extends ChannelDuplexHandler {
private void sendData(ChannelHandlerContext ctx) throws Exception { private void sendData(ChannelHandlerContext ctx) throws Exception {
RedisData<Object, Object> data = queue.peek(); RedisData<Object, Object> data = queue.peek();
if (data != null && data.getSended().compareAndSet(false, true)) { if (data != null && data.getSended().compareAndSet(false, true)) {
ctx.channel().attr(REPLAY_PROMISE).set(data); ctx.channel().attr(REPLAY).set(data);
ctx.channel().writeAndFlush(data); ctx.channel().writeAndFlush(data);
} }
} }

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.handler.RedisCommandsQueue.QueueCommands; import org.redisson.client.handler.RedisCommandsQueue.QueueCommands;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
@ -45,7 +46,7 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove(); RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY).get();
RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get(); RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get();
Decoder<Object> currentDecoder = null; Decoder<Object> currentDecoder = null;
@ -58,20 +59,34 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
}; };
} }
System.out.println("message " + in.writerIndex() + "-" + in.readerIndex() + " in: " + in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
decode(in, data, null, pubSubConnection, currentDecoder); decode(in, data, null, pubSubConnection, currentDecoder);
ctx.channel().attr(RedisCommandsQueue.REPLAY).remove();
ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
} }
private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts, RedisPubSubConnection pubSubConnection, Decoder<Object> currentDecoder) throws IOException { private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts, RedisPubSubConnection pubSubConnection, Decoder<Object> currentDecoder) throws IOException {
int code = in.readByte(); int code = in.readByte();
// System.out.println("trying decode -- " + (char)code);
if (code == '+') { if (code == '+') {
Object result = data.getCommand().getReplayDecoder().decode(in); Object result = data.getCommand().getReplayDecoder().decode(in);
handleResult(data, parts, result); handleResult(data, parts, result);
} else if (code == '-') { } else if (code == '-') {
Object result = data.getCommand().getReplayDecoder().decode(in); String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
data.getPromise().setFailure(new RedisException(result.toString())); in.skipBytes(2);
if (error.startsWith("MOVED")) {
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[1]);
data.getPromise().setFailure(new RedisMovedException(slot));
} else if (error.startsWith("(error) ASK")) {
String[] errorParts = error.split(" ");
int slot = Integer.valueOf(errorParts[2]);
data.getPromise().setFailure(new RedisMovedException(slot));
} else {
data.getPromise().setFailure(new RedisException(error));
}
} else if (code == ':') { } else if (code == ':') {
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2); in.skipBytes(2);
@ -99,7 +114,12 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
messageDecoders.remove(param.toString()); messageDecoders.remove(param.toString());
} }
} }
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result); data.getPromise().setSuccess(result);
}
} else { } else {
if (result instanceof PubSubMessage) { if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result); pubSubConnection.onMessage((PubSubMessage) result);

@ -29,11 +29,11 @@ public class RedisCommand<R> {
private Convertor<R> convertor = new EmptyConvertor<R>(); private Convertor<R> convertor = new EmptyConvertor<R>();
public RedisCommand(String name, String subName, int ... encodeParamIndexes) { public RedisCommand(String name, String subName, int ... encodeParamIndexes) {
this(name, subName, null, encodeParamIndexes); this(name, subName, null, null, encodeParamIndexes);
} }
public RedisCommand(String name, int ... encodeParamIndexes) { public RedisCommand(String name, int ... encodeParamIndexes) {
this(name, null, null, encodeParamIndexes); this(name, null, null, null, encodeParamIndexes);
} }
public RedisCommand(String name, Convertor<R> convertor, int ... encodeParamIndexes) { public RedisCommand(String name, Convertor<R> convertor, int ... encodeParamIndexes) {
@ -44,7 +44,7 @@ public class RedisCommand<R> {
} }
public RedisCommand(String name, Decoder<R> reponseDecoder, int ... encodeParamIndexes) { public RedisCommand(String name, Decoder<R> reponseDecoder, int ... encodeParamIndexes) {
this(name, null, reponseDecoder, encodeParamIndexes); this(name, null, null, reponseDecoder, encodeParamIndexes);
} }
public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder, int ... encodeParamIndexes) { public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder, int ... encodeParamIndexes) {
@ -54,10 +54,16 @@ public class RedisCommand<R> {
this.replayMultiDecoder = replayMultiDecoder; this.replayMultiDecoder = replayMultiDecoder;
} }
public RedisCommand(String name, String subName, Decoder<R> reponseDecoder, int ... encodeParamIndexes) { public RedisCommand(String name, String subName, MultiDecoder<R> replayMultiDecoder,
int... encodeParamIndexes) {
this(name, subName, replayMultiDecoder, null, encodeParamIndexes);
}
public RedisCommand(String name, String subName, MultiDecoder<R> replayMultiDecoder, Decoder<R> reponseDecoder, int ... encodeParamIndexes) {
super(); super();
this.name = name; this.name = name;
this.subName = subName; this.subName = subName;
this.replayMultiDecoder = replayMultiDecoder;
this.replayDecoder = reponseDecoder; this.replayDecoder = reponseDecoder;
this.objectParamIndexes = encodeParamIndexes; this.objectParamIndexes = encodeParamIndexes;
} }

@ -15,6 +15,15 @@
*/ */
package org.redisson.client.protocol; package org.redisson.client.protocol;
import java.util.List;
import java.util.Map;
import org.redisson.client.protocol.decoder.BooleanReplayDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListObjectReplayDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapReplayDecoder;
import org.redisson.client.protocol.decoder.StringReplayDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import org.redisson.client.protocol.pubsub.PubSubStatusMessage;
@ -24,12 +33,23 @@ public interface RedisCommands {
RedisStrictCommand<String> SELECT = new RedisStrictCommand<String>("SELECT", new StringReplayDecoder()); RedisStrictCommand<String> SELECT = new RedisStrictCommand<String>("SELECT", new StringReplayDecoder());
RedisStrictCommand<String> CLIENT_SETNAME = new RedisStrictCommand<String>("CLIENT", "SETNAME", new StringReplayDecoder()); RedisStrictCommand<String> CLIENT_SETNAME = new RedisStrictCommand<String>("CLIENT", "SETNAME", new StringReplayDecoder());
RedisStrictCommand<String> CLIENT_GETNAME = new RedisStrictCommand<String>("CLIENT", "GETNAME", new StringDataDecoder()); RedisStrictCommand<String> CLIENT_GETNAME = new RedisStrictCommand<String>("CLIENT", "GETNAME", new StringDataDecoder());
RedisStrictCommand<String> FLUSHDB = new RedisStrictCommand<String>("FLUSHDB", new StringReplayDecoder());
RedisStrictCommand<List<String>> KEYS = new RedisStrictCommand<List<String>>("KEYS", new StringListReplayDecoder());
RedisCommand<String> HMSET = new RedisCommand<String>("HMSET", new StringReplayDecoder(), 2, 3);
RedisCommand<Object> HMGET = new RedisCommand<Object>("HMGET", new StringListObjectReplayDecoder(), 2, 3);
RedisStrictCommand<Boolean> DEL_ONE = new RedisStrictCommand<Boolean>("DEL", new BooleanReplayConvertor());
RedisCommand<Object> GET = new RedisCommand<Object>("GET"); RedisCommand<Object> GET = new RedisCommand<Object>("GET");
RedisCommand<String> SET = new RedisCommand<String>("SET", new StringReplayDecoder(), 1); RedisCommand<String> SET = new RedisCommand<String>("SET", new StringReplayDecoder(), 1);
RedisCommand<String> SETEX = new RedisCommand<String>("SETEX", new StringReplayDecoder(), 2); RedisCommand<String> SETEX = new RedisCommand<String>("SETEX", new StringReplayDecoder(), 2);
RedisStrictCommand<Boolean> EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> EXISTS = new RedisStrictCommand<Boolean>("EXISTS", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> RENAMENX = new RedisStrictCommand<Boolean>("RENAMENX", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> RENAME = new RedisStrictCommand<Boolean>("RENAME", new BooleanReplayDecoder());
RedisCommand<Long> PUBLISH = new RedisCommand<Long>("PUBLISH", 1); RedisCommand<Long> PUBLISH = new RedisCommand<Long>("PUBLISH", 1);
RedisStrictCommand<PubSubStatusMessage> SUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("SUBSCRIBE", new PubSubStatusDecoder()); RedisStrictCommand<PubSubStatusMessage> SUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("SUBSCRIBE", new PubSubStatusDecoder());
@ -37,4 +57,9 @@ public interface RedisCommands {
RedisStrictCommand<PubSubStatusMessage> PSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("PSUBSCRIBE", new PubSubStatusDecoder()); RedisStrictCommand<PubSubStatusMessage> PSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("PSUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<PubSubStatusMessage> PUNSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("PUNSUBSCRIBE", new PubSubStatusDecoder()); RedisStrictCommand<PubSubStatusMessage> PUNSUBSCRIBE = new RedisStrictCommand<PubSubStatusMessage>("PUNSUBSCRIBE", new PubSubStatusDecoder());
RedisStrictCommand<String> CLUSTER_NODES = new RedisStrictCommand<String>("CLUSTER", "NODES", new StringDataDecoder());
RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
RedisStrictCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisStrictCommand<List<Map<String, String>>>("SENTINEL", "SLAVES", new StringMapReplayDecoder());
} }

@ -23,6 +23,11 @@ public class RedisStrictCommand<T> extends RedisCommand<T> {
super(name, replayMultiDecoder, encodeParamIndexes); super(name, replayMultiDecoder, encodeParamIndexes);
} }
public RedisStrictCommand(String name, String subName, MultiDecoder<T> replayMultiDecoder,
int... encodeParamIndexes) {
super(name, subName, replayMultiDecoder, encodeParamIndexes);
}
public RedisStrictCommand(String name, int... encodeParamIndexes) { public RedisStrictCommand(String name, int... encodeParamIndexes) {
super(name, encodeParamIndexes); super(name, encodeParamIndexes);
} }
@ -33,7 +38,7 @@ public class RedisStrictCommand<T> extends RedisCommand<T> {
public RedisStrictCommand(String name, String subName, Decoder<T> reponseDecoder, public RedisStrictCommand(String name, String subName, Decoder<T> reponseDecoder,
int... encodeParamIndexes) { int... encodeParamIndexes) {
super(name, subName, reponseDecoder, encodeParamIndexes); super(name, subName, null, reponseDecoder, encodeParamIndexes);
} }
public RedisStrictCommand(String name, String subName, int... encodeParamIndexes) { public RedisStrictCommand(String name, String subName, int... encodeParamIndexes) {

@ -33,6 +33,9 @@ public class StringCodec implements Codec {
@Override @Override
public Object decode(ByteBuf buf) { public Object decode(ByteBuf buf) {
if (buf == null) {
return null;
}
return buf.toString(CharsetUtil.UTF_8); return buf.toString(CharsetUtil.UTF_8);
} }

@ -0,0 +1,17 @@
package org.redisson.client.protocol.decoder;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class BooleanReplayDecoder implements Decoder<Boolean> {
@Override
public Boolean decode(ByteBuf buf) {
String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return "OK".equals(status);
}
}

@ -13,7 +13,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.client.protocol; package org.redisson.client.protocol.decoder;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;

@ -0,0 +1,22 @@
package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class StringListObjectReplayDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);
}
@Override
public Object decode(List<Object> parts) {
return parts;
}
}

@ -0,0 +1,23 @@
package org.redisson.client.protocol.decoder;
import java.util.Arrays;
import java.util.List;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class StringListReplayDecoder implements MultiDecoder<List<String>> {
@Override
public Object decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);
}
@Override
public List<String> decode(List<Object> parts) {
return Arrays.asList(Arrays.copyOf(parts.toArray(), parts.size(), String[].class));
}
}

@ -0,0 +1,43 @@
package org.redisson.client.protocol.decoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.protocol.pubsub.MultiDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class StringMapReplayDecoder implements MultiDecoder<List<Map<String, String>>> {
@Override
public Object decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);
}
@Override
public List<Map<String, String>> decode(List<Object> parts) {
if (!parts.isEmpty()) {
if (parts.get(0) instanceof List) {
List<Map<String, String>> result = new ArrayList<Map<String, String>>(parts.size());
for (Object object : parts) {
List<Map<String, String>> list = (List<Map<String, String>>) object;
result.addAll(list);
}
return result;
}
}
Map<String, String> result = new HashMap<String, String>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) {
result.put(parts.get(i-1).toString(), parts.get(i).toString());
}
}
return Collections.singletonList(result);
}
}

@ -13,7 +13,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.client.protocol; package org.redisson.client.protocol.decoder;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
Loading…
Cancel
Save