RScoredSortedSetReactive added. #210

pull/337/head
Nikita 9 years ago
parent b083d66d30
commit b7cf9cf824

@ -33,6 +33,7 @@ import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RListReactive;
import org.redisson.core.RMap;
import org.redisson.core.RMapReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RSetReactive;
import io.netty.util.concurrent.Future;
@ -138,7 +139,16 @@ public class RedissonReactive implements RedissonReactiveClient {
public <V> RSetReactive<V> getSet(String name, Codec codec) {
return new RedissonSetReactive<V>(codec, commandExecutor, name);
}
@Override
public <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSetReactive<V>(commandExecutor, name);
}
@Override
public <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name, Codec codec) {
return new RedissonScoredSortedSetReactive<V>(codec, commandExecutor, name);
}
@Override

@ -23,6 +23,7 @@ import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RListReactive;
import org.redisson.core.RMap;
import org.redisson.core.RMapReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RSetReactive;
public interface RedissonReactiveClient {
@ -82,16 +83,16 @@ public interface RedissonReactiveClient {
<V> RSetReactive<V> getSet(String name, Codec codec);
// /**
// * Returns Redis Sorted Set instance by name
// *
// * @param name
// * @return
// */
// <V> RScoredSortedSet<V> getScoredSortedSet(String name);
//
// <V> RScoredSortedSet<V> getScoredSortedSet(String name, Codec codec);
//
/**
* Returns Redis Sorted Set instance by name
*
* @param name
* @return
*/
<V> RScoredSortedSetReactive<V> getScoredSortedSet(String name);
<V> RScoredSortedSetReactive<V> getScoredSortedSet(String name, Codec codec);
// /**
// * Returns String based Redis Sorted Set instance by name
// * All elements are inserted with the same score during addition,

@ -0,0 +1,283 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.core.RScoredSortedSetReactive;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactive implements RScoredSortedSetReactive<V> {
public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
public Publisher<V> first() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGE_SINGLE, getName(), 0, 0);
}
public Publisher<V> last() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGE_SINGLE, getName(), -1, -1);
}
@Override
public Publisher<Boolean> add(double score, V object) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZADD, getName(), BigDecimal.valueOf(score).toPlainString(), object);
}
public Publisher<Integer> removeRangeByRank(int startIndex, int endIndex) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZREMRANGEBYRANK, getName(), startIndex, endIndex);
}
public Publisher<Integer> removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZREMRANGEBYSCORE, getName(), startValue, endValue);
}
private String value(String element, boolean inclusive) {
if (!inclusive) {
element = "(" + element;
}
return element;
}
@Override
public Publisher<Boolean> remove(Object object) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZREM, getName(), object);
}
@Override
public Publisher<Integer> size() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZCARD, getName());
}
@Override
public Publisher<Boolean> contains(Object o) {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZSCORE_CONTAINS, getName(), o);
}
@Override
public Publisher<Double> getScore(V o) {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZSCORE, getName(), o);
}
@Override
public Publisher<Integer> rank(V o) {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANK, getName(), o);
}
private Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readObservable(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
}
@Override
public Publisher<V> iterator() {
return new Stream<V>() {
@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
private List<V> firstValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
@Override
protected void onRequest(final long n) {
currentIndex = n;
nextValues();
}
protected void nextValues() {
final ReactiveSubscription<V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(ListScanResult<V> res) {
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
m.onComplete();
currentIndex = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
for (V val : res.getValues()) {
m.onNext(val);
currentIndex--;
if (currentIndex == 0) {
m.onComplete();
return;
}
}
if (nextIterPos == -1) {
m.onComplete();
currentIndex = 0;
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
}
@Override
public void onComplete() {
if (currentIndex == 0) {
return;
}
nextValues();
}
});
}
});
}
};
}
@Override
public Publisher<Boolean> containsAll(Collection<?> c) {
return commandExecutor.evalReadObservable(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('zrange', KEYS[1], 0, -1);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Publisher<Boolean> removeAll(Collection<?> c) {
return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('zrem', KEYS[1], ARGV[i]) == 1 "
+ "then v = true end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Publisher<Boolean> retainAll(Collection<?> c) {
return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local s = redis.call('zrange', KEYS[1], 0, -1) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('zrem', KEYS[1], element) "
+ "changed = true "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Publisher<Double> addScore(V object, Number value) {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZINCRBY,
getName(), new BigDecimal(value.toString()).toPlainString(), object);
}
@Override
public Publisher<Collection<V>> valueRange(int startIndex, int endIndex) {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGE, getName(), startIndex, endIndex);
}
@Override
public Publisher<Collection<ScoredEntry<V>>> entryRange(int startIndex, int endIndex) {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGE_ENTRY, getName(), startIndex, endIndex, "WITHSCORES");
}
@Override
public Publisher<Collection<V>> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGEBYSCORE, getName(), startValue, endValue);
}
@Override
public Publisher<Collection<ScoredEntry<V>>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGEBYSCORE_ENTRY, getName(), startValue, endValue, "WITHSCORES");
}
@Override
public Publisher<Collection<V>> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGEBYSCORE, getName(), startValue, endValue, "LIMIT", offset, count);
}
@Override
public Publisher<Collection<ScoredEntry<V>>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGEBYSCORE_ENTRY, getName(), startValue, endValue, "WITHSCORES", "LIMIT", offset, count);
}
}

