refactoring

pull/2300/head
Nikita Koksharov 6 years ago
parent 7ccc1335d3
commit dec65f7cc6

@ -35,12 +35,11 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.CodecDecoder; import org.redisson.client.protocol.decoder.CodecDecoder;
import org.redisson.client.protocol.decoder.GeoDistanceDecoder; import org.redisson.client.protocol.decoder.GeoDistanceDecoder;
import org.redisson.client.protocol.decoder.GeoMapReplayDecoder;
import org.redisson.client.protocol.decoder.GeoPositionDecoder; import org.redisson.client.protocol.decoder.GeoPositionDecoder;
import org.redisson.client.protocol.decoder.GeoPositionMapDecoder; import org.redisson.client.protocol.decoder.GeoPositionMapDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder2;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.connection.decoder.MapGetAllDecoder;
@ -53,10 +52,15 @@ import org.redisson.connection.decoder.MapGetAllDecoder;
*/ */
public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V> { public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V> {
private static final MultiDecoder<Map<Object, Object>> POSTITION_DECODER = new ListMultiDecoder(new CodecDecoder(), private static final MultiDecoder<Map<Object, Object>> POSTITION_DECODER = new ListMultiDecoder2(
new GeoPositionDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoMapReplayDecoder()); new ObjectMapReplayDecoder2(),
private static final MultiDecoder<Map<Object, Object>> DISTANCE_DECODER = new ListMultiDecoder( new CodecDecoder(),
new GeoDistanceDecoder(), new GeoMapReplayDecoder()); new GeoPositionDecoder());
private static final MultiDecoder<Map<Object, Object>> DISTANCE_DECODER = new ListMultiDecoder2(
new ObjectMapReplayDecoder2(),
new GeoDistanceDecoder());
private static final RedisCommand<Map<Object, Object>> GEORADIUS_RO_DISTANCE = new RedisCommand<Map<Object, Object>>( private static final RedisCommand<Map<Object, Object>> GEORADIUS_RO_DISTANCE = new RedisCommand<Map<Object, Object>>(
"GEORADIUS_RO", DISTANCE_DECODER); "GEORADIUS_RO", DISTANCE_DECODER);
private static final RedisCommand<Map<Object, Object>> GEORADIUS_RO_POS = new RedisCommand<Map<Object, Object>>( private static final RedisCommand<Map<Object, Object>> GEORADIUS_RO_POS = new RedisCommand<Map<Object, Object>>(
@ -147,9 +151,9 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
params.add(encode(member)); params.add(encode(member));
} }
MultiDecoder<Map<Object, Object>> decoder = new ListMultiDecoder(0, new GeoPositionDecoder(), MultiDecoder<Map<Object, Object>> decoder = new ListMultiDecoder2(
// new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoPositionMapDecoder((List<Object>) Arrays.asList(members)),
new GeoPositionMapDecoder((List<Object>) Arrays.asList(members))); new GeoPositionDecoder());
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOPOS", decoder); RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOPOS", decoder);
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, command, params.toArray()); return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, command, params.toArray());
} }

@ -45,12 +45,10 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.MapCacheScanResult; import org.redisson.client.protocol.decoder.MapCacheScanResult;
import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ObjectListDecoder;
import org.redisson.client.protocol.decoder.ObjectMapDecoder; import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.codec.MapCacheEventCodec; import org.redisson.codec.MapCacheEventCodec;
import org.redisson.codec.MapCacheEventCodec.OSType; import org.redisson.codec.MapCacheEventCodec.OSType;
@ -1231,7 +1229,9 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
params.add(count); params.add(count);
RedisCommand<MapCacheScanResult<Object, Object>> command = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL", RedisCommand<MapCacheScanResult<Object, Object>> command = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(codec), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); new ListMultiDecoder2(
new MapCacheScanResultReplayDecoder(),
new ObjectMapDecoder(codec, true)), ValueType.MAP);
RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, command, RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, command,
"local result = {}; " "local result = {}; "
+ "local idleKeys = {}; " + "local idleKeys = {}; "

@ -38,10 +38,9 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
@ -55,7 +54,7 @@ import org.redisson.command.CommandAsyncExecutor;
public class RedissonSetMultimapValues<V> extends RedissonExpirable implements RSet<V> { public class RedissonSetMultimapValues<V> extends RedissonExpirable implements RSet<V> {
private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE); new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder()), ValueType.MAP_VALUE);
private final RSet<V> set; private final RSet<V> set;
private final Object key; private final Object key;

