From b7cf9cf8241269d640f080fc333dd03b54665bcb Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 30 Nov 2015 12:37:05 +0300 Subject: [PATCH] RScoredSortedSetReactive added. #210 --- .../java/org/redisson/RedissonReactive.java | 10 + .../org/redisson/RedissonReactiveClient.java | 21 +- .../RedissonScoredSortedSetReactive.java | 283 +++++++++++++++ .../core/RScoredSortedSetReactive.java | 67 ++++ .../java/org/redisson/BaseReactiveTest.java | 5 + .../RedissonScoredSortedSetReactiveTest.java | 335 ++++++++++++++++++ 6 files changed, 711 insertions(+), 10 deletions(-) create mode 100644 src/main/java/org/redisson/RedissonScoredSortedSetReactive.java create mode 100644 src/main/java/org/redisson/core/RScoredSortedSetReactive.java create mode 100644 src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index 099d4542c..c649e6dea 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -33,6 +33,7 @@ import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RListReactive; import org.redisson.core.RMap; import org.redisson.core.RMapReactive; +import org.redisson.core.RScoredSortedSetReactive; import org.redisson.core.RSetReactive; import io.netty.util.concurrent.Future; @@ -138,7 +139,16 @@ public class RedissonReactive implements RedissonReactiveClient { public RSetReactive getSet(String name, Codec codec) { return new RedissonSetReactive(codec, commandExecutor, name); + } + @Override + public RScoredSortedSetReactive getScoredSortedSet(String name) { + return new RedissonScoredSortedSetReactive(commandExecutor, name); + } + + @Override + public RScoredSortedSetReactive getScoredSortedSet(String name, Codec codec) { + return new RedissonScoredSortedSetReactive(codec, commandExecutor, name); } @Override diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index 69bd2cb66..b0f8fd657 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -23,6 +23,7 @@ import org.redisson.core.RHyperLogLogReactive; import org.redisson.core.RListReactive; import org.redisson.core.RMap; import org.redisson.core.RMapReactive; +import org.redisson.core.RScoredSortedSetReactive; import org.redisson.core.RSetReactive; public interface RedissonReactiveClient { @@ -82,16 +83,16 @@ public interface RedissonReactiveClient { RSetReactive getSet(String name, Codec codec); -// /** -// * Returns Redis Sorted Set instance by name -// * -// * @param name -// * @return -// */ -// RScoredSortedSet getScoredSortedSet(String name); -// -// RScoredSortedSet getScoredSortedSet(String name, Codec codec); -// + /** + * Returns Redis Sorted Set instance by name + * + * @param name + * @return + */ + RScoredSortedSetReactive getScoredSortedSet(String name); + + RScoredSortedSetReactive getScoredSortedSet(String name, Codec codec); + // /** // * Returns String based Redis Sorted Set instance by name // * All elements are inserted with the same score during addition, diff --git a/src/main/java/org/redisson/RedissonScoredSortedSetReactive.java b/src/main/java/org/redisson/RedissonScoredSortedSetReactive.java new file mode 100644 index 000000000..fcbcf7b64 --- /dev/null +++ b/src/main/java/org/redisson/RedissonScoredSortedSetReactive.java @@ -0,0 +1,283 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.math.BigDecimal; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.StringCodec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.ScoredEntry; +import org.redisson.client.protocol.convertor.BooleanReplayConvertor; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.core.RScoredSortedSetReactive; + +import reactor.rx.Stream; +import reactor.rx.subscription.ReactiveSubscription; + +public class RedissonScoredSortedSetReactive extends RedissonExpirableReactive implements RScoredSortedSetReactive { + + public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) { + super(commandExecutor, name); + } + + public RedissonScoredSortedSetReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + super(codec, commandExecutor, name); + } + + public Publisher first() { + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGE_SINGLE, getName(), 0, 0); + } + + public Publisher last() { + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGE_SINGLE, getName(), -1, -1); + } + + @Override + public Publisher add(double score, V object) { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZADD, getName(), BigDecimal.valueOf(score).toPlainString(), object); + } + + public Publisher removeRangeByRank(int startIndex, int endIndex) { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZREMRANGEBYRANK, getName(), startIndex, endIndex); + } + + public Publisher removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) { + String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive); + String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive); + return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZREMRANGEBYSCORE, getName(), startValue, endValue); + } + + private String value(String element, boolean inclusive) { + if (!inclusive) { + element = "(" + element; + } + return element; + } + + @Override + public Publisher remove(Object object) { + return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZREM, getName(), object); + } + + @Override + public Publisher size() { + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZCARD, getName()); + } + + @Override + public Publisher contains(Object o) { + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZSCORE_CONTAINS, getName(), o); + } + + @Override + public Publisher getScore(V o) { + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZSCORE, getName(), o); + } + + @Override + public Publisher rank(V o) { + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANK, getName(), o); + } + + private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { + return commandExecutor.readObservable(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); + } + + @Override + public Publisher iterator() { + return new Stream() { + + @Override + public void subscribe(final Subscriber t) { + t.onSubscribe(new ReactiveSubscription(this, t) { + + private List firstValues; + private long nextIterPos; + private InetSocketAddress client; + + private long currentIndex; + + @Override + protected void onRequest(final long n) { + currentIndex = n; + nextValues(); + } + + protected void nextValues() { + final ReactiveSubscription m = this; + scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(ListScanResult res) { + client = res.getRedisClient(); + + long prevIterPos = nextIterPos; + if (nextIterPos == 0 && firstValues == null) { + firstValues = res.getValues(); + } else if (res.getValues().equals(firstValues)) { + m.onComplete(); + currentIndex = 0; + return; + } + + nextIterPos = res.getPos(); + if (prevIterPos == nextIterPos) { + nextIterPos = -1; + } + for (V val : res.getValues()) { + m.onNext(val); + currentIndex--; + if (currentIndex == 0) { + m.onComplete(); + return; + } + } + if (nextIterPos == -1) { + m.onComplete(); + currentIndex = 0; + } + } + + @Override + public void onError(Throwable error) { + m.onError(error); + } + + @Override + public void onComplete() { + if (currentIndex == 0) { + return; + } + nextValues(); + } + }); + } + }); + } + + }; + } + + @Override + public Publisher containsAll(Collection c) { + return commandExecutor.evalReadObservable(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + "local s = redis.call('zrange', KEYS[1], 0, -1);" + + "for i = 0, table.getn(s), 1 do " + + "for j = 0, table.getn(ARGV), 1 do " + + "if ARGV[j] == s[i] " + + "then table.remove(ARGV, j) end " + + "end; " + + "end;" + + "return table.getn(ARGV) == 0; ", + Collections.singletonList(getName()), c.toArray()); + } + + @Override + public Publisher removeAll(Collection c) { + return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + "local v = false " + + "for i = 0, table.getn(ARGV), 1 do " + + "if redis.call('zrem', KEYS[1], ARGV[i]) == 1 " + + "then v = true end " + +"end " + + "return v ", + Collections.singletonList(getName()), c.toArray()); + } + + @Override + public Publisher retainAll(Collection c) { + return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), + "local changed = false " + + "local s = redis.call('zrange', KEYS[1], 0, -1) " + + "local i = 0 " + + "while i <= table.getn(s) do " + + "local element = s[i] " + + "local isInAgrs = false " + + "for j = 0, table.getn(ARGV), 1 do " + + "if ARGV[j] == element then " + + "isInAgrs = true " + + "break " + + "end " + + "end " + + "if isInAgrs == false then " + + "redis.call('zrem', KEYS[1], element) " + + "changed = true " + + "end " + + "i = i + 1 " + + "end " + + "return changed ", + Collections.singletonList(getName()), c.toArray()); + } + + @Override + public Publisher addScore(V object, Number value) { + return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZINCRBY, + getName(), new BigDecimal(value.toString()).toPlainString(), object); + } + + @Override + public Publisher> valueRange(int startIndex, int endIndex) { + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGE, getName(), startIndex, endIndex); + } + + @Override + public Publisher>> entryRange(int startIndex, int endIndex) { + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGE_ENTRY, getName(), startIndex, endIndex, "WITHSCORES"); + } + + @Override + public Publisher> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) { + String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive); + String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive); + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGEBYSCORE, getName(), startValue, endValue); + } + + @Override + public Publisher>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) { + String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive); + String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive); + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGEBYSCORE_ENTRY, getName(), startValue, endValue, "WITHSCORES"); + } + + @Override + public Publisher> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) { + String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive); + String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive); + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGEBYSCORE, getName(), startValue, endValue, "LIMIT", offset, count); + } + + @Override + public Publisher>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) { + String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive); + String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive); + return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANGEBYSCORE_ENTRY, getName(), startValue, endValue, "WITHSCORES", "LIMIT", offset, count); + } + +} diff --git a/src/main/java/org/redisson/core/RScoredSortedSetReactive.java b/src/main/java/org/redisson/core/RScoredSortedSetReactive.java new file mode 100644 index 000000000..d4d0168c4 --- /dev/null +++ b/src/main/java/org/redisson/core/RScoredSortedSetReactive.java @@ -0,0 +1,67 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.Collection; + +import org.reactivestreams.Publisher; +import org.redisson.client.protocol.ScoredEntry; + +public interface RScoredSortedSetReactive extends RExpirableReactive { + + Publisher iterator(); + + Publisher first(); + + Publisher last(); + + Publisher removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive); + + Publisher removeRangeByRank(int startIndex, int endIndex); + + Publisher rank(V o); + + Publisher getScore(V o); + + Publisher add(double score, V object); + + Publisher remove(V object); + + Publisher size(); + + Publisher contains(Object o); + + Publisher containsAll(Collection c); + + Publisher removeAll(Collection c); + + Publisher retainAll(Collection c); + + Publisher addScore(V object, Number value); + + Publisher> valueRange(int startIndex, int endIndex); + + Publisher>> entryRange(int startIndex, int endIndex); + + Publisher> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive); + + Publisher>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive); + + Publisher> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count); + + Publisher>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count); + +} diff --git a/src/test/java/org/redisson/BaseReactiveTest.java b/src/test/java/org/redisson/BaseReactiveTest.java index 481b0f58d..946004ede 100644 --- a/src/test/java/org/redisson/BaseReactiveTest.java +++ b/src/test/java/org/redisson/BaseReactiveTest.java @@ -8,6 +8,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.reactivestreams.Publisher; import org.redisson.core.RCollectionReactive; +import org.redisson.core.RScoredSortedSetReactive; import reactor.rx.Streams; @@ -25,6 +26,10 @@ public abstract class BaseReactiveTest { redisson.shutdown(); } + public Iterable sync(RScoredSortedSetReactive list) { + return Streams.create(list.iterator()).toList().poll(); + } + public Iterable sync(RCollectionReactive list) { return Streams.create(list.iterator()).toList().poll(); } diff --git a/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java b/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java new file mode 100644 index 000000000..03a4beb57 --- /dev/null +++ b/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java @@ -0,0 +1,335 @@ +package org.redisson; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.redisson.client.protocol.ScoredEntry; +import org.redisson.core.RScoredSortedSetReactive; + +public class RedissonScoredSortedSetReactiveTest extends BaseReactiveTest { + + @Test + public void testFirstLast() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(0.1, "a")); + sync(set.add(0.2, "b")); + sync(set.add(0.3, "c")); + sync(set.add(0.4, "d")); + + Assert.assertEquals("a", sync(set.first())); + Assert.assertEquals("d", sync(set.last())); + } + + + @Test + public void testRemoveRangeByScore() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(0.1, "a")); + sync(set.add(0.2, "b")); + sync(set.add(0.3, "c")); + sync(set.add(0.4, "d")); + sync(set.add(0.5, "e")); + sync(set.add(0.6, "f")); + sync(set.add(0.7, "g")); + + Assert.assertEquals(2, sync(set.removeRangeByScore(0.1, false, 0.3, true)).intValue()); + MatcherAssert.assertThat(sync(set), Matchers.contains("a", "d", "e", "f", "g")); + } + + @Test + public void testRemoveRangeByRank() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(0.1, "a")); + sync(set.add(0.2, "b")); + sync(set.add(0.3, "c")); + sync(set.add(0.4, "d")); + sync(set.add(0.5, "e")); + sync(set.add(0.6, "f")); + sync(set.add(0.7, "g")); + + Assert.assertEquals(2, sync(set.removeRangeByRank(0, 1)).intValue()); + MatcherAssert.assertThat(sync(set), Matchers.contains("c", "d", "e", "f", "g")); + } + + @Test + public void testRank() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(0.1, "a")); + sync(set.add(0.2, "b")); + sync(set.add(0.3, "c")); + sync(set.add(0.4, "d")); + sync(set.add(0.5, "e")); + sync(set.add(0.6, "f")); + sync(set.add(0.7, "g")); + + Assert.assertEquals(3, sync(set.rank("d")).intValue()); + } + + @Test + public void testAddAsync() throws InterruptedException, ExecutionException { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + Assert.assertTrue(sync(set.add(0.323, 2))); + Assert.assertFalse(sync(set.add(0.323, 2))); + + Assert.assertTrue(sync(set.contains(2))); + } + + @Test + public void testRemoveAsync() throws InterruptedException, ExecutionException { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + set.add(0.11, 1); + set.add(0.22, 3); + set.add(0.33, 7); + + Assert.assertTrue(sync(set.remove(1))); + Assert.assertFalse(sync(set.contains(1))); + Assert.assertThat(sync(set), Matchers.contains(3, 7)); + + Assert.assertFalse(sync(set.remove(1))); + Assert.assertThat(sync(set), Matchers.contains(3, 7)); + + sync(set.remove(3)); + Assert.assertFalse(sync(set.contains(3))); + Assert.assertThat(sync(set), Matchers.contains(7)); + } + + @Test + public void testIteratorNextNext() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(1, "1")); + sync(set.add(2, "4")); + + Iterator iter = toIterator(set.iterator()); + Assert.assertEquals("1", iter.next()); + Assert.assertEquals("4", iter.next()); + Assert.assertFalse(iter.hasNext()); + } + + @Test + public void testIteratorSequence() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + for (int i = 0; i < 1000; i++) { + sync(set.add(i, Integer.valueOf(i))); + } + + Set setCopy = new HashSet(); + for (int i = 0; i < 1000; i++) { + setCopy.add(Integer.valueOf(i)); + } + + checkIterator(set, setCopy); + } + + private void checkIterator(RScoredSortedSetReactive set, Set setCopy) { + for (Iterator iterator = toIterator(set.iterator()); iterator.hasNext();) { + Integer value = iterator.next(); + if (!setCopy.remove(value)) { + Assert.fail(); + } + } + + Assert.assertEquals(0, setCopy.size()); + } + + @Test + public void testRetainAll() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + for (int i = 0; i < 20000; i++) { + sync(set.add(i, i)); + } + + Assert.assertTrue(sync(set.retainAll(Arrays.asList(1, 2)))); + Assert.assertThat(sync(set), Matchers.containsInAnyOrder(1, 2)); + Assert.assertEquals(2, sync(set.size()).intValue()); + } + + @Test + public void testRemoveAll() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(0.1, 1)); + sync(set.add(0.2, 2)); + sync(set.add(0.3, 3)); + + Assert.assertTrue(sync(set.removeAll(Arrays.asList(1, 2)))); + Assert.assertThat(sync(set), Matchers.contains(3)); + Assert.assertEquals(1, sync(set.size()).intValue()); + } + + + @Test + public void testSort() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + Assert.assertTrue(sync(set.add(4, 2))); + Assert.assertTrue(sync(set.add(5, 3))); + Assert.assertTrue(sync(set.add(3, 1))); + Assert.assertTrue(sync(set.add(6, 4))); + Assert.assertTrue(sync(set.add(1000, 10))); + Assert.assertTrue(sync(set.add(1, -1))); + Assert.assertTrue(sync(set.add(2, 0))); + + MatcherAssert.assertThat(sync(set), Matchers.contains(-1, 0, 1, 2, 3, 4, 10)); + + Assert.assertEquals(-1, (int)sync(set.first())); + Assert.assertEquals(10, (int)sync(set.last())); + } + + @Test + public void testRemove() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(4, 5)); + sync(set.add(2, 3)); + sync(set.add(0, 1)); + sync(set.add(1, 2)); + sync(set.add(3, 4)); + + Assert.assertFalse(sync(set.remove(0))); + Assert.assertTrue(sync(set.remove(3))); + + Assert.assertThat(sync(set), Matchers.contains(1, 2, 4, 5)); + } + + @Test + public void testContainsAll() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + for (int i = 0; i < 200; i++) { + sync(set.add(i, i)); + } + + Assert.assertTrue(sync(set.containsAll(Arrays.asList(30, 11)))); + Assert.assertFalse(sync(set.containsAll(Arrays.asList(30, 711, 11)))); + } + + @Test + public void testContains() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + + sync(set.add(0, new TestObject("1", "2"))); + sync(set.add(1, new TestObject("1", "2"))); + sync(set.add(2, new TestObject("2", "3"))); + sync(set.add(3, new TestObject("3", "4"))); + sync(set.add(4, new TestObject("5", "6"))); + + Assert.assertTrue(sync(set.contains(new TestObject("2", "3")))); + Assert.assertTrue(sync(set.contains(new TestObject("1", "2")))); + Assert.assertFalse(sync(set.contains(new TestObject("1", "9")))); + } + + @Test + public void testDuplicates() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + + Assert.assertTrue(sync(set.add(0, new TestObject("1", "2")))); + Assert.assertFalse(sync(set.add(0, new TestObject("1", "2")))); + Assert.assertTrue(sync(set.add(2, new TestObject("2", "3")))); + Assert.assertTrue(sync(set.add(3, new TestObject("3", "4")))); + Assert.assertTrue(sync(set.add(4, new TestObject("5", "6")))); + + Assert.assertEquals(4, sync(set.size()).intValue()); + } + + @Test + public void testSize() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(0, 1)); + sync(set.add(1, 2)); + sync(set.add(2, 3)); + sync(set.add(2, 3)); + sync(set.add(3, 4)); + sync(set.add(4, 5)); + sync(set.add(4, 5)); + + Assert.assertEquals(5, sync(set.size()).intValue()); + } + + @Test + public void testValueRange() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(0, 1)); + sync(set.add(1, 2)); + sync(set.add(2, 3)); + sync(set.add(3, 4)); + sync(set.add(4, 5)); + sync(set.add(4, 5)); + + Collection vals = sync(set.valueRange(0, -1)); + MatcherAssert.assertThat(vals, Matchers.contains(1, 2, 3, 4, 5)); + } + + @Test + public void testEntryRange() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + sync(set.add(10, 1)); + sync(set.add(20, 2)); + sync(set.add(30, 3)); + sync(set.add(40, 4)); + sync(set.add(50, 5)); + + Collection> vals = sync(set.entryRange(0, -1)); + MatcherAssert.assertThat(vals, Matchers.contains(new ScoredEntry(10D, 1), + new ScoredEntry(20D, 2), + new ScoredEntry(30D, 3), + new ScoredEntry(40D, 4), + new ScoredEntry(50D, 5))); + } + + @Test + public void testScoredSortedSetValueRange() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + + sync(set.add(0, "a")); + sync(set.add(1, "b")); + sync(set.add(2, "c")); + sync(set.add(3, "d")); + sync(set.add(4, "e")); + + Collection r = sync(set.valueRange(1, true, 4, false, 1, 2)); + String[] a = r.toArray(new String[0]); + Assert.assertArrayEquals(new String[]{"c", "d"}, a); + } + + @Test + public void testScoredSortedSetEntryRange() { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + + sync(set.add(0, "a")); + sync(set.add(1, "b")); + sync(set.add(2, "c")); + sync(set.add(3, "d")); + sync(set.add(4, "e")); + + Collection> r = sync(set.entryRange(1, true, 4, false, 1, 2)); + ScoredEntry[] a = r.toArray(new ScoredEntry[0]); + Assert.assertEquals(2d, a[0].getScore(), 0); + Assert.assertEquals(3d, a[1].getScore(), 0); + Assert.assertEquals("c", a[0].getValue()); + Assert.assertEquals("d", a[1].getValue()); + } + + @Test + public void testAddAndGet() throws InterruptedException { + RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); + set.add(1, 100); + + Double res = sync(set.addScore(100, 11)); + Assert.assertEquals(12, (double)res, 0); + Double score = sync(set.getScore(100)); + Assert.assertEquals(12, (double)score, 0); + + RScoredSortedSetReactive set2 = redisson.getScoredSortedSet("simple"); + set2.add(100.2, 1); + + Double res2 = sync(set2.addScore(1, new Double(12.1))); + Assert.assertTrue(new Double(112.3).compareTo(res2) == 0); + res2 = sync(set2.getScore(1)); + Assert.assertTrue(new Double(112.3).compareTo(res2) == 0); + } + +}