@ -0,0 +1,67 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import org.reactivestreams.Publisher;
import org.redisson.client.protocol.ScoredEntry;
public interface RScoredSortedSetReactive<V> extends RExpirableReactive {
Publisher<V> iterator();
Publisher<V> first();
Publisher<V> last();
Publisher<Integer> removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Publisher<Integer> removeRangeByRank(int startIndex, int endIndex);
Publisher<Integer> rank(V o);
Publisher<Double> getScore(V o);
Publisher<Boolean> add(double score, V object);
Publisher<Boolean> remove(V object);
Publisher<Integer> size();
Publisher<Boolean> contains(Object o);
Publisher<Boolean> containsAll(Collection<?> c);
Publisher<Boolean> removeAll(Collection<?> c);
Publisher<Boolean> retainAll(Collection<?> c);
Publisher<Double> addScore(V object, Number value);
Publisher<Collection<V>> valueRange(int startIndex, int endIndex);
Publisher<Collection<ScoredEntry<V>>> entryRange(int startIndex, int endIndex);
Publisher<Collection<V>> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Publisher<Collection<ScoredEntry<V>>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Publisher<Collection<V>> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);
Publisher<Collection<ScoredEntry<V>>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);
}

@ -8,6 +8,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.reactivestreams.Publisher;
import org.redisson.core.RCollectionReactive;
import org.redisson.core.RScoredSortedSetReactive;
import reactor.rx.Streams;
@ -25,6 +26,10 @@ public abstract class BaseReactiveTest {
redisson.shutdown();
}
public <V> Iterable<V> sync(RScoredSortedSetReactive<V> list) {
return Streams.create(list.iterator()).toList().poll();
}
public <V> Iterable<V> sync(RCollectionReactive<V> list) {
return Streams.create(list.iterator()).toList().poll();
}

@ -0,0 +1,335 @@
package org.redisson;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.core.RScoredSortedSetReactive;
public class RedissonScoredSortedSetReactiveTest extends BaseReactiveTest {
@Test
public void testFirstLast() {
RScoredSortedSetReactive<String> set = redisson.getScoredSortedSet("simple");
sync(set.add(0.1, "a"));
sync(set.add(0.2, "b"));
sync(set.add(0.3, "c"));
sync(set.add(0.4, "d"));
Assert.assertEquals("a", sync(set.first()));
Assert.assertEquals("d", sync(set.last()));
}
@Test
public void testRemoveRangeByScore() {
RScoredSortedSetReactive<String> set = redisson.getScoredSortedSet("simple");
sync(set.add(0.1, "a"));
sync(set.add(0.2, "b"));
sync(set.add(0.3, "c"));
sync(set.add(0.4, "d"));
sync(set.add(0.5, "e"));
sync(set.add(0.6, "f"));
sync(set.add(0.7, "g"));
Assert.assertEquals(2, sync(set.removeRangeByScore(0.1, false, 0.3, true)).intValue());
MatcherAssert.assertThat(sync(set), Matchers.contains("a", "d", "e", "f", "g"));
}
@Test
public void testRemoveRangeByRank() {
RScoredSortedSetReactive<String> set = redisson.getScoredSortedSet("simple");
sync(set.add(0.1, "a"));
sync(set.add(0.2, "b"));
sync(set.add(0.3, "c"));
sync(set.add(0.4, "d"));
sync(set.add(0.5, "e"));
sync(set.add(0.6, "f"));
sync(set.add(0.7, "g"));
Assert.assertEquals(2, sync(set.removeRangeByRank(0, 1)).intValue());
MatcherAssert.assertThat(sync(set), Matchers.contains("c", "d", "e", "f", "g"));
}
@Test
public void testRank() {
RScoredSortedSetReactive<String> set = redisson.getScoredSortedSet("simple");
sync(set.add(0.1, "a"));
sync(set.add(0.2, "b"));
sync(set.add(0.3, "c"));
sync(set.add(0.4, "d"));
sync(set.add(0.5, "e"));
sync(set.add(0.6, "f"));
sync(set.add(0.7, "g"));
Assert.assertEquals(3, sync(set.rank("d")).intValue());
}
@Test
public void testAddAsync() throws InterruptedException, ExecutionException {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
Assert.assertTrue(sync(set.add(0.323, 2)));
Assert.assertFalse(sync(set.add(0.323, 2)));
Assert.assertTrue(sync(set.contains(2)));
}
@Test
public void testRemoveAsync() throws InterruptedException, ExecutionException {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
set.add(0.11, 1);
set.add(0.22, 3);
set.add(0.33, 7);
Assert.assertTrue(sync(set.remove(1)));
Assert.assertFalse(sync(set.contains(1)));
Assert.assertThat(sync(set), Matchers.contains(3, 7));
Assert.assertFalse(sync(set.remove(1)));
Assert.assertThat(sync(set), Matchers.contains(3, 7));
sync(set.remove(3));
Assert.assertFalse(sync(set.contains(3)));
Assert.assertThat(sync(set), Matchers.contains(7));
}
@Test
public void testIteratorNextNext() {
RScoredSortedSetReactive<String> set = redisson.getScoredSortedSet("simple");
sync(set.add(1, "1"));
sync(set.add(2, "4"));
Iterator<String> iter = toIterator(set.iterator());
Assert.assertEquals("1", iter.next());
Assert.assertEquals("4", iter.next());
Assert.assertFalse(iter.hasNext());
}
@Test
public void testIteratorSequence() {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
for (int i = 0; i < 1000; i++) {
sync(set.add(i, Integer.valueOf(i)));
}
Set<Integer> setCopy = new HashSet<Integer>();
for (int i = 0; i < 1000; i++) {
setCopy.add(Integer.valueOf(i));
}
checkIterator(set, setCopy);
}
private void checkIterator(RScoredSortedSetReactive<Integer> set, Set<Integer> setCopy) {
for (Iterator<Integer> iterator = toIterator(set.iterator()); iterator.hasNext();) {
Integer value = iterator.next();
if (!setCopy.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, setCopy.size());
}
@Test
public void testRetainAll() {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
for (int i = 0; i < 20000; i++) {
sync(set.add(i, i));
}
Assert.assertTrue(sync(set.retainAll(Arrays.asList(1, 2))));
Assert.assertThat(sync(set), Matchers.containsInAnyOrder(1, 2));
Assert.assertEquals(2, sync(set.size()).intValue());
}
@Test
public void testRemoveAll() {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
sync(set.add(0.1, 1));
sync(set.add(0.2, 2));
sync(set.add(0.3, 3));
Assert.assertTrue(sync(set.removeAll(Arrays.asList(1, 2))));
Assert.assertThat(sync(set), Matchers.contains(3));
Assert.assertEquals(1, sync(set.size()).intValue());
}
@Test
public void testSort() {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
Assert.assertTrue(sync(set.add(4, 2)));
Assert.assertTrue(sync(set.add(5, 3)));
Assert.assertTrue(sync(set.add(3, 1)));
Assert.assertTrue(sync(set.add(6, 4)));
Assert.assertTrue(sync(set.add(1000, 10)));
Assert.assertTrue(sync(set.add(1, -1)));
Assert.assertTrue(sync(set.add(2, 0)));
MatcherAssert.assertThat(sync(set), Matchers.contains(-1, 0, 1, 2, 3, 4, 10));
Assert.assertEquals(-1, (int)sync(set.first()));
Assert.assertEquals(10, (int)sync(set.last()));
}
@Test
public void testRemove() {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
sync(set.add(4, 5));
sync(set.add(2, 3));
sync(set.add(0, 1));
sync(set.add(1, 2));
sync(set.add(3, 4));
Assert.assertFalse(sync(set.remove(0)));
Assert.assertTrue(sync(set.remove(3)));
Assert.assertThat(sync(set), Matchers.contains(1, 2, 4, 5));
}
@Test
public void testContainsAll() {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
for (int i = 0; i < 200; i++) {
sync(set.add(i, i));
}
Assert.assertTrue(sync(set.containsAll(Arrays.asList(30, 11))));
Assert.assertFalse(sync(set.containsAll(Arrays.asList(30, 711, 11))));
}
@Test
public void testContains() {
RScoredSortedSetReactive<TestObject> set = redisson.getScoredSortedSet("simple");
sync(set.add(0, new TestObject("1", "2")));
sync(set.add(1, new TestObject("1", "2")));
sync(set.add(2, new TestObject("2", "3")));
sync(set.add(3, new TestObject("3", "4")));
sync(set.add(4, new TestObject("5", "6")));
Assert.assertTrue(sync(set.contains(new TestObject("2", "3"))));
Assert.assertTrue(sync(set.contains(new TestObject("1", "2"))));
Assert.assertFalse(sync(set.contains(new TestObject("1", "9"))));
}
@Test
public void testDuplicates() {
RScoredSortedSetReactive<TestObject> set = redisson.getScoredSortedSet("simple");
Assert.assertTrue(sync(set.add(0, new TestObject("1", "2"))));
Assert.assertFalse(sync(set.add(0, new TestObject("1", "2"))));
Assert.assertTrue(sync(set.add(2, new TestObject("2", "3"))));
Assert.assertTrue(sync(set.add(3, new TestObject("3", "4"))));
Assert.assertTrue(sync(set.add(4, new TestObject("5", "6"))));
Assert.assertEquals(4, sync(set.size()).intValue());
}
@Test
public void testSize() {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
sync(set.add(0, 1));
sync(set.add(1, 2));
sync(set.add(2, 3));
sync(set.add(2, 3));
sync(set.add(3, 4));
sync(set.add(4, 5));
sync(set.add(4, 5));
Assert.assertEquals(5, sync(set.size()).intValue());
}
@Test
public void testValueRange() {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
sync(set.add(0, 1));
sync(set.add(1, 2));
sync(set.add(2, 3));
sync(set.add(3, 4));
sync(set.add(4, 5));
sync(set.add(4, 5));
Collection<Integer> vals = sync(set.valueRange(0, -1));
MatcherAssert.assertThat(vals, Matchers.contains(1, 2, 3, 4, 5));
}
@Test
public void testEntryRange() {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
sync(set.add(10, 1));
sync(set.add(20, 2));
sync(set.add(30, 3));
sync(set.add(40, 4));
sync(set.add(50, 5));
Collection<ScoredEntry<Integer>> vals = sync(set.entryRange(0, -1));
MatcherAssert.assertThat(vals, Matchers.contains(new ScoredEntry<Integer>(10D, 1),
new ScoredEntry<Integer>(20D, 2),
new ScoredEntry<Integer>(30D, 3),
new ScoredEntry<Integer>(40D, 4),
new ScoredEntry<Integer>(50D, 5)));
}
@Test
public void testScoredSortedSetValueRange() {
RScoredSortedSetReactive<String> set = redisson.<String>getScoredSortedSet("simple");
sync(set.add(0, "a"));
sync(set.add(1, "b"));
sync(set.add(2, "c"));
sync(set.add(3, "d"));
sync(set.add(4, "e"));
Collection<String> r = sync(set.valueRange(1, true, 4, false, 1, 2));
String[] a = r.toArray(new String[0]);
Assert.assertArrayEquals(new String[]{"c", "d"}, a);
}
@Test
public void testScoredSortedSetEntryRange() {
RScoredSortedSetReactive<String> set = redisson.<String>getScoredSortedSet("simple");
sync(set.add(0, "a"));
sync(set.add(1, "b"));
sync(set.add(2, "c"));
sync(set.add(3, "d"));
sync(set.add(4, "e"));
Collection<ScoredEntry<String>> r = sync(set.entryRange(1, true, 4, false, 1, 2));
ScoredEntry<String>[] a = r.toArray(new ScoredEntry[0]);
Assert.assertEquals(2d, a[0].getScore(), 0);
Assert.assertEquals(3d, a[1].getScore(), 0);
Assert.assertEquals("c", a[0].getValue());
Assert.assertEquals("d", a[1].getValue());
}
@Test
public void testAddAndGet() throws InterruptedException {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
set.add(1, 100);
Double res = sync(set.addScore(100, 11));
Assert.assertEquals(12, (double)res, 0);
Double score = sync(set.getScore(100));
Assert.assertEquals(12, (double)score, 0);
RScoredSortedSetReactive<Integer> set2 = redisson.getScoredSortedSet("simple");
set2.add(100.2, 1);
Double res2 = sync(set2.addScore(1, new Double(12.1)));
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
res2 = sync(set2.getScore(1));
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
}
}
Loading…
Cancel
Save