@ -35,10 +35,10 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.CodecDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.StreamInfoDecoder; import org.redisson.client.protocol.decoder.StreamInfoDecoder;
import org.redisson.client.protocol.decoder.StreamInfoMapDecoder;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
/** /**
@ -50,18 +50,12 @@ import org.redisson.command.CommandAsyncExecutor;
*/ */
public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K, V> { public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K, V> {
private final RedisCommand<StreamInfo<Object, Object>> xinfoStreamCommand;
public RedissonStream(Codec codec, CommandAsyncExecutor connectionManager, String name) { public RedissonStream(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name); super(codec, connectionManager, name);
xinfoStreamCommand = new RedisCommand<StreamInfo<Object, Object>>("XINFO", "STREAM",
new ListMultiDecoder(new StreamInfoMapDecoder(getCodec()), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new StreamInfoDecoder()));
} }
public RedissonStream(CommandAsyncExecutor connectionManager, String name) { public RedissonStream(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name); super(connectionManager, name);
xinfoStreamCommand = new RedisCommand<StreamInfo<Object, Object>>("XINFO", "STREAM",
new ListMultiDecoder(new StreamInfoMapDecoder(getCodec()), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new StreamInfoDecoder()));
} }
protected void checkKey(Object key) { protected void checkKey(Object key) {
@ -966,6 +960,12 @@ public class RedissonStream<K, V> extends RedissonExpirable implements RStream<K
@Override @Override
public RFuture<StreamInfo<K, V>> getInfoAsync() { public RFuture<StreamInfo<K, V>> getInfoAsync() {
RedisCommand<StreamInfo<Object, Object>> xinfoStreamCommand = new RedisCommand<>("XINFO", "STREAM",
new ListMultiDecoder2(
new StreamInfoDecoder(),
new CodecDecoder(),
new ObjectMapDecoder(getCodec(), false)));
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, xinfoStreamCommand, getName()); return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, xinfoStreamCommand, getName());
} }

@ -46,21 +46,20 @@ import org.redisson.client.protocol.convertor.TypeConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ClusterNodesDecoder; import org.redisson.client.protocol.decoder.ClusterNodesDecoder;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder; import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ListObjectDecoder; import org.redisson.client.protocol.decoder.ListObjectDecoder;
import org.redisson.client.protocol.decoder.ListResultReplayDecoder; import org.redisson.client.protocol.decoder.ListResultReplayDecoder;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.Long2MultiDecoder; import org.redisson.client.protocol.decoder.Long2MultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectDecoder; import org.redisson.client.protocol.decoder.ObjectDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder; import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapJoinDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder2;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.PendingEntryDecoder; import org.redisson.client.protocol.decoder.PendingEntryDecoder;
import org.redisson.client.protocol.decoder.PendingResultDecoder; import org.redisson.client.protocol.decoder.PendingResultDecoder;
@ -154,10 +153,10 @@ public interface RedisCommands {
RedisCommand<List<ScoredEntry<Object>>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>()); RedisCommand<List<ScoredEntry<Object>>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>()); RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>()); RedisCommand<List<ScoredEntry<Object>>> ZRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ScoredSortedSetScanDecoder<Object>(), new ScoredSortedSetScanReplayDecoder())); RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new ListMultiDecoder2(new ScoredSortedSetScanReplayDecoder(), new ScoredSortedSetScanDecoder<Object>()));
RedisStrictCommand<Double> ZINCRBY = new RedisStrictCommand<Double>("ZINCRBY", new DoubleNullSafeReplayConvertor()); RedisStrictCommand<Double> ZINCRBY = new RedisStrictCommand<Double>("ZINCRBY", new DoubleNullSafeReplayConvertor());
RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder())); RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder<String>()));
RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY"); RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY");
RedisCommand<String> PING = new RedisCommand<String>("PING"); RedisCommand<String> PING = new RedisCommand<String>("PING");
RedisStrictCommand<Boolean> PING_BOOL = new RedisStrictCommand<Boolean>("PING", new BooleanNotNullReplayConvertor()); RedisStrictCommand<Boolean> PING_BOOL = new RedisStrictCommand<Boolean>("PING", new BooleanNotNullReplayConvertor());
@ -179,9 +178,9 @@ public interface RedisCommands {
RedisCommand<Set<Object>> SMEMBERS = new RedisCommand<Set<Object>>("SMEMBERS", new ObjectSetReplayDecoder<Object>()); RedisCommand<Set<Object>> SMEMBERS = new RedisCommand<Set<Object>>("SMEMBERS", new ObjectSetReplayDecoder<Object>());
RedisCommand<Set<Object>> SRANDMEMBER = new RedisCommand<Set<Object>>("SRANDMEMBER", new ObjectSetReplayDecoder<Object>()); RedisCommand<Set<Object>> SRANDMEMBER = new RedisCommand<Set<Object>>("SRANDMEMBER", new ObjectSetReplayDecoder<Object>());
RedisCommand<Object> SRANDMEMBER_SINGLE = new RedisCommand<Object>("SRANDMEMBER"); RedisCommand<Object> SRANDMEMBER_SINGLE = new RedisCommand<Object>("SRANDMEMBER");
RedisCommand<ListScanResult<Object>> SSCAN = new RedisCommand<ListScanResult<Object>>("SSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder())); RedisCommand<ListScanResult<Object>> SSCAN = new RedisCommand<ListScanResult<Object>>("SSCAN", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder()));
RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder())); RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder<Object>()));
RedisCommand<ListScanResult<Object>> EVAL_ZSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder())); RedisCommand<ListScanResult<Object>> EVAL_ZSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder<Object>()));
RedisCommand<Boolean> SISMEMBER = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor()); RedisCommand<Boolean> SISMEMBER = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor());
RedisStrictCommand<Integer> SCARD_INT = new RedisStrictCommand<Integer>("SCARD", new IntegerReplayConvertor()); RedisStrictCommand<Integer> SCARD_INT = new RedisStrictCommand<Integer>("SCARD", new IntegerReplayConvertor());
RedisStrictCommand<Long> SCARD = new RedisStrictCommand<Long>("SCARD"); RedisStrictCommand<Long> SCARD = new RedisStrictCommand<Long>("SCARD");
@ -295,7 +294,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> HSET = new RedisStrictCommand<Boolean>("HSET", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> HSET = new RedisStrictCommand<Boolean>("HSET", new BooleanReplayConvertor());
RedisStrictCommand<Void> HSET_VOID = new RedisStrictCommand<Void>("HSET", new VoidReplayConvertor()); RedisStrictCommand<Void> HSET_VOID = new RedisStrictCommand<Void>("HSET", new VoidReplayConvertor());
RedisCommand<MapScanResult<Object, Object>> HSCAN = new RedisCommand<MapScanResult<Object, Object>>("HSCAN", RedisCommand<MapScanResult<Object, Object>> HSCAN = new RedisCommand<MapScanResult<Object, Object>>("HSCAN",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP); new ListMultiDecoder2(new MapScanResultReplayDecoder(), new ObjectMapReplayDecoder()), ValueType.MAP);
RedisCommand<Map<Object, Object>> HGETALL = new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP); RedisCommand<Map<Object, Object>> HGETALL = new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP);
RedisCommand<Set<Entry<Object, Object>>> HGETALL_ENTRY = new RedisCommand<Set<Entry<Object, Object>>>("HGETALL", new ObjectMapEntryReplayDecoder(), ValueType.MAP); RedisCommand<Set<Entry<Object, Object>>> HGETALL_ENTRY = new RedisCommand<Set<Entry<Object, Object>>>("HGETALL", new ObjectMapEntryReplayDecoder(), ValueType.MAP);
RedisCommand<List<Object>> HVALS = new RedisCommand<List<Object>>("HVALS", new ObjectListReplayDecoder<Object>(), ValueType.MAP_VALUE); RedisCommand<List<Object>> HVALS = new RedisCommand<List<Object>>("HVALS", new ObjectListReplayDecoder<Object>(), ValueType.MAP_VALUE);
@ -331,77 +330,67 @@ public interface RedisCommands {
RedisCommand<Void> PSETEX = new RedisCommand<Void>("PSETEX", new VoidReplayConvertor()); RedisCommand<Void> PSETEX = new RedisCommand<Void>("PSETEX", new VoidReplayConvertor());
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XRANGE", RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XRANGE = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XRANGE",
new ListMultiDecoder( new ListMultiDecoder2(
new ObjectMapReplayDecoder2(),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder()), ValueType.MAP);
new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET),
new ObjectMapJoinDecoder()), ValueType.MAP);
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREVRANGE = RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREVRANGE =
new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREVRANGE", XRANGE.getReplayMultiDecoder(), ValueType.MAP); new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREVRANGE", XRANGE.getReplayMultiDecoder(), ValueType.MAP);
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD", RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD",
new ListMultiDecoder( new ListMultiDecoder2(
new StreamResultDecoder(false),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new StreamObjectMapReplayDecoder()), ValueType.MAP);
new ObjectMapJoinDecoder(),
new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()), ValueType.MAP);
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD_BLOCKING = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD", XREAD.getReplayMultiDecoder()); RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREAD_BLOCKING = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREAD", XREAD.getReplayMultiDecoder());
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD", RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD",
new ListMultiDecoder( new ListMultiDecoder2(
new StreamResultDecoder(true),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new StreamObjectMapReplayDecoder()), ValueType.MAP);
new ObjectMapJoinDecoder(),
new StreamObjectMapReplayDecoder(),
new StreamResultDecoder()), ValueType.MAP);
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE = RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREAD_BLOCKING_SINGLE =
new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder(), ValueType.MAP); new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREAD", XREAD_SINGLE.getReplayMultiDecoder(), ValueType.MAP);
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREADGROUP", RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP = new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREADGROUP",
new ListMultiDecoder( new ListMultiDecoder2(
new StreamResultDecoder(false),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new StreamObjectMapReplayDecoder()), ValueType.MAP);
new ObjectMapJoinDecoder(),
new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_INDEX),
new StreamResultDecoder()), ValueType.MAP);
RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP_BLOCKING = RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>> XREADGROUP_BLOCKING =
new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREADGROUP", XREADGROUP.getReplayMultiDecoder(), ValueType.MAP); new RedisCommand<Map<String, Map<StreamMessageId, Map<Object, Object>>>>("XREADGROUP", XREADGROUP.getReplayMultiDecoder(), ValueType.MAP);
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP", RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
new ListMultiDecoder( new ListMultiDecoder2(
new StreamResultDecoder(true),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()), new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET_1), new StreamObjectMapReplayDecoder()), ValueType.MAP);
new ObjectMapJoinDecoder(),
new StreamObjectMapReplayDecoder(),
new StreamResultDecoder()), ValueType.MAP);
RedisCommand<StreamInfo<Object, Object>> XINFO_GROUPS = new RedisCommand<StreamInfo<Object, Object>>("XINFO", "GROUPS", RedisCommand<StreamInfo<Object, Object>> XINFO_GROUPS = new RedisCommand<StreamInfo<Object, Object>>("XINFO", "GROUPS",
new ListMultiDecoder(0, new StreamGroupInfoDecoder(), new ObjectListReplayDecoder())); new ListMultiDecoder2(new ObjectListReplayDecoder(), new StreamGroupInfoDecoder()));
RedisCommand<StreamInfo<Object, Object>> XINFO_CONSUMERS = new RedisCommand<StreamInfo<Object, Object>>("XINFO", "CONSUMERS", RedisCommand<StreamInfo<Object, Object>> XINFO_CONSUMERS = new RedisCommand<>("XINFO", "CONSUMERS",
new ListMultiDecoder(0, new StreamConsumerInfoDecoder(), new ObjectListReplayDecoder())); new ListMultiDecoder2(new ObjectListReplayDecoder(), new StreamConsumerInfoDecoder()));
RedisCommand<List<StreamMessageId>> XCLAIM_IDS = new RedisCommand<List<StreamMessageId>>("XCLAIM", new StreamIdListDecoder()); RedisCommand<List<StreamMessageId>> XCLAIM_IDS = new RedisCommand<List<StreamMessageId>>("XCLAIM", new StreamIdListDecoder());
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XCLAIM = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XCLAIM", RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XCLAIM = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XCLAIM",
new ListMultiDecoder( new ListMultiDecoder2(
new ObjectMapReplayDecoder2(),
new ObjectDecoder(new StreamIdDecoder()), new ObjectDecoder(new StreamIdDecoder()),
new StreamObjectMapReplayDecoder(), new StreamObjectMapReplayDecoder()), ValueType.MAP);
new StreamObjectMapReplayDecoder(ListMultiDecoder.RESET),
new ObjectMapJoinDecoder()), ValueType.MAP);
RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP", RedisCommand<Map<StreamMessageId, Map<Object, Object>>> XREADGROUP_BLOCKING_SINGLE = new RedisCommand<Map<StreamMessageId, Map<Object, Object>>>("XREADGROUP",
XREADGROUP_SINGLE.getReplayMultiDecoder()); XREADGROUP_SINGLE.getReplayMultiDecoder());
@ -418,7 +407,7 @@ public interface RedisCommands {
RedisStrictCommand<Long> XDEL = new RedisStrictCommand<Long>("XDEL"); RedisStrictCommand<Long> XDEL = new RedisStrictCommand<Long>("XDEL");
RedisStrictCommand<Long> XTRIM = new RedisStrictCommand<Long>("XTRIM"); RedisStrictCommand<Long> XTRIM = new RedisStrictCommand<Long>("XTRIM");
RedisCommand<Object> XPENDING = new RedisCommand<Object>("XPENDING", RedisCommand<Object> XPENDING = new RedisCommand<Object>("XPENDING",
new ListMultiDecoder(0, new ObjectListReplayDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new PendingResultDecoder())); new ListMultiDecoder2(new PendingResultDecoder(), new ObjectListReplayDecoder(), new ObjectListReplayDecoder()));
RedisCommand<Object> XPENDING_ENTRIES = new RedisCommand<Object>("XPENDING", RedisCommand<Object> XPENDING_ENTRIES = new RedisCommand<Object>("XPENDING",
new PendingEntryDecoder()); new PendingEntryDecoder());
@ -458,11 +447,11 @@ public interface RedisCommands {
RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder()); RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
RedisCommand<List<Map<String, String>>> SENTINEL_MASTERS = new RedisCommand<List<Map<String, String>>>("SENTINEL", "MASTERS", RedisCommand<List<Map<String, String>>> SENTINEL_MASTERS = new RedisCommand<List<Map<String, String>>>("SENTINEL", "MASTERS",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new ListResultReplayDecoder())); new ListMultiDecoder2(new ListResultReplayDecoder(), new ObjectMapReplayDecoder()));
RedisCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SLAVES", RedisCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SLAVES",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new ListResultReplayDecoder())); new ListMultiDecoder2(new ListResultReplayDecoder(), new ObjectMapReplayDecoder()));
RedisCommand<List<Map<String, String>>> SENTINEL_SENTINELS = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SENTINELS", RedisCommand<List<Map<String, String>>> SENTINEL_SENTINELS = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SENTINELS",
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new ListResultReplayDecoder())); new ListMultiDecoder2(new ListResultReplayDecoder(), new ObjectMapReplayDecoder()));
RedisStrictCommand<Void> CLUSTER_ADDSLOTS = new RedisStrictCommand<Void>("CLUSTER", "ADDSLOTS"); RedisStrictCommand<Void> CLUSTER_ADDSLOTS = new RedisStrictCommand<Void>("CLUSTER", "ADDSLOTS");
RedisStrictCommand<Void> CLUSTER_REPLICATE = new RedisStrictCommand<Void>("CLUSTER", "REPLICATE"); RedisStrictCommand<Void> CLUSTER_REPLICATE = new RedisStrictCommand<Void>("CLUSTER", "REPLICATE");

@ -34,7 +34,7 @@ public class CodecDecoder implements MultiDecoder<Object> {
@Override @Override
public Object decode(List<Object> parts, State state) { public Object decode(List<Object> parts, State state) {
return null; return parts;
} }
} }

@ -27,6 +27,7 @@ import org.redisson.client.protocol.Decoder;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
@Deprecated
public class GeoMapReplayDecoder implements MultiDecoder<Map<Object, Object>> { public class GeoMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
@Override @Override

@ -29,6 +29,7 @@ import io.netty.buffer.ByteBuf;
* *
* @param <T> type * @param <T> type
*/ */
@Deprecated
public class ListMultiDecoder<T> implements MultiDecoder<Object> { public class ListMultiDecoder<T> implements MultiDecoder<Object> {
public static final Decoder<Object> RESET = new Decoder<Object>() { public static final Decoder<Object> RESET = new Decoder<Object>() {

@ -0,0 +1,49 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
/**
*
* @author Nikita Koksharov
*
* @param <T> type
*/
public class ListMultiDecoder2<T> implements MultiDecoder<Object> {
private final MultiDecoder<?>[] decoders;
public ListMultiDecoder2(MultiDecoder<?>... decoders) {
this.decoders = decoders;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
int index = state.getLevel();
return decoders[index].getDecoder(paramNum, state);
}
@Override
public Object decode(List<Object> parts, State state) {
int index = state.getLevel();
return decoders[index].decode(parts, state);
}
}

@ -17,6 +17,7 @@ package org.redisson.client.protocol.decoder;
import java.util.List; import java.util.List;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State; import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
@ -29,7 +30,7 @@ public class ListScanResultReplayDecoder implements MultiDecoder<ListScanResult<
@Override @Override
public Decoder<Object> getDecoder(int paramNum, State state) { public Decoder<Object> getDecoder(int paramNum, State state) {
return null; return LongCodec.INSTANCE.getValueDecoder();
} }
@Override @Override

@ -18,6 +18,7 @@ package org.redisson.client.protocol.decoder;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State; import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
@ -38,7 +39,7 @@ public class MapCacheScanResultReplayDecoder implements MultiDecoder<MapCacheSca
@Override @Override
public Decoder<Object> getDecoder(int paramNum, State state) { public Decoder<Object> getDecoder(int paramNum, State state) {
return null; return LongCodec.INSTANCE.getValueDecoder();
} }
} }

@ -18,6 +18,7 @@ package org.redisson.client.protocol.decoder;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State; import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
@ -30,7 +31,7 @@ public class MapScanResultReplayDecoder implements MultiDecoder<MapScanResult<Ob
@Override @Override
public Decoder<Object> getDecoder(int paramNum, State state) { public Decoder<Object> getDecoder(int paramNum, State state) {
return null; return LongCodec.INSTANCE.getValueDecoder();
} }
@Override @Override

@ -41,7 +41,6 @@ public class ObjectDecoder implements MultiDecoder<Object> {
@Override @Override
public Object decode(List<Object> parts, State state) { public Object decode(List<Object> parts, State state) {
parts.clear();
return parts; return parts;
} }

@ -27,6 +27,7 @@ import org.redisson.client.protocol.Decoder;
* *
* @param <T> type * @param <T> type
*/ */
@Deprecated
public class ObjectListDecoder<T> implements MultiDecoder<List<T>> { public class ObjectListDecoder<T> implements MultiDecoder<List<T>> {
private Codec codec; private Codec codec;
@ -41,6 +42,7 @@ public class ObjectListDecoder<T> implements MultiDecoder<List<T>> {
return codec.getMapKeyDecoder(); return codec.getMapKeyDecoder();
} }
@SuppressWarnings("unchecked")
@Override @Override
public List<T> decode(List<Object> parts, State state) { public List<T> decode(List<Object> parts, State state) {
return (List<T>) parts; return (List<T>) parts;

@ -28,19 +28,26 @@ import org.redisson.client.protocol.Decoder;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class ObjectMapDecoder implements MultiDecoder<Map<Object, Object>> { public class ObjectMapDecoder implements MultiDecoder<Object> {
private Codec codec; private final Codec codec;
private final boolean decodeList;
public ObjectMapDecoder(Codec codec) { public ObjectMapDecoder(Codec codec, boolean decodeList) {
super(); super();
this.codec = codec; this.codec = codec;
this.decodeList = decodeList;
} }
private int pos; private int pos;
private boolean mapDecoded;
@Override @Override
public Decoder<Object> getDecoder(int paramNum, State state) { public Decoder<Object> getDecoder(int paramNum, State state) {
if (mapDecoded) {
return codec.getMapKeyDecoder();
}
if (pos++ % 2 == 0) { if (pos++ % 2 == 0) {
return codec.getMapKeyDecoder(); return codec.getMapKeyDecoder();
} }
@ -48,13 +55,19 @@ public class ObjectMapDecoder implements MultiDecoder<Map<Object, Object>> {
} }
@Override @Override
public Map<Object, Object> decode(List<Object> parts, State state) { public Object decode(List<Object> parts, State state) {
if (decodeList && mapDecoded) {
return parts;
}
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2); Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2);
for (int i = 0; i < parts.size(); i++) { for (int i = 0; i < parts.size(); i++) {
if (i % 2 != 0) { if (i % 2 != 0) {
result.put(parts.get(i-1), parts.get(i)); result.put(parts.get(i-1), parts.get(i));
} }
} }
mapDecoded = true;
return result; return result;
} }

@ -27,6 +27,7 @@ import org.redisson.client.protocol.Decoder;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
@Deprecated
public class ObjectMapJoinDecoder implements MultiDecoder<Map<Object, Object>> { public class ObjectMapJoinDecoder implements MultiDecoder<Map<Object, Object>> {
@Override @Override

@ -0,0 +1,47 @@
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
/**
*
* @author Nikita Koksharov
*
*/
public class ObjectMapReplayDecoder2 implements MultiDecoder<Map<Object, Object>> {
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
List<List<Object>> list = (List<List<Object>>) (Object) parts;
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2);
for (List<Object> entry : list) {
result.put(entry.get(0), entry.get(1));
}
return result;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -46,7 +46,7 @@ public class PendingResultDecoder implements MultiDecoder<Object> {
} }
List<List<String>> customerParts = (List<List<String>>) parts.get(3); List<List<String>> customerParts = (List<List<String>>) parts.get(3);
if (customerParts == null) { if (customerParts.isEmpty()) {
return new PendingResult(0, null, null, Collections.emptyMap()); return new PendingResult(0, null, null, Collections.emptyMap());
} }

@ -17,6 +17,7 @@ package org.redisson.client.protocol.decoder;
import java.util.List; import java.util.List;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State; import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
@ -29,7 +30,7 @@ public class ScoredSortedSetScanReplayDecoder implements MultiDecoder<ListScanRe
@Override @Override
public Decoder<Object> getDecoder(int paramNum, State state) { public Decoder<Object> getDecoder(int paramNum, State state) {
return null; return LongCodec.INSTANCE.getValueDecoder();
} }
@Override @Override

@ -26,6 +26,7 @@ import org.redisson.client.protocol.Decoder;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
@Deprecated
public class StreamInfoMapDecoder implements MultiDecoder<Object> { public class StreamInfoMapDecoder implements MultiDecoder<Object> {
boolean hasNonZeroLevel = false; boolean hasNonZeroLevel = false;
@ -33,7 +34,7 @@ public class StreamInfoMapDecoder implements MultiDecoder<Object> {
final ObjectMapDecoder decoder; final ObjectMapDecoder decoder;
public StreamInfoMapDecoder(Codec codec) { public StreamInfoMapDecoder(Codec codec) {
decoder = new ObjectMapDecoder(codec); decoder = new ObjectMapDecoder(codec, false);
} }
@Override @Override

@ -15,7 +15,7 @@
*/ */
package org.redisson.client.protocol.decoder; package org.redisson.client.protocol.decoder;
import java.util.Collections; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -30,13 +30,34 @@ import org.redisson.client.protocol.Decoder;
*/ */
public class StreamResultDecoder implements MultiDecoder<Object> { public class StreamResultDecoder implements MultiDecoder<Object> {
private final boolean firstResult;
public StreamResultDecoder(boolean firstResult) {
super();
this.firstResult = firstResult;
}
@Override @Override
public Object decode(List<Object> parts, State state) { public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) { List<List<Object>> list = (List<List<Object>>) (Object) parts;
Map<String, Map<StreamMessageId, Map<Object, Object>>> result = (Map<String, Map<StreamMessageId, Map<Object, Object>>>) parts.get(0); Map<String, Map<StreamMessageId, Map<Object, Object>>> result = new HashMap<>();
return result.values().iterator().next(); for (List<Object> entries : list) {
List<List<Object>> streamEntries = (List<List<Object>>) entries.get(1);
if (!streamEntries.isEmpty()) {
String name = (String) entries.get(0);
Map<StreamMessageId, Map<Object, Object>> ee = new HashMap<>();
result.put(name, ee);
for (List<Object> se : streamEntries) {
ee.put((StreamMessageId) se.get(0), (Map<Object, Object>) se.get(1));
}
if (firstResult) {
return ee;
}
}
} }
return Collections.emptyMap(); return result;
} }
@Override @Override

@ -101,38 +101,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
this.sentinelResolver = resolverGroup.getResolver(getGroup().next()); this.sentinelResolver = resolverGroup.getResolver(getGroup().next());
boolean connected = false; checkAuth(cfg);
for (String address : cfg.getSentinelAddresses()) {
RedisURI addr = new RedisURI(address);
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getTimeout(), null);
try {
RedisConnection c = client.connect();
connected = true;
try {
c.sync(RedisCommands.PING);
scheme = addr.getScheme();
} catch (RedisAuthRequiredException e) {
usePassword = true;
}
break;
} catch (RedisConnectionException e) {
log.warn("Can't connect to sentinel server. {}", e.getMessage());
} catch (Exception e) {
// skip
} finally {
client.shutdown();
}
}
if (!connected) {
stopThreads();
StringBuilder list = new StringBuilder();
for (String address : cfg.getSentinelAddresses()) {
list.append(address).append(", ");
}
throw new RedisConnectionException("Unable to connect to Redis sentinel servers: " + list);
}
for (String address : cfg.getSentinelAddresses()) { for (String address : cfg.getSentinelAddresses()) {
RedisURI addr = new RedisURI(address); RedisURI addr = new RedisURI(address);
@ -231,6 +200,41 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
scheduleChangeCheck(cfg, null); scheduleChangeCheck(cfg, null);
} }
private void checkAuth(SentinelServersConfig cfg) {
boolean connected = false;
for (String address : cfg.getSentinelAddresses()) {
RedisURI addr = new RedisURI(address);
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getTimeout(), null);
try {
RedisConnection c = client.connect();
connected = true;
try {
c.sync(RedisCommands.PING);
scheme = addr.getScheme();
} catch (RedisAuthRequiredException e) {
usePassword = true;
}
break;
} catch (RedisConnectionException e) {
log.warn("Can't connect to sentinel server. {}", e.getMessage());
} catch (Exception e) {
// skip
} finally {
client.shutdown();
}
}
if (!connected) {
stopThreads();
StringBuilder list = new StringBuilder();
for (String address : cfg.getSentinelAddresses()) {
list.append(address).append(", ");
}
throw new RedisConnectionException("Unable to connect to Redis sentinel servers: " + list);
}
}
@Override @Override
protected void startDNSMonitoring(RedisClient masterHost) { protected void startDNSMonitoring(RedisClient masterHost) {

@ -505,6 +505,13 @@ public class RedissonGeoTest extends BaseTest {
Map<String, GeoPosition> descExpected = new LinkedHashMap<String, GeoPosition>(); Map<String, GeoPosition> descExpected = new LinkedHashMap<String, GeoPosition>();
descExpected.put("Catania", new GeoPosition(15.087267458438873, 37.50266842333162)); descExpected.put("Catania", new GeoPosition(15.087267458438873, 37.50266842333162));
assertThat(geo.radiusWithPosition("Palermo", 200, GeoUnit.KILOMETERS, GeoOrder.DESC, 1).entrySet()).containsExactlyElementsOf(descExpected.entrySet()); assertThat(geo.radiusWithPosition("Palermo", 200, GeoUnit.KILOMETERS, GeoOrder.DESC, 1).entrySet()).containsExactlyElementsOf(descExpected.entrySet());
RGeo<String> geo2 = redisson.getGeo("test2");
geo2.add(new GeoEntry(13.361389, 38.115556, "Palermo"), new GeoEntry(13.361390, 38.115557, "Catania"));
Map<String, GeoPosition> ascExpected2 = new LinkedHashMap<String, GeoPosition>();
ascExpected2.put("Catania", new GeoPosition(13.361389338970184, 38.115556395496299));
ascExpected2.put("Palermo", new GeoPosition(13.361389338970184, 38.115556395496299));
assertThat(geo2.radiusWithPosition("Palermo", 200, GeoUnit.KILOMETERS, GeoOrder.DESC, 2).entrySet()).containsExactlyElementsOf(ascExpected2.entrySet());
} }
@Test @Test

@ -131,25 +131,25 @@ public class RedissonStreamTest extends BaseTest {
@Test @Test
public void testClaimIds() throws InterruptedException { public void testClaimIds() throws InterruptedException {
RStream<String, String> stream = redisson.getStream("test"); RStream<String, String> stream = redisson.getStream("test3");
stream.add("0", "0"); stream.add("0", "0");
stream.createGroup("testGroup"); stream.createGroup("testGroup3");
StreamMessageId id1 = stream.add("1", "1"); StreamMessageId id1 = stream.add("1", "1");
StreamMessageId id2 = stream.add("2", "2"); StreamMessageId id2 = stream.add("2", "2");
Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup", "consumer1"); Map<StreamMessageId, Map<String, String>> s = stream.readGroup("testGroup3", "consumer1");
assertThat(s.size()).isEqualTo(2); assertThat(s.size()).isEqualTo(2);
StreamMessageId id3 = stream.add("3", "33"); StreamMessageId id3 = stream.add("3", "33");
StreamMessageId id4 = stream.add("4", "44"); StreamMessageId id4 = stream.add("4", "44");
Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup", "consumer2"); Map<StreamMessageId, Map<String, String>> s2 = stream.readGroup("testGroup3", "consumer2");
assertThat(s2.size()).isEqualTo(2); assertThat(s2.size()).isEqualTo(2);
List<StreamMessageId> res = stream.fastClaim("testGroup", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4); List<StreamMessageId> res = stream.fastClaim("testGroup3", "consumer1", 1, TimeUnit.MILLISECONDS, id3, id4);
assertThat(res.size()).isEqualTo(2); assertThat(res.size()).isEqualTo(2);
assertThat(res).containsExactly(id3, id4); assertThat(res).containsExactly(id3, id4);
} }

Loading…
Cancel
Save