Merge pull request #2982 from sulake/feature/multiple_members

Add multi member / batch methods to scored sorted set.
pull/2999/head
Nikita Koksharov 5 years ago committed by GitHub
commit 6feb33c30a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -15,20 +15,6 @@
*/
package org.redisson;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
@ -37,6 +23,7 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
@ -48,6 +35,20 @@ import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RedissonPromise;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
*
* @author Nikita Koksharov
@ -204,6 +205,11 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return get(addAndGetRevRankAsync(score, object));
}
@Override
public List<Integer> addAndGetAllRevRank(Map<? extends V, Double> map) {
return get(addAndGetAllRevRankAsync(map));
}
@Override
public RFuture<Integer> addAndGetRevRankAsync(double score, V object) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
@ -212,6 +218,36 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
Collections.<Object>singletonList(getName()), new BigDecimal(score).toPlainString(), encode(object));
}
@Override
public RFuture<List<Integer>> addAndGetAllRevRankAsync(Map<? extends V, Double> map) {
final List<Object> params = new ArrayList<Object>(map.size() * 2);
for (java.util.Map.Entry<? extends V, Double> t : map.entrySet()) {
if (t.getKey() == null) {
throw new NullPointerException("map key can't be null");
}
if (t.getValue() == null) {
throw new NullPointerException("map value can't be null");
}
params.add(encode(t.getKey()));
params.add(BigDecimal.valueOf(t.getValue()).toPlainString());
}
return commandExecutor.evalReadAsync((String) null, IntegerCodec.INSTANCE, RedisCommands.EVAL_INT_LIST,
"local r = {} " +
"for i, v in ipairs(ARGV) do " +
"if i % 2 == 0 then " +
"redis.call('zadd', KEYS[1], ARGV[i], ARGV[i-1]); " +
"end; " +
"end;" +
"for i, v in ipairs(ARGV) do " +
"if i % 2 == 0 then " +
"r[#r+1] = redis.call('zrevrank', KEYS[1], ARGV[i-1]); " +
"end; " +
"end;" +
"return r;",
Collections.singletonList(getName()), params.toArray());
}
@Override
public boolean tryAdd(double score, V object) {
return get(tryAddAsync(score, object));
@ -372,11 +408,27 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return get(getScoreAsync(o));
}
@Override
public List<Double> getAllScore(List<V> keys) {
return get(getAllScoreAsync(keys));
}
@Override
public RFuture<Double> getScoreAsync(V o) {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZSCORE, getName(), encode(o));
}
@Override
public RFuture<List<Double>> getAllScoreAsync(Collection<V> elements) {
return commandExecutor.evalReadAsync((String) null, DoubleCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local r = {} " +
"for i, v in ipairs(ARGV) do " +
"r[#r+1] = redis.call('ZSCORE', KEYS[1], ARGV[i]); " +
"end;" +
"return r;",
Collections.singletonList(getName()), encode(elements).toArray());
}
@Override
public Integer rank(V o) {
return get(rankAsync(o));

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -262,6 +263,14 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
Double getScore(V o);
/**
* Returns scores of elements.
*
* @param elements - elements
* @return element scores
*/
List<Double> getAllScore(List<V> elements);
/**
* Adds element to this set, overrides previous score if it has been already added.
*
@ -289,6 +298,14 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
Integer addAndGetRevRank(double score, V object);
/**
* Adds elements to this set, overrides previous score if it has been already added.
* Finally returns reverse rank list of the items
* @param map - map of object and scores, make sure to use an ordered map
* @return collection of reverse ranks
*/
List<Integer> addAndGetAllRevRank(Map<? extends V, Double> map);
/**
* Adds element to this set only if has not been added before.
* <p>

@ -16,6 +16,7 @@
package org.redisson.api;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -223,6 +224,14 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
*/
RFuture<Double> getScoreAsync(V o);
/**
* Returns scores of elements.
*
* @param elements - elements
* @return element scores
*/
RFuture<List<Double>> getAllScoreAsync(Collection<V> elements);
/**
* Adds element to this set, overrides previous score if it has been already added.
*
@ -250,6 +259,14 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
*/
RFuture<Integer> addAndGetRevRankAsync(double score, V object);
/**
* Adds elements to this set, overrides previous score if it has been already added.
* Finally returns reverse rank list of the items
* @param map - map of object and scores, make sure to use an ordered map
* @return collection of reverse ranks
*/
RFuture<List<Integer>> addAndGetAllRevRankAsync(Map<? extends V, Double> map);
/**
* Adds element to this set only if has not been added before.
* <p>

@ -16,6 +16,7 @@
package org.redisson.api;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -235,6 +236,14 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
*/
Mono<Double> getScore(V o);
/**
* Returns scores of elements.
*
* @param elements - elements
* @return element scores
*/
Mono<List<Double>> getAllScore(Collection<V> elements);
/**
* Adds element to this set, overrides previous score if it has been already added.
*
@ -273,6 +282,14 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
*/
Mono<Integer> addAndGetRevRank(double score, V object);
/**
* Adds elements to this set, overrides previous score if it has been already added.
* Finally returns reverse rank list of the items
* @param map - map of object and scores, make sure to use an ordered map
* @return collection of reverse ranks
*/
Mono<List<Integer>> addAndGetAllRevRank(Map<? extends V, Double> map);
/**
* Adds element to this set only if has not been added before.
* <p>

@ -16,6 +16,7 @@
package org.redisson.api;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -236,6 +237,14 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
*/
Maybe<Double> getScore(V o);
/**
* Returns scores of elements.
*
* @param elements - elements
* @return element scores
*/
Single<List<Double>> getAllScore(Collection<V> elements);
/**
* Adds element to this set, overrides previous score if it has been already added.
*
@ -274,6 +283,14 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
*/
Single<Integer> addAndGetRevRank(double score, V object);
/**
* Adds elements to this set, overrides previous score if it has been already added.
* Finally returns reverse rank list of the items
* @param map - map of object and scores, make sure to use an ordered map
* @return collection of reverse ranks
*/
Single<List<Integer>> addAndGetAllRevRank(Map<? extends V, Double> map);
/**
* Adds element to this set only if has not been added before.
* <p>

@ -20,14 +20,71 @@ import org.redisson.api.StreamInfo;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.*;
import org.redisson.client.protocol.decoder.*;
import org.redisson.client.protocol.convertor.BitsSizeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNullReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNullSafeReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.ByteReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleNullSafeReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.ShortReplayConvertor;
import org.redisson.client.protocol.convertor.StreamIdConvertor;
import org.redisson.client.protocol.convertor.StringToListConvertor;
import org.redisson.client.protocol.convertor.TimeObjectDecoder;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
import org.redisson.client.protocol.convertor.TypeConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ClusterNodesDecoder;
import org.redisson.client.protocol.decoder.InetSocketAddressDecoder;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ListObjectDecoder;
import org.redisson.client.protocol.decoder.ListResultReplayDecoder;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.Long2MultiDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder2;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.PendingEntryDecoder;
import org.redisson.client.protocol.decoder.PendingResultDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetPolledObjectDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder;
import org.redisson.client.protocol.decoder.SlotsDecoder;
import org.redisson.client.protocol.decoder.StreamConsumerInfoDecoder;
import org.redisson.client.protocol.decoder.StreamGroupInfoDecoder;
import org.redisson.client.protocol.decoder.StreamIdDecoder;
import org.redisson.client.protocol.decoder.StreamIdListDecoder;
import org.redisson.client.protocol.decoder.StreamObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.StreamResultDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.client.protocol.decoder.StringReplayDecoder;
import org.redisson.client.protocol.decoder.TimeLongObjectDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.cluster.ClusterNodeInfo;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
*
@ -215,6 +272,7 @@ public interface RedisCommands {
RedisCommand<Object> EVAL_FIRST_LIST = new RedisCommand<Object>("EVAL", new ListFirstObjectDecoder());
RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> EVAL_LIST_REVERSE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<>(true));
RedisCommand<List<Integer>> EVAL_INT_LIST = new RedisCommand("EVAL", new ObjectListReplayDecoder<Integer>(), new IntegerReplayConvertor());
RedisCommand<Set<Object>> EVAL_SET = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>());
RedisCommand<Object> EVAL_OBJECT = new RedisCommand<Object>("EVAL");
RedisCommand<Object> EVAL_MAP_VALUE = new RedisCommand<Object>("EVAL", ValueType.MAP_VALUE);

@ -7,6 +7,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@ -1227,6 +1228,21 @@ public class RedissonScoredSortedSetTest extends BaseTest {
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
}
@Test
public void testAddAndGetAll() throws InterruptedException {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(100.2, "1");
Double res2 = set.addScore("1", new Double(12.1));
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
res2 = set.getScore("1");
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
Collection<Double> res = set.getAllScore(Arrays.asList("1", "42", "100"));
Assert.assertArrayEquals(new Double[] {112.3d, null, null},
res.toArray());
}
@Test
public void testAddScoreAndGetRank() throws InterruptedException {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
@ -1265,7 +1281,20 @@ public class RedissonScoredSortedSetTest extends BaseTest {
assertThat(score).isEqualTo(14);
}
@Test
public void testAddAndGetAllRevRank() throws InterruptedException {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple", StringCodec.INSTANCE);
Map<String, Double> map = new LinkedHashMap<>();
map.put("one", 1d);
map.put("three", 3d);
map.put("two", 2d);
Collection<Integer> res = set.addAndGetAllRevRank(map);
Assert.assertArrayEquals(new Integer[]{2, 0, 1}, res.toArray());
assertThat(set.revRank("one")).isEqualTo(2);
assertThat(set.revRank("two")).isEqualTo(1);
assertThat(set.revRank("three")).isEqualTo(0);
}
@Test
public void testIntersection() {

Loading…
Cancel
Save