diff --git a/redisson/src/main/java/org/redisson/RedissonGeo.java b/redisson/src/main/java/org/redisson/RedissonGeo.java index 30385eeca..32ca55d87 100644 --- a/redisson/src/main/java/org/redisson/RedissonGeo.java +++ b/redisson/src/main/java/org/redisson/RedissonGeo.java @@ -35,12 +35,11 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.CodecDecoder; import org.redisson.client.protocol.decoder.GeoDistanceDecoder; -import org.redisson.client.protocol.decoder.GeoMapReplayDecoder; import org.redisson.client.protocol.decoder.GeoPositionDecoder; import org.redisson.client.protocol.decoder.GeoPositionMapDecoder; -import org.redisson.client.protocol.decoder.ListMultiDecoder; +import org.redisson.client.protocol.decoder.ListMultiDecoder2; import org.redisson.client.protocol.decoder.MultiDecoder; -import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder2; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; @@ -53,10 +52,15 @@ import org.redisson.connection.decoder.MapGetAllDecoder; */ public class RedissonGeo extends RedissonScoredSortedSet implements RGeo { - private static final MultiDecoder> POSTITION_DECODER = new ListMultiDecoder(new CodecDecoder(), - new GeoPositionDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoMapReplayDecoder()); - private static final MultiDecoder> DISTANCE_DECODER = new ListMultiDecoder( - new GeoDistanceDecoder(), new GeoMapReplayDecoder()); + private static final MultiDecoder> POSTITION_DECODER = new ListMultiDecoder2( + new ObjectMapReplayDecoder2(), + new CodecDecoder(), + new GeoPositionDecoder()); + + private static final MultiDecoder> DISTANCE_DECODER = new ListMultiDecoder2( + new ObjectMapReplayDecoder2(), + new GeoDistanceDecoder()); + private static final RedisCommand> GEORADIUS_RO_DISTANCE = new RedisCommand>( "GEORADIUS_RO", DISTANCE_DECODER); private static final RedisCommand> GEORADIUS_RO_POS = new RedisCommand>( @@ -147,9 +151,9 @@ public class RedissonGeo extends RedissonScoredSortedSet implements RGeo> decoder = new ListMultiDecoder(0, new GeoPositionDecoder(), - // new ObjectListReplayDecoder(ListMultiDecoder.RESET), - new GeoPositionMapDecoder((List) Arrays.asList(members))); + MultiDecoder> decoder = new ListMultiDecoder2( + new GeoPositionMapDecoder((List) Arrays.asList(members)), + new GeoPositionDecoder()); RedisCommand> command = new RedisCommand>("GEOPOS", decoder); return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, command, params.toArray()); } diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index e3ca5dcf4..96046628a 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -45,12 +45,10 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.NumberConvertor; -import org.redisson.client.protocol.decoder.ListMultiDecoder; -import org.redisson.client.protocol.decoder.LongMultiDecoder; +import org.redisson.client.protocol.decoder.ListMultiDecoder2; import org.redisson.client.protocol.decoder.MapCacheScanResult; import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResult; -import org.redisson.client.protocol.decoder.ObjectListDecoder; import org.redisson.client.protocol.decoder.ObjectMapDecoder; import org.redisson.codec.MapCacheEventCodec; import org.redisson.codec.MapCacheEventCodec.OSType; @@ -1231,7 +1229,9 @@ public class RedissonMapCache extends RedissonMap implements RMapCac params.add(count); RedisCommand> command = new RedisCommand>("EVAL", - new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(codec), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); + new ListMultiDecoder2( + new MapCacheScanResultReplayDecoder(), + new ObjectMapDecoder(codec, true)), ValueType.MAP); RFuture> f = commandExecutor.evalReadAsync(client, name, codec, command, "local result = {}; " + "local idleKeys = {}; " diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java index 60347d0e3..8f83b04dc 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -38,10 +38,9 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.ListMultiDecoder; +import org.redisson.client.protocol.decoder.ListMultiDecoder2; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; -import org.redisson.client.protocol.decoder.LongMultiDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.command.CommandAsyncExecutor; @@ -55,7 +54,7 @@ import org.redisson.command.CommandAsyncExecutor; public class RedissonSetMultimapValues extends RedissonExpirable implements RSet { private static final RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", - new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE); + new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder()), ValueType.MAP_VALUE); private final RSet set; private final Object key; diff --git a/redisson/src/main/java/org/redisson/RedissonStream.java b/redisson/src/main/java/org/redisson/RedissonStream.java index d57ba7b51..61eb095ed 100644 --- a/redisson/src/main/java/org/redisson/RedissonStream.java +++ b/redisson/src/main/java/org/redisson/RedissonStream.java @@ -35,10 +35,10 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.ListMultiDecoder; -import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; +import org.redisson.client.protocol.decoder.CodecDecoder; +import org.redisson.client.protocol.decoder.ListMultiDecoder2; +import org.redisson.client.protocol.decoder.ObjectMapDecoder; import org.redisson.client.protocol.decoder.StreamInfoDecoder; -import org.redisson.client.protocol.decoder.StreamInfoMapDecoder; import org.redisson.command.CommandAsyncExecutor; /** @@ -50,18 +50,12 @@ import org.redisson.command.CommandAsyncExecutor; */ public class RedissonStream extends RedissonExpirable implements RStream { - private final RedisCommand> xinfoStreamCommand; - public RedissonStream(Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); - xinfoStreamCommand = new RedisCommand>("XINFO", "STREAM", - new ListMultiDecoder(new StreamInfoMapDecoder(getCodec()), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new StreamInfoDecoder())); } public RedissonStream(CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); - xinfoStreamCommand = new RedisCommand>("XINFO", "STREAM", - new ListMultiDecoder(new StreamInfoMapDecoder(getCodec()), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new StreamInfoDecoder())); } protected void checkKey(Object key) { @@ -966,6 +960,12 @@ public class RedissonStream extends RedissonExpirable implements RStream> getInfoAsync() { + RedisCommand> xinfoStreamCommand = new RedisCommand<>("XINFO", "STREAM", + new ListMultiDecoder2( + new StreamInfoDecoder(), + new CodecDecoder(), + new ObjectMapDecoder(getCodec(), false))); + return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, xinfoStreamCommand, getName()); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 5bd5722ef..831c51de8 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -46,21 +46,20 @@ import org.redisson.client.protocol.convertor.TypeConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.ClusterNodesDecoder; import org.redisson.client.protocol.decoder.ListFirstObjectDecoder; -import org.redisson.client.protocol.decoder.ListMultiDecoder; +import org.redisson.client.protocol.decoder.ListMultiDecoder2; import org.redisson.client.protocol.decoder.ListObjectDecoder; import org.redisson.client.protocol.decoder.ListResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.Long2MultiDecoder; -import org.redisson.client.protocol.decoder.LongMultiDecoder; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.ObjectDecoder; import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; -import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder2; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.client.protocol.decoder.PendingEntryDecoder; import org.redisson.client.protocol.decoder.PendingResultDecoder; @@ -154,10 +153,10 @@ public interface RedisCommands { RedisCommand>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZRANGE_ENTRY = new RedisCommand>>("ZRANGE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZRANGEBYSCORE_ENTRY = new RedisCommand>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder()); - RedisCommand> ZSCAN = new RedisCommand>("ZSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ScoredSortedSetScanDecoder(), new ScoredSortedSetScanReplayDecoder())); + RedisCommand> ZSCAN = new RedisCommand>("ZSCAN", new ListMultiDecoder2(new ScoredSortedSetScanReplayDecoder(), new ScoredSortedSetScanDecoder())); RedisStrictCommand ZINCRBY = new RedisStrictCommand("ZINCRBY", new DoubleNullSafeReplayConvertor()); - RedisCommand> SCAN = new RedisCommand>("SCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder())); + RedisCommand> SCAN = new RedisCommand>("SCAN", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder())); RedisStrictCommand RANDOM_KEY = new RedisStrictCommand("RANDOMKEY"); RedisCommand PING = new RedisCommand("PING"); RedisStrictCommand PING_BOOL = new RedisStrictCommand("PING", new BooleanNotNullReplayConvertor()); @@ -179,9 +178,9 @@ public interface RedisCommands { RedisCommand> SMEMBERS = new RedisCommand>("SMEMBERS", new ObjectSetReplayDecoder()); RedisCommand> SRANDMEMBER = new RedisCommand>("SRANDMEMBER", new ObjectSetReplayDecoder()); RedisCommand SRANDMEMBER_SINGLE = new RedisCommand("SRANDMEMBER"); - RedisCommand> SSCAN = new RedisCommand>("SSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder())); - RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder())); - RedisCommand> EVAL_ZSCAN = new RedisCommand>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder())); + RedisCommand> SSCAN = new RedisCommand>("SSCAN", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder())); + RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder())); + RedisCommand> EVAL_ZSCAN = new RedisCommand>("EVAL", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder())); RedisCommand SISMEMBER = new RedisCommand("SISMEMBER", new BooleanReplayConvertor()); RedisStrictCommand SCARD_INT = new RedisStrictCommand("SCARD", new IntegerReplayConvertor()); RedisStrictCommand SCARD = new RedisStrictCommand("SCARD"); @@ -295,7 +294,7 @@ public interface RedisCommands { RedisStrictCommand HSET = new RedisStrictCommand("HSET", new BooleanReplayConvertor()); RedisStrictCommand HSET_VOID = new RedisStrictCommand("HSET", new VoidReplayConvertor()); RedisCommand> HSCAN = new RedisCommand>("HSCAN", - new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP); + new ListMultiDecoder2(new MapScanResultReplayDecoder(), new ObjectMapReplayDecoder()), ValueType.MAP); RedisCommand> HGETALL = new RedisCommand>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP); RedisCommand>> HGETALL_ENTRY = new RedisCommand>>("HGETALL", new ObjectMapEntryReplayDecoder(), ValueType.MAP); RedisCommand> HVALS = new RedisCommand>("HVALS", new ObjectListReplayDecoder(), ValueType.MAP_VALUE); @@ -331,77 +330,67 @@ public interface RedisCommands { RedisCommand PSETEX = new RedisCommand("PSETEX", new VoidReplayConvertor()); RedisCommand>>> XRANGE = new RedisCommand>>>("XRANGE", - new ListMultiDecoder( + new ListMultiDecoder2( + new ObjectMapReplayDecoder2(), new ObjectDecoder(new StreamIdDecoder()), - new StreamObjectMapReplayDecoder(), - new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET), - new ObjectMapJoinDecoder()), ValueType.MAP); + new StreamObjectMapReplayDecoder()), ValueType.MAP); RedisCommand>>> XREVRANGE = new RedisCommand>>>("XREVRANGE", XRANGE.getReplayMultiDecoder(), ValueType.MAP); RedisCommand>>> XREAD = new RedisCommand>>>("XREAD", - new ListMultiDecoder( + new ListMultiDecoder2( + new StreamResultDecoder(false), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), - new StreamObjectMapReplayDecoder(), - new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), - new ObjectMapJoinDecoder(), - new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), - new StreamResultDecoder()), ValueType.MAP); + new ObjectDecoder(new StreamIdDecoder()), + new StreamObjectMapReplayDecoder()), ValueType.MAP); RedisCommand>>> XREAD_BLOCKING = new RedisCommand>>>("XREAD", XREAD.getReplayMultiDecoder()); RedisCommand>> XREAD_SINGLE = new RedisCommand>>("XREAD", - new ListMultiDecoder( + new ListMultiDecoder2( + new StreamResultDecoder(true), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), - new StreamObjectMapReplayDecoder(), - new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), - new ObjectMapJoinDecoder(), - new StreamObjectMapReplayDecoder(), - new StreamResultDecoder()), ValueType.MAP); + new ObjectDecoder(new StreamIdDecoder()), + new StreamObjectMapReplayDecoder()), ValueType.MAP); RedisCommand>> XREAD_BLOCKING_SINGLE = new RedisCommand>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder(), ValueType.MAP); RedisCommand>>> XREADGROUP = new RedisCommand>>>("XREADGROUP", - new ListMultiDecoder( + new ListMultiDecoder2( + new StreamResultDecoder(false), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), - new StreamObjectMapReplayDecoder(), - new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), - new ObjectMapJoinDecoder(), - new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX), - new StreamResultDecoder()), ValueType.MAP); + new ObjectDecoder(new StreamIdDecoder()), + new StreamObjectMapReplayDecoder()), ValueType.MAP); RedisCommand>>> XREADGROUP_BLOCKING = new RedisCommand>>>("XREADGROUP", XREADGROUP.getReplayMultiDecoder(), ValueType.MAP); RedisCommand>> XREADGROUP_SINGLE = new RedisCommand>>("XREADGROUP", - new ListMultiDecoder( + new ListMultiDecoder2( + new StreamResultDecoder(true), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(new StreamIdDecoder()), - new StreamObjectMapReplayDecoder(), - new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), - new ObjectMapJoinDecoder(), - new StreamObjectMapReplayDecoder(), - new StreamResultDecoder()), ValueType.MAP); + new ObjectDecoder(new StreamIdDecoder()), + new StreamObjectMapReplayDecoder()), ValueType.MAP); RedisCommand> XINFO_GROUPS = new RedisCommand>("XINFO", "GROUPS", - new ListMultiDecoder(0, new StreamGroupInfoDecoder(), new ObjectListReplayDecoder())); + new ListMultiDecoder2(new ObjectListReplayDecoder(), new StreamGroupInfoDecoder())); - RedisCommand> XINFO_CONSUMERS = new RedisCommand>("XINFO", "CONSUMERS", - new ListMultiDecoder(0, new StreamConsumerInfoDecoder(), new ObjectListReplayDecoder())); + RedisCommand> XINFO_CONSUMERS = new RedisCommand<>("XINFO", "CONSUMERS", + new ListMultiDecoder2(new ObjectListReplayDecoder(), new StreamConsumerInfoDecoder())); RedisCommand> XCLAIM_IDS = new RedisCommand>("XCLAIM", new StreamIdListDecoder()); RedisCommand>> XCLAIM = new RedisCommand>>("XCLAIM", - new ListMultiDecoder( + new ListMultiDecoder2( + new ObjectMapReplayDecoder2(), new ObjectDecoder(new StreamIdDecoder()), - new StreamObjectMapReplayDecoder(), - new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET), - new ObjectMapJoinDecoder()), ValueType.MAP); + new StreamObjectMapReplayDecoder()), ValueType.MAP); RedisCommand>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand>>("XREADGROUP", XREADGROUP_SINGLE.getReplayMultiDecoder()); @@ -418,7 +407,7 @@ public interface RedisCommands { RedisStrictCommand XDEL = new RedisStrictCommand("XDEL"); RedisStrictCommand XTRIM = new RedisStrictCommand("XTRIM"); RedisCommand XPENDING = new RedisCommand("XPENDING", - new ListMultiDecoder(0, new ObjectListReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new PendingResultDecoder())); + new ListMultiDecoder2(new PendingResultDecoder(), new ObjectListReplayDecoder(), new ObjectListReplayDecoder())); RedisCommand XPENDING_ENTRIES = new RedisCommand("XPENDING", new PendingEntryDecoder()); @@ -458,11 +447,11 @@ public interface RedisCommands { RedisStrictCommand> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder()); RedisCommand>> SENTINEL_MASTERS = new RedisCommand>>("SENTINEL", "MASTERS", - new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new ListResultReplayDecoder())); + new ListMultiDecoder2(new ListResultReplayDecoder(), new ObjectMapReplayDecoder())); RedisCommand>> SENTINEL_SLAVES = new RedisCommand>>("SENTINEL", "SLAVES", - new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new ListResultReplayDecoder())); + new ListMultiDecoder2(new ListResultReplayDecoder(), new ObjectMapReplayDecoder())); RedisCommand>> SENTINEL_SENTINELS = new RedisCommand>>("SENTINEL", "SENTINELS", - new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new ListResultReplayDecoder())); + new ListMultiDecoder2(new ListResultReplayDecoder(), new ObjectMapReplayDecoder())); RedisStrictCommand CLUSTER_ADDSLOTS = new RedisStrictCommand("CLUSTER", "ADDSLOTS"); RedisStrictCommand CLUSTER_REPLICATE = new RedisStrictCommand("CLUSTER", "REPLICATE"); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/CodecDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/CodecDecoder.java index 24f47f4f9..e67dc233b 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/CodecDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/CodecDecoder.java @@ -34,7 +34,7 @@ public class CodecDecoder implements MultiDecoder { @Override public Object decode(List parts, State state) { - return null; + return parts; } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/GeoMapReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/GeoMapReplayDecoder.java index cf3f6d843..d85c1f50a 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/GeoMapReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/GeoMapReplayDecoder.java @@ -27,6 +27,7 @@ import org.redisson.client.protocol.Decoder; * @author Nikita Koksharov * */ +@Deprecated public class GeoMapReplayDecoder implements MultiDecoder> { @Override diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java index 862c43d51..cd9fbeb97 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder.java @@ -29,6 +29,7 @@ import io.netty.buffer.ByteBuf; * * @param type */ +@Deprecated public class ListMultiDecoder implements MultiDecoder { public static final Decoder RESET = new Decoder() { diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder2.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder2.java new file mode 100644 index 000000000..d640e08f9 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListMultiDecoder2.java @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.List; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +/** + * + * @author Nikita Koksharov + * + * @param type + */ +public class ListMultiDecoder2 implements MultiDecoder { + + private final MultiDecoder[] decoders; + + public ListMultiDecoder2(MultiDecoder... decoders) { + this.decoders = decoders; + } + + @Override + public Decoder getDecoder(int paramNum, State state) { + int index = state.getLevel(); + return decoders[index].getDecoder(paramNum, state); + } + + @Override + public Object decode(List parts, State state) { + int index = state.getLevel(); + return decoders[index].decode(parts, state); + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResultReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResultReplayDecoder.java index 67060d9d3..fe41058ad 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResultReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResultReplayDecoder.java @@ -17,6 +17,7 @@ package org.redisson.client.protocol.decoder; import java.util.List; +import org.redisson.client.codec.LongCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -29,7 +30,7 @@ public class ListScanResultReplayDecoder implements MultiDecoder getDecoder(int paramNum, State state) { - return null; + return LongCodec.INSTANCE.getValueDecoder(); } @Override diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResultReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResultReplayDecoder.java index c505f074d..e021267e4 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResultReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/MapCacheScanResultReplayDecoder.java @@ -18,6 +18,7 @@ package org.redisson.client.protocol.decoder; import java.util.List; import java.util.Map; +import org.redisson.client.codec.LongCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -38,7 +39,7 @@ public class MapCacheScanResultReplayDecoder implements MultiDecoder getDecoder(int paramNum, State state) { - return null; + return LongCodec.INSTANCE.getValueDecoder(); } } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java index 40008f3c5..c8cf530e0 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/MapScanResultReplayDecoder.java @@ -18,6 +18,7 @@ package org.redisson.client.protocol.decoder; import java.util.List; import java.util.Map; +import org.redisson.client.codec.LongCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -30,7 +31,7 @@ public class MapScanResultReplayDecoder implements MultiDecoder getDecoder(int paramNum, State state) { - return null; + return LongCodec.INSTANCE.getValueDecoder(); } @Override diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectDecoder.java index 1d284642d..f8e49c67f 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectDecoder.java @@ -41,7 +41,6 @@ public class ObjectDecoder implements MultiDecoder { @Override public Object decode(List parts, State state) { - parts.clear(); return parts; } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectListDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectListDecoder.java index 2f4e77da1..15f11e1ab 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectListDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectListDecoder.java @@ -27,6 +27,7 @@ import org.redisson.client.protocol.Decoder; * * @param type */ +@Deprecated public class ObjectListDecoder implements MultiDecoder> { private Codec codec; @@ -41,6 +42,7 @@ public class ObjectListDecoder implements MultiDecoder> { return codec.getMapKeyDecoder(); } + @SuppressWarnings("unchecked") @Override public List decode(List parts, State state) { return (List) parts; diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapDecoder.java index 23618d8c8..979b55e77 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapDecoder.java @@ -28,19 +28,26 @@ import org.redisson.client.protocol.Decoder; * @author Nikita Koksharov * */ -public class ObjectMapDecoder implements MultiDecoder> { +public class ObjectMapDecoder implements MultiDecoder { - private Codec codec; + private final Codec codec; + private final boolean decodeList; - public ObjectMapDecoder(Codec codec) { + public ObjectMapDecoder(Codec codec, boolean decodeList) { super(); this.codec = codec; + this.decodeList = decodeList; } private int pos; + private boolean mapDecoded; @Override public Decoder getDecoder(int paramNum, State state) { + if (mapDecoded) { + return codec.getMapKeyDecoder(); + } + if (pos++ % 2 == 0) { return codec.getMapKeyDecoder(); } @@ -48,13 +55,19 @@ public class ObjectMapDecoder implements MultiDecoder> { } @Override - public Map decode(List parts, State state) { + public Object decode(List parts, State state) { + if (decodeList && mapDecoded) { + return parts; + } + Map result = new LinkedHashMap(parts.size()/2); for (int i = 0; i < parts.size(); i++) { if (i % 2 != 0) { result.put(parts.get(i-1), parts.get(i)); } } + + mapDecoded = true; return result; } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapJoinDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapJoinDecoder.java index 25edd7209..c18d7da6e 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapJoinDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapJoinDecoder.java @@ -27,6 +27,7 @@ import org.redisson.client.protocol.Decoder; * @author Nikita Koksharov * */ +@Deprecated public class ObjectMapJoinDecoder implements MultiDecoder> { @Override diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder2.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder2.java new file mode 100644 index 000000000..3019c1ccc --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ObjectMapReplayDecoder2.java @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2013-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +/** + * + * @author Nikita Koksharov + * + */ +public class ObjectMapReplayDecoder2 implements MultiDecoder> { + + @Override + public Map decode(List parts, State state) { + List> list = (List>) (Object) parts; + Map result = new LinkedHashMap(parts.size()/2); + for (List entry : list) { + result.put(entry.get(0), entry.get(1)); + } + return result; + } + + @Override + public Decoder getDecoder(int paramNum, State state) { + return null; + } + +} diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingResultDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingResultDecoder.java index cbb5f9e14..6ba25208a 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingResultDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/PendingResultDecoder.java @@ -46,7 +46,7 @@ public class PendingResultDecoder implements MultiDecoder { } List> customerParts = (List>) parts.get(3); - if (customerParts == null) { + if (customerParts.isEmpty()) { return new PendingResult(0, null, null, Collections.emptyMap()); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetScanReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetScanReplayDecoder.java index e579cc90a..1c4056c3f 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetScanReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ScoredSortedSetScanReplayDecoder.java @@ -17,6 +17,7 @@ package org.redisson.client.protocol.decoder; import java.util.List; +import org.redisson.client.codec.LongCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; @@ -29,7 +30,7 @@ public class ScoredSortedSetScanReplayDecoder implements MultiDecoder getDecoder(int paramNum, State state) { - return null; + return LongCodec.INSTANCE.getValueDecoder(); } @Override diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamInfoMapDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamInfoMapDecoder.java index 1d3d449b3..30e87a8be 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamInfoMapDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamInfoMapDecoder.java @@ -26,6 +26,7 @@ import org.redisson.client.protocol.Decoder; * @author Nikita Koksharov * */ +@Deprecated public class StreamInfoMapDecoder implements MultiDecoder { boolean hasNonZeroLevel = false; @@ -33,7 +34,7 @@ public class StreamInfoMapDecoder implements MultiDecoder { final ObjectMapDecoder decoder; public StreamInfoMapDecoder(Codec codec) { - decoder = new ObjectMapDecoder(codec); + decoder = new ObjectMapDecoder(codec, false); } @Override diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java index 1218aa7ce..cdace2033 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/StreamResultDecoder.java @@ -15,7 +15,7 @@ */ package org.redisson.client.protocol.decoder; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,13 +30,34 @@ import org.redisson.client.protocol.Decoder; */ public class StreamResultDecoder implements MultiDecoder { + private final boolean firstResult; + + public StreamResultDecoder(boolean firstResult) { + super(); + this.firstResult = firstResult; + } + @Override public Object decode(List parts, State state) { - if (!parts.isEmpty()) { - Map>> result = (Map>>) parts.get(0); - return result.values().iterator().next(); + List> list = (List>) (Object) parts; + Map>> result = new HashMap<>(); + for (List entries : list) { + List> streamEntries = (List>) entries.get(1); + if (!streamEntries.isEmpty()) { + String name = (String) entries.get(0); + Map> ee = new HashMap<>(); + result.put(name, ee); + + for (List se : streamEntries) { + ee.put((StreamMessageId) se.get(0), (Map) se.get(1)); + } + + if (firstResult) { + return ee; + } + } } - return Collections.emptyMap(); + return result; } @Override diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 81c6cd0c8..ab1008813 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -101,38 +101,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { this.sentinelResolver = resolverGroup.getResolver(getGroup().next()); - boolean connected = false; - - for (String address : cfg.getSentinelAddresses()) { - RedisURI addr = new RedisURI(address); - RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getTimeout(), null); - try { - RedisConnection c = client.connect(); - connected = true; - try { - c.sync(RedisCommands.PING); - scheme = addr.getScheme(); - } catch (RedisAuthRequiredException e) { - usePassword = true; - } - break; - } catch (RedisConnectionException e) { - log.warn("Can't connect to sentinel server. {}", e.getMessage()); - } catch (Exception e) { - // skip - } finally { - client.shutdown(); - } - } - - if (!connected) { - stopThreads(); - StringBuilder list = new StringBuilder(); - for (String address : cfg.getSentinelAddresses()) { - list.append(address).append(", "); - } - throw new RedisConnectionException("Unable to connect to Redis sentinel servers: " + list); - } + checkAuth(cfg); for (String address : cfg.getSentinelAddresses()) { RedisURI addr = new RedisURI(address); @@ -231,6 +200,41 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { scheduleChangeCheck(cfg, null); } + + private void checkAuth(SentinelServersConfig cfg) { + boolean connected = false; + + for (String address : cfg.getSentinelAddresses()) { + RedisURI addr = new RedisURI(address); + RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getTimeout(), null); + try { + RedisConnection c = client.connect(); + connected = true; + try { + c.sync(RedisCommands.PING); + scheme = addr.getScheme(); + } catch (RedisAuthRequiredException e) { + usePassword = true; + } + break; + } catch (RedisConnectionException e) { + log.warn("Can't connect to sentinel server. {}", e.getMessage()); + } catch (Exception e) { + // skip + } finally { + client.shutdown(); + } + } + + if (!connected) { + stopThreads(); + StringBuilder list = new StringBuilder(); + for (String address : cfg.getSentinelAddresses()) { + list.append(address).append(", "); + } + throw new RedisConnectionException("Unable to connect to Redis sentinel servers: " + list); + } + } @Override protected void startDNSMonitoring(RedisClient masterHost) { diff --git a/redisson/src/test/java/org/redisson/RedissonGeoTest.java b/redisson/src/test/java/org/redisson/RedissonGeoTest.java index abc58558d..cfaf8c19e 100644 --- a/redisson/src/test/java/org/redisson/RedissonGeoTest.java +++ b/redisson/src/test/java/org/redisson/RedissonGeoTest.java @@ -505,6 +505,13 @@ public class RedissonGeoTest extends BaseTest { Map descExpected = new LinkedHashMap(); descExpected.put("Catania", new GeoPosition(15.087267458438873, 37.50266842333162)); assertThat(geo.radiusWithPosition("Palermo", 200, GeoUnit.KILOMETERS, GeoOrder.DESC, 1).entrySet()).containsExactlyElementsOf(descExpected.entrySet()); + + RGeo geo2 = redisson.getGeo("test2"); + geo2.add(new GeoEntry(13.361389, 38.115556, "Palermo"), new GeoEntry(13.361390, 38.115557, "Catania")); + Map ascExpected2 = new LinkedHashMap(); + ascExpected2.put("Catania", new GeoPosition(13.361389338970184, 38.115556395496299)); + ascExpected2.put("Palermo", new GeoPosition(13.361389338970184, 38.115556395496299)); + assertThat(geo2.radiusWithPosition("Palermo", 200, GeoUnit.KILOMETERS, GeoOrder.DESC, 2).entrySet()).containsExactlyElementsOf(ascExpected2.entrySet()); } @Test diff --git a/redisson/src/test/java/org/redisson/RedissonStreamTest.java b/redisson/src/test/java/org/redisson/RedissonStreamTest.java index b6b72b3fb..07e6cd834 100644 --- a/redisson/src/test/java/org/redisson/RedissonStreamTest.java +++ b/redisson/src/test/java/org/redisson/RedissonStreamTest.java @@ -131,25 +131,25 @@ public class RedissonStreamTest extends BaseTest { @Test public void testClaimIds() throws InterruptedException { - RStream stream = redisson.getStream("test"); + RStream stream = redisson.getStream("test3"); stream.add("0", "0"); - stream.createGroup("testGroup"); + stream.createGroup("testGroup3"); StreamMessageId id1 = stream.add("1", "1"); StreamMessageId id2 = stream.add("2", "2"); - Map> s = stream.readGroup("testGroup", "consumer1"); + Map> s = stream.readGroup("testGroup3", "consumer1"); assertThat(s.size()).isEqualTo(2); StreamMessageId id3 = stream.add("3", "33"); StreamMessageId id4 = stream.add("4", "44"); - Map> s2 = stream.readGroup("testGroup", "consumer2"); + Map> s2 = stream.readGroup("testGroup3", "consumer2"); assertThat(s2.size()).isEqualTo(2); - List res = stream.fastClaim("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4); + List res = stream.fastClaim("testGroup3", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4); assertThat(res.size()).isEqualTo(2); assertThat(res).containsExactly(id3, id4); }