diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 47a55248a..ad43da42e 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -16,6 +16,8 @@ package org.redisson.client; import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import org.redisson.client.handler.RedisCommandsQueue; @@ -83,17 +85,29 @@ public class RedisClient { } public RedisConnection connect() { - ChannelFuture future = bootstrap.connect(); - future.syncUninterruptibly(); - channels.add(future.channel()); - return new RedisConnection(this, future.channel()); + try { + ChannelFuture future = bootstrap.connect(); + future.syncUninterruptibly(); + channels.add(future.channel()); + return new RedisConnection(this, future.channel()); + } catch (Exception e) { + throw new RedisConnectionException("unable to connect", e); + } } public RedisPubSubConnection connectPubSub() { - ChannelFuture future = bootstrap.connect(); - future.syncUninterruptibly(); - channels.add(future.channel()); - return new RedisPubSubConnection(this, future.channel()); + try { + ChannelFuture future = bootstrap.connect(); + future.syncUninterruptibly(); + channels.add(future.channel()); + return new RedisPubSubConnection(this, future.channel()); + } catch (Exception e) { + throw new RedisConnectionException("unable to connect", e); + } + } + + public void shutdown() { + shutdownAsync().syncUninterruptibly(); } public ChannelGroupFuture shutdownAsync() { @@ -102,8 +116,23 @@ public class RedisClient { public static void main(String[] args) throws InterruptedException, ExecutionException { final RedisClient c = new RedisClient("127.0.0.1", 6379); - RedisConnection rc = c.connect(); - RedisPubSubConnection rpsc = c.connectPubSub(); + Object r = c.connect().sync(new StringCodec(), RedisCommands.GET, "test1"); + System.out.println(r); +// final RedisClient c = new RedisClient("127.0.0.1", 26379); +// RedisConnection rc = c.connect(); +// List res4 = rc.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, "mymaster"); +// System.out.println("r: " + res4); +// +// List> res5 = rc.sync(RedisCommands.SENTINEL_SLAVES, "mymaster"); +// System.out.println("r: " + res5); + + +/* RedisPubSubConnection rpsc = c.connectPubSub(); + + rc.sync(new StringCodec(), RedisCommands.HMSET, "test", "1", "2"); + rc.sync(new StringCodec(), RedisCommands.HMSET, "test", "2", "3"); + List r = rc.sync(new StringCodec(), RedisCommands.HMGET, "test", "1", "2"); + String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333"); System.out.println("res 12: " + res1); @@ -144,25 +173,6 @@ public class RedisClient { Long res3 = c3.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444"); System.out.println("published: " + res3); - -/* Future res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random()); - res.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) throws Exception { -// System.out.println("res 1: " + future.getNow()); - } - - }); - - Future r = rc.execute(new StringCodec(), RedisCommands.GET, "test"); - r.addListener(new FutureListener() { - - @Override - public void operationComplete(Future future) throws Exception { - System.out.println("res 2: " + future.getNow()); - } - }); -*/// } - } +*/ } } + diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index bf9e3f600..4b451967d 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -59,17 +59,17 @@ public class RedisConnection implements RedisCommands { return cmd.getNow(); } - public V get(Future future) { - future.awaitUninterruptibly(); - if (future.isSuccess()) { - return future.getNow(); - } - - if (future.cause() instanceof RedisException) { - throw (RedisException) future.cause(); - } - throw new RedisException("Unexpected exception while processing command", future.cause()); - } +// public V get(Future future) { +// future.awaitUninterruptibly(); +// if (future.isSuccess()) { +// return future.getNow(); +// } +// +// if (future.cause() instanceof RedisException) { +// throw (RedisException) future.cause(); +// } +// throw new RedisException("Unexpected exception while processing command", future.cause()); +// } public T sync(RedisStrictCommand command, Object ... params) { Future r = async(null, command, params); diff --git a/src/main/java/org/redisson/client/RedisConnectionException.java b/src/main/java/org/redisson/client/RedisConnectionException.java new file mode 100644 index 000000000..8b25eaadc --- /dev/null +++ b/src/main/java/org/redisson/client/RedisConnectionException.java @@ -0,0 +1,15 @@ +package org.redisson.client; + +public class RedisConnectionException extends RedisException { + + private static final long serialVersionUID = -4756928186967834601L; + + public RedisConnectionException(String msg) { + super(msg); + } + + public RedisConnectionException(String msg, Throwable e) { + super(msg, e); + } + +} diff --git a/src/main/java/org/redisson/client/RedisMovedException.java b/src/main/java/org/redisson/client/RedisMovedException.java new file mode 100644 index 000000000..eddefd342 --- /dev/null +++ b/src/main/java/org/redisson/client/RedisMovedException.java @@ -0,0 +1,17 @@ +package org.redisson.client; + +public class RedisMovedException extends RedisException { + + private static final long serialVersionUID = -6969734163155547631L; + + private int slot; + + public RedisMovedException(int slot) { + this.slot = slot; + } + + public int getSlot() { + return slot; + } + +} diff --git a/src/main/java/org/redisson/client/RedisPubSubConnection.java b/src/main/java/org/redisson/client/RedisPubSubConnection.java index fee152892..af152ad2a 100644 --- a/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -18,37 +18,28 @@ package org.redisson.client; import java.util.concurrent.ConcurrentLinkedQueue; import org.redisson.client.handler.RedisData; -import org.redisson.client.protocol.Codec; -import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.StringCodec; import org.redisson.client.protocol.pubsub.MultiDecoder; import org.redisson.client.protocol.pubsub.PubSubMessage; import org.redisson.client.protocol.pubsub.PubSubMessageDecoder; import org.redisson.client.protocol.pubsub.PubSubPatternMessage; import org.redisson.client.protocol.pubsub.PubSubPatternMessageDecoder; -import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -public class RedisPubSubConnection { +public class RedisPubSubConnection extends RedisConnection { public static final AttributeKey CONNECTION = AttributeKey.valueOf("connection"); final ConcurrentLinkedQueue> listeners = new ConcurrentLinkedQueue>(); - final Channel channel; - final RedisClient redisClient; - public RedisPubSubConnection(RedisClient redisClient, Channel channel) { - this.redisClient = redisClient; - this.channel = channel; + super(redisClient, channel); channel.attr(CONNECTION).set(this); } @@ -57,6 +48,10 @@ public class RedisPubSubConnection { listeners.add(listener); } + public void removeListener(RedisPubSubListener listener) { + listeners.remove(listener); + } + public void onMessage(PubSubMessage message) { for (RedisPubSubListener redisPubSubListener : listeners) { redisPubSubListener.onMessage(message.getChannel(), message.getValue()); @@ -78,28 +73,17 @@ public class RedisPubSubConnection { } public Future unsubscribe(String ... channel) { - return async(null, RedisCommands.UNSUBSCRIBE, channel); + return async((MultiDecoder)null, RedisCommands.UNSUBSCRIBE, channel); } public Future punsubscribe(String ... channel) { - return async(null, RedisCommands.PUNSUBSCRIBE, channel); + return async((MultiDecoder)null, RedisCommands.PUNSUBSCRIBE, channel); } -// public Future async(Codec encoder, RedisCommand command, Object ... params) { -// Promise promise = redisClient.getBootstrap().group().next().newPromise(); -// channel.writeAndFlush(new RedisData(promise, encoder, command, params)); -// return promise; -// } - - public Future async(MultiDecoder nextDecoder, RedisCommand command, Object ... params) { + public Future async(MultiDecoder messageDecoder, RedisCommand command, Object ... params) { Promise promise = redisClient.getBootstrap().group().next().newPromise(); - channel.writeAndFlush(new RedisData(promise, nextDecoder, null, command, params)); + channel.writeAndFlush(new RedisData(promise, messageDecoder, null, command, params)); return promise; } - - public ChannelFuture closeAsync() { - return channel.close(); - } - } diff --git a/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java b/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java index fded71d88..346b78bf9 100644 --- a/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java +++ b/src/main/java/org/redisson/client/handler/RedisCommandsQueue.java @@ -27,7 +27,7 @@ public class RedisCommandsQueue extends ChannelDuplexHandler { public enum QueueCommands {NEXT_COMMAND} - public static final AttributeKey> REPLAY_PROMISE = AttributeKey.valueOf("promise"); + public static final AttributeKey> REPLAY = AttributeKey.valueOf("promise"); private final Queue> queue = PlatformDependent.newMpscQueue(); @@ -59,7 +59,7 @@ public class RedisCommandsQueue extends ChannelDuplexHandler { private void sendData(ChannelHandlerContext ctx) throws Exception { RedisData data = queue.peek(); if (data != null && data.getSended().compareAndSet(false, true)) { - ctx.channel().attr(REPLAY_PROMISE).set(data); + ctx.channel().attr(REPLAY).set(data); ctx.channel().writeAndFlush(data); } } diff --git a/src/main/java/org/redisson/client/handler/RedisDecoder.java b/src/main/java/org/redisson/client/handler/RedisDecoder.java index 7ff23219b..bcba87319 100644 --- a/src/main/java/org/redisson/client/handler/RedisDecoder.java +++ b/src/main/java/org/redisson/client/handler/RedisDecoder.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import org.redisson.client.RedisException; +import org.redisson.client.RedisMovedException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.handler.RedisCommandsQueue.QueueCommands; import org.redisson.client.protocol.Decoder; @@ -45,7 +46,7 @@ public class RedisDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - RedisData data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove(); + RedisData data = ctx.channel().attr(RedisCommandsQueue.REPLAY).get(); RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get(); Decoder currentDecoder = null; @@ -58,20 +59,34 @@ public class RedisDecoder extends ReplayingDecoder { }; } + System.out.println("message " + in.writerIndex() + "-" + in.readerIndex() + " in: " + in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); + decode(in, data, null, pubSubConnection, currentDecoder); + ctx.channel().attr(RedisCommandsQueue.REPLAY).remove(); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); } private void decode(ByteBuf in, RedisData data, List parts, RedisPubSubConnection pubSubConnection, Decoder currentDecoder) throws IOException { int code = in.readByte(); -// System.out.println("trying decode -- " + (char)code); if (code == '+') { Object result = data.getCommand().getReplayDecoder().decode(in); handleResult(data, parts, result); } else if (code == '-') { - Object result = data.getCommand().getReplayDecoder().decode(in); - data.getPromise().setFailure(new RedisException(result.toString())); + String error = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); + in.skipBytes(2); + + if (error.startsWith("MOVED")) { + String[] errorParts = error.split(" "); + int slot = Integer.valueOf(errorParts[1]); + data.getPromise().setFailure(new RedisMovedException(slot)); + } else if (error.startsWith("(error) ASK")) { + String[] errorParts = error.split(" "); + int slot = Integer.valueOf(errorParts[2]); + data.getPromise().setFailure(new RedisMovedException(slot)); + } else { + data.getPromise().setFailure(new RedisException(error)); + } } else if (code == ':') { String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); in.skipBytes(2); @@ -99,7 +114,12 @@ public class RedisDecoder extends ReplayingDecoder { messageDecoders.remove(param.toString()); } } - data.getPromise().setSuccess(result); + + if (parts != null) { + parts.add(result); + } else { + data.getPromise().setSuccess(result); + } } else { if (result instanceof PubSubMessage) { pubSubConnection.onMessage((PubSubMessage) result); diff --git a/src/main/java/org/redisson/client/protocol/RedisCommand.java b/src/main/java/org/redisson/client/protocol/RedisCommand.java index 2fe26acc3..fa010d8d9 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommand.java @@ -29,11 +29,11 @@ public class RedisCommand { private Convertor convertor = new EmptyConvertor(); public RedisCommand(String name, String subName, int ... encodeParamIndexes) { - this(name, subName, null, encodeParamIndexes); + this(name, subName, null, null, encodeParamIndexes); } public RedisCommand(String name, int ... encodeParamIndexes) { - this(name, null, null, encodeParamIndexes); + this(name, null, null, null, encodeParamIndexes); } public RedisCommand(String name, Convertor convertor, int ... encodeParamIndexes) { @@ -44,7 +44,7 @@ public class RedisCommand { } public RedisCommand(String name, Decoder reponseDecoder, int ... encodeParamIndexes) { - this(name, null, reponseDecoder, encodeParamIndexes); + this(name, null, null, reponseDecoder, encodeParamIndexes); } public RedisCommand(String name, MultiDecoder replayMultiDecoder, int ... encodeParamIndexes) { @@ -54,10 +54,16 @@ public class RedisCommand { this.replayMultiDecoder = replayMultiDecoder; } - public RedisCommand(String name, String subName, Decoder reponseDecoder, int ... encodeParamIndexes) { + public RedisCommand(String name, String subName, MultiDecoder replayMultiDecoder, + int... encodeParamIndexes) { + this(name, subName, replayMultiDecoder, null, encodeParamIndexes); + } + + public RedisCommand(String name, String subName, MultiDecoder replayMultiDecoder, Decoder reponseDecoder, int ... encodeParamIndexes) { super(); this.name = name; this.subName = subName; + this.replayMultiDecoder = replayMultiDecoder; this.replayDecoder = reponseDecoder; this.objectParamIndexes = encodeParamIndexes; } diff --git a/src/main/java/org/redisson/client/protocol/RedisCommands.java b/src/main/java/org/redisson/client/protocol/RedisCommands.java index 0e0fcba79..fb625bbf6 100644 --- a/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -15,6 +15,15 @@ */ package org.redisson.client.protocol; +import java.util.List; +import java.util.Map; + +import org.redisson.client.protocol.decoder.BooleanReplayDecoder; +import org.redisson.client.protocol.decoder.StringDataDecoder; +import org.redisson.client.protocol.decoder.StringListObjectReplayDecoder; +import org.redisson.client.protocol.decoder.StringListReplayDecoder; +import org.redisson.client.protocol.decoder.StringMapReplayDecoder; +import org.redisson.client.protocol.decoder.StringReplayDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusMessage; @@ -24,12 +33,23 @@ public interface RedisCommands { RedisStrictCommand SELECT = new RedisStrictCommand("SELECT", new StringReplayDecoder()); RedisStrictCommand CLIENT_SETNAME = new RedisStrictCommand("CLIENT", "SETNAME", new StringReplayDecoder()); RedisStrictCommand CLIENT_GETNAME = new RedisStrictCommand("CLIENT", "GETNAME", new StringDataDecoder()); + RedisStrictCommand FLUSHDB = new RedisStrictCommand("FLUSHDB", new StringReplayDecoder()); + + RedisStrictCommand> KEYS = new RedisStrictCommand>("KEYS", new StringListReplayDecoder()); + + RedisCommand HMSET = new RedisCommand("HMSET", new StringReplayDecoder(), 2, 3); + RedisCommand HMGET = new RedisCommand("HMGET", new StringListObjectReplayDecoder(), 2, 3); + + RedisStrictCommand DEL_ONE = new RedisStrictCommand("DEL", new BooleanReplayConvertor()); RedisCommand GET = new RedisCommand("GET"); RedisCommand SET = new RedisCommand("SET", new StringReplayDecoder(), 1); RedisCommand SETEX = new RedisCommand("SETEX", new StringReplayDecoder(), 2); RedisStrictCommand EXISTS = new RedisStrictCommand("EXISTS", new BooleanReplayConvertor()); + RedisStrictCommand RENAMENX = new RedisStrictCommand("RENAMENX", new BooleanReplayConvertor()); + RedisStrictCommand RENAME = new RedisStrictCommand("RENAME", new BooleanReplayDecoder()); + RedisCommand PUBLISH = new RedisCommand("PUBLISH", 1); RedisStrictCommand SUBSCRIBE = new RedisStrictCommand("SUBSCRIBE", new PubSubStatusDecoder()); @@ -37,4 +57,9 @@ public interface RedisCommands { RedisStrictCommand PSUBSCRIBE = new RedisStrictCommand("PSUBSCRIBE", new PubSubStatusDecoder()); RedisStrictCommand PUNSUBSCRIBE = new RedisStrictCommand("PUNSUBSCRIBE", new PubSubStatusDecoder()); + RedisStrictCommand CLUSTER_NODES = new RedisStrictCommand("CLUSTER", "NODES", new StringDataDecoder()); + + RedisStrictCommand> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder()); + RedisStrictCommand>> SENTINEL_SLAVES = new RedisStrictCommand>>("SENTINEL", "SLAVES", new StringMapReplayDecoder()); + } diff --git a/src/main/java/org/redisson/client/protocol/RedisStrictCommand.java b/src/main/java/org/redisson/client/protocol/RedisStrictCommand.java index 8e24a2491..03b49b241 100644 --- a/src/main/java/org/redisson/client/protocol/RedisStrictCommand.java +++ b/src/main/java/org/redisson/client/protocol/RedisStrictCommand.java @@ -23,6 +23,11 @@ public class RedisStrictCommand extends RedisCommand { super(name, replayMultiDecoder, encodeParamIndexes); } + public RedisStrictCommand(String name, String subName, MultiDecoder replayMultiDecoder, + int... encodeParamIndexes) { + super(name, subName, replayMultiDecoder, encodeParamIndexes); + } + public RedisStrictCommand(String name, int... encodeParamIndexes) { super(name, encodeParamIndexes); } @@ -33,7 +38,7 @@ public class RedisStrictCommand extends RedisCommand { public RedisStrictCommand(String name, String subName, Decoder reponseDecoder, int... encodeParamIndexes) { - super(name, subName, reponseDecoder, encodeParamIndexes); + super(name, subName, null, reponseDecoder, encodeParamIndexes); } public RedisStrictCommand(String name, String subName, int... encodeParamIndexes) { diff --git a/src/main/java/org/redisson/client/protocol/StringCodec.java b/src/main/java/org/redisson/client/protocol/StringCodec.java index 340770331..ab108bcbf 100644 --- a/src/main/java/org/redisson/client/protocol/StringCodec.java +++ b/src/main/java/org/redisson/client/protocol/StringCodec.java @@ -33,6 +33,9 @@ public class StringCodec implements Codec { @Override public Object decode(ByteBuf buf) { + if (buf == null) { + return null; + } return buf.toString(CharsetUtil.UTF_8); } diff --git a/src/main/java/org/redisson/client/protocol/decoder/BooleanReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/BooleanReplayDecoder.java new file mode 100644 index 000000000..93ad4ef23 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/BooleanReplayDecoder.java @@ -0,0 +1,17 @@ +package org.redisson.client.protocol.decoder; + +import org.redisson.client.protocol.Decoder; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class BooleanReplayDecoder implements Decoder { + + @Override + public Boolean decode(ByteBuf buf) { + String status = buf.readBytes(buf.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); + buf.skipBytes(2); + return "OK".equals(status); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/StringDataDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringDataDecoder.java similarity index 90% rename from src/main/java/org/redisson/client/protocol/StringDataDecoder.java rename to src/main/java/org/redisson/client/protocol/decoder/StringDataDecoder.java index cdd929bf4..8407adc53 100644 --- a/src/main/java/org/redisson/client/protocol/StringDataDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringDataDecoder.java @@ -13,7 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.client.protocol; +package org.redisson.client.protocol.decoder; + +import org.redisson.client.protocol.Decoder; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringListObjectReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringListObjectReplayDecoder.java new file mode 100644 index 000000000..3989a427b --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/StringListObjectReplayDecoder.java @@ -0,0 +1,22 @@ +package org.redisson.client.protocol.decoder; + +import java.util.List; + +import org.redisson.client.protocol.pubsub.MultiDecoder; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class StringListObjectReplayDecoder implements MultiDecoder { + + @Override + public Object decode(ByteBuf buf) { + return buf.toString(CharsetUtil.UTF_8); + } + + @Override + public Object decode(List parts) { + return parts; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java new file mode 100644 index 000000000..5f0941a72 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/StringListReplayDecoder.java @@ -0,0 +1,23 @@ +package org.redisson.client.protocol.decoder; + +import java.util.Arrays; +import java.util.List; + +import org.redisson.client.protocol.pubsub.MultiDecoder; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class StringListReplayDecoder implements MultiDecoder> { + + @Override + public Object decode(ByteBuf buf) { + return buf.toString(CharsetUtil.UTF_8); + } + + @Override + public List decode(List parts) { + return Arrays.asList(Arrays.copyOf(parts.toArray(), parts.size(), String[].class)); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java new file mode 100644 index 000000000..f99641b43 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/StringMapReplayDecoder.java @@ -0,0 +1,43 @@ +package org.redisson.client.protocol.decoder; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.client.protocol.pubsub.MultiDecoder; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; + +public class StringMapReplayDecoder implements MultiDecoder>> { + + @Override + public Object decode(ByteBuf buf) { + return buf.toString(CharsetUtil.UTF_8); + } + + @Override + public List> decode(List parts) { + if (!parts.isEmpty()) { + if (parts.get(0) instanceof List) { + List> result = new ArrayList>(parts.size()); + for (Object object : parts) { + List> list = (List>) object; + result.addAll(list); + } + return result; + } + } + + Map result = new HashMap(parts.size()/2); + for (int i = 0; i < parts.size(); i++) { + if (i % 2 != 0) { + result.put(parts.get(i-1).toString(), parts.get(i).toString()); + } + } + return Collections.singletonList(result); + } + +} diff --git a/src/main/java/org/redisson/client/protocol/StringReplayDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/StringReplayDecoder.java similarity index 91% rename from src/main/java/org/redisson/client/protocol/StringReplayDecoder.java rename to src/main/java/org/redisson/client/protocol/decoder/StringReplayDecoder.java index 05d841794..938aff8c2 100644 --- a/src/main/java/org/redisson/client/protocol/StringReplayDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/StringReplayDecoder.java @@ -13,7 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.redisson.client.protocol; +package org.redisson.client.protocol.decoder; + +import org.redisson.client.protocol.Decoder; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil;