RLexSortedSetReactive added. #210

pull/337/head
Nikita 9 years ago
parent b7cf9cf824
commit 3ad12cb416

@ -196,7 +196,7 @@ public class RedissonLexSortedSet extends RedissonScoredSortedSet<String> implem
params.add(0);
params.add(param);
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD, getName(), params.toArray());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD_BOOL, getName(), params.toArray());
}
@Override

@ -0,0 +1,141 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RLexSortedSetReactive;
public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactive<String> implements RLexSortedSetReactive {
public RedissonLexSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(StringCodec.INSTANCE, commandExecutor, name);
}
@Override
public Publisher<Integer> removeRangeHeadByLex(String toElement, boolean toInclusive) {
String toValue = value(toElement, toInclusive);
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZREMRANGEBYLEX, getName(), "-", toValue);
}
@Override
public Publisher<Integer> removeRangeTailByLex(String fromElement, boolean fromInclusive) {
String fromValue = value(fromElement, fromInclusive);
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZREMRANGEBYLEX, getName(), fromValue, "+");
}
@Override
public Publisher<Integer> removeRangeByLex(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
String fromValue = value(fromElement, fromInclusive);
String toValue = value(toElement, toInclusive);
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZREMRANGEBYLEX, getName(), fromValue, toValue);
}
@Override
public Publisher<Collection<String>> lexRangeHead(String toElement, boolean toInclusive) {
String toValue = value(toElement, toInclusive);
return commandExecutor.readObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), "-", toValue);
}
@Override
public Publisher<Collection<String>> lexRangeTail(String fromElement, boolean fromInclusive) {
String fromValue = value(fromElement, fromInclusive);
return commandExecutor.readObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, "+");
}
@Override
public Publisher<Collection<String>> lexRange(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
String fromValue = value(fromElement, fromInclusive);
String toValue = value(toElement, toInclusive);
return commandExecutor.readObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, toValue);
}
@Override
public Publisher<Collection<String>> lexRangeHead(String toElement, boolean toInclusive, int offset, int count) {
String toValue = value(toElement, toInclusive);
return commandExecutor.readObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), "-", toValue, "LIMIT", offset, count);
}
@Override
public Publisher<Collection<String>> lexRangeTail(String fromElement, boolean fromInclusive, int offset, int count) {
String fromValue = value(fromElement, fromInclusive);
return commandExecutor.readObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, "+", "LIMIT", offset, count);
}
@Override
public Publisher<Collection<String>> lexRange(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive, int offset, int count) {
String fromValue = value(fromElement, fromInclusive);
String toValue = value(toElement, toInclusive);
return commandExecutor.readObservable(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, toValue, "LIMIT", offset, count);
}
@Override
public Publisher<Integer> lexCountTail(String fromElement, boolean fromInclusive) {
String fromValue = value(fromElement, fromInclusive);
return commandExecutor.readObservable(getName(), RedisCommands.ZLEXCOUNT, getName(), fromValue, "+");
}
@Override
public Publisher<Integer> lexCountHead(String toElement, boolean toInclusive) {
String toValue = value(toElement, toInclusive);
return commandExecutor.readObservable(getName(), RedisCommands.ZLEXCOUNT, getName(), "-", toValue);
}
@Override
public Publisher<Integer> lexCount(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
String fromValue = value(fromElement, fromInclusive);
String toValue = value(toElement, toInclusive);
return commandExecutor.readObservable(getName(), RedisCommands.ZLEXCOUNT, getName(), fromValue, toValue);
}
private String value(String fromElement, boolean fromInclusive) {
String fromValue = fromElement.toString();
if (fromInclusive) {
fromValue = "[" + fromValue;
} else {
fromValue = "(" + fromValue;
}
return fromValue;
}
@Override
public Publisher<Long> add(String e) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZADD, getName(), 0, e);
}
@Override
public Publisher<Long> addAll(Collection<? extends String> c) {
List<Object> params = new ArrayList<Object>(2*c.size());
for (Object param : c) {
params.add(0);
params.add(param);
}
return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZADD, getName(), params.toArray());
}
}

@ -30,6 +30,7 @@ import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RLexSortedSetReactive;
import org.redisson.core.RListReactive;
import org.redisson.core.RMap;
import org.redisson.core.RMapReactive;
@ -151,6 +152,11 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonScoredSortedSetReactive<V>(codec, commandExecutor, name);
}
@Override
public RLexSortedSetReactive getLexSortedSet(String name) {
return new RedissonLexSortedSetReactive(commandExecutor, name);
}
@Override
public void shutdown() {
connectionManager.shutdown();

@ -20,6 +20,8 @@ import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RLexSortedSet;
import org.redisson.core.RLexSortedSetReactive;
import org.redisson.core.RListReactive;
import org.redisson.core.RMap;
import org.redisson.core.RMapReactive;
@ -93,16 +95,16 @@ public interface RedissonReactiveClient {
<V> RScoredSortedSetReactive<V> getScoredSortedSet(String name, Codec codec);
// /**
// * Returns String based Redis Sorted Set instance by name
// * All elements are inserted with the same score during addition,
// * in order to force lexicographical ordering
// *
// * @param name
// * @return
// */
// RLexSortedSet getLexSortedSet(String name);
//
/**
* Returns String based Redis Sorted Set instance by name
* All elements are inserted with the same score during addition,
* in order to force lexicographical ordering
*
* @param name
* @return
*/
RLexSortedSetReactive getLexSortedSet(String name);
// /**
// * Returns topic instance by name.
// *

@ -67,7 +67,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Future<Boolean> addAsync(double score, V object) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD, getName(), BigDecimal.valueOf(score).toPlainString(), object);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD_BOOL, getName(), BigDecimal.valueOf(score).toPlainString(), object);
}
@Override
@ -122,7 +122,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Future<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZCARD, getName());
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZCARD_INT, getName());
}
@Override
@ -152,7 +152,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Future<Integer> rankAsync(V o) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK, getName(), o);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), o);
}
private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {

@ -56,7 +56,7 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
@Override
public Publisher<Boolean> add(double score, V object) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZADD, getName(), BigDecimal.valueOf(score).toPlainString(), object);
return commandExecutor.writeObservable(getName(), codec, RedisCommands.ZADD_BOOL, getName(), BigDecimal.valueOf(score).toPlainString(), object);
}
public Publisher<Integer> removeRangeByRank(int startIndex, int endIndex) {
@ -82,7 +82,7 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
}
@Override
public Publisher<Integer> size() {
public Publisher<Long> size() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZCARD, getName());
}
@ -97,7 +97,7 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
}
@Override
public Publisher<Integer> rank(V o) {
public Publisher<Long> rank(V o) {
return commandExecutor.readObservable(getName(), codec, RedisCommands.ZRANK, getName(), o);
}

@ -69,7 +69,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
@Override
public Publisher<Long> add(V e) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.SADD_SINGLE, getName(), e);
return commandExecutor.writeObservable(getName(), codec, RedisCommands.SADD, getName(), e);
}
@Override

@ -64,13 +64,16 @@ public interface RedisCommands {
RedisStrictCommand<Void> ASKING = new RedisStrictCommand<Void>("ASKING", new VoidReplayConvertor());
RedisStrictCommand<Void> READONLY = new RedisStrictCommand<Void>("READONLY", new VoidReplayConvertor());
RedisCommand<Boolean> ZADD = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3);
RedisCommand<Boolean> ZADD_BOOL = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3);
RedisCommand<Boolean> ZADD = new RedisCommand<Boolean>("ZADD", 3);
RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor(), 2);
RedisStrictCommand<Integer> ZCARD = new RedisStrictCommand<Integer>("ZCARD", new IntegerReplayConvertor());
RedisStrictCommand<Integer> ZCARD_INT = new RedisStrictCommand<Integer>("ZCARD", new IntegerReplayConvertor());
RedisStrictCommand<Long> ZCARD = new RedisStrictCommand<Long>("ZCARD");
RedisStrictCommand<Integer> ZLEXCOUNT = new RedisStrictCommand<Integer>("ZLEXCOUNT", new IntegerReplayConvertor());
RedisCommand<Boolean> ZSCORE_CONTAINS = new RedisCommand<Boolean>("ZSCORE", new BooleanNotNullReplayConvertor(), 2);
RedisStrictCommand<Double> ZSCORE = new RedisStrictCommand<Double>("ZSCORE", new DoubleReplayConvertor());
RedisCommand<Integer> ZRANK = new RedisCommand<Integer>("ZRANK", new IntegerReplayConvertor(), 2);
RedisCommand<Integer> ZRANK_INT = new RedisCommand<Integer>("ZRANK", new IntegerReplayConvertor(), 2);
RedisStrictCommand<Long> ZRANK = new RedisStrictCommand<Long>("ZRANK", 2);
RedisCommand<Object> ZRANGE_SINGLE = new RedisCommand<Object>("ZRANGE", new ObjectFirstResultReplayDecoder<Object>());
RedisCommand<List<Object>> ZRANGE = new RedisCommand<List<Object>>("ZRANGE", new ObjectListReplayDecoder<Object>());
RedisStrictCommand<Integer> ZREMRANGEBYRANK = new RedisStrictCommand<Integer>("ZREMRANGEBYRANK", new IntegerReplayConvertor());

@ -24,6 +24,10 @@ public class RedisStrictCommand<T> extends RedisCommand<T> {
super(name, (Decoder<T>)null, objectParamIndex, inParamType);
}
public RedisStrictCommand(String name, int encodeParamIndex) {
super(name, null, null, null, encodeParamIndex);
}
public RedisStrictCommand(String name, MultiDecoder<T> replayMultiDecoder) {
super(name, replayMultiDecoder);
}

@ -0,0 +1,52 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import org.reactivestreams.Publisher;
public interface RLexSortedSetReactive extends RCollectionReactive<String> {
Publisher<Integer> removeRangeByLex(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);
Publisher<Integer> removeRangeTailByLex(String fromElement, boolean fromInclusive);
Publisher<Integer> removeRangeHeadByLex(String toElement, boolean toInclusive);
Publisher<Integer> lexCountTail(String fromElement, boolean fromInclusive);
Publisher<Integer> lexCountHead(String toElement, boolean toInclusive);
Publisher<Collection<String>> lexRangeTail(String fromElement, boolean fromInclusive);
Publisher<Collection<String>> lexRangeHead(String toElement, boolean toInclusive);
Publisher<Collection<String>> lexRange(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);
Publisher<Collection<String>> lexRangeTail(String fromElement, boolean fromInclusive, int offset, int count);
Publisher<Collection<String>> lexRangeHead(String toElement, boolean toInclusive, int offset, int count);
Publisher<Collection<String>> lexRange(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive, int offset, int count);
Publisher<Integer> lexCount(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);
Publisher<Long> rank(String o);
Publisher<Collection<String>> valueRange(int startIndex, int endIndex);
}

@ -27,6 +27,8 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync {
Future<V> lastAsync();
Future<Integer> removeRangeByScoreAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Future<Integer> removeRangeByRankAsync(int startIndex, int endIndex);
Future<Integer> rankAsync(V o);

@ -32,7 +32,7 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive {
Publisher<Integer> removeRangeByRank(int startIndex, int endIndex);
Publisher<Integer> rank(V o);
Publisher<Long> rank(V o);
Publisher<Double> getScore(V o);
@ -40,7 +40,7 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive {
Publisher<Boolean> remove(V object);
Publisher<Integer> size();
Publisher<Long> size();
Publisher<Boolean> contains(Object o);

@ -0,0 +1,127 @@
package org.redisson;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RLexSortedSetReactive;
public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest {
@Test
public void testRemoveLexRangeTail() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");
Assert.assertTrue(sync(set.add("a")) == 1);
Assert.assertFalse(sync(set.add("a")) == 1);
Assert.assertTrue(sync(set.add("b")) == 1);
Assert.assertTrue(sync(set.add("c")) == 1);
Assert.assertTrue(sync(set.add("d")) == 1);
Assert.assertTrue(sync(set.add("e")) == 1);
Assert.assertTrue(sync(set.add("f")) == 1);
Assert.assertTrue(sync(set.add("g")) == 1);
Assert.assertEquals(0, sync(set.removeRangeTailByLex("z", false)).intValue());
Assert.assertEquals(4, sync(set.removeRangeTailByLex("c", false)).intValue());
MatcherAssert.assertThat(sync(set), Matchers.contains("a", "b", "c"));
Assert.assertEquals(1, sync(set.removeRangeTailByLex("c", true)).intValue());
MatcherAssert.assertThat(sync(set), Matchers.contains("a", "b"));
}
@Test
public void testRemoveLexRangeHead() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");
sync(set.add("a"));
sync(set.add("b"));
sync(set.add("c"));
sync(set.add("d"));
sync(set.add("e"));
sync(set.add("f"));
sync(set.add("g"));
Assert.assertEquals(2, sync(set.removeRangeHeadByLex("c", false)).intValue());
MatcherAssert.assertThat(sync(set), Matchers.contains("c", "d", "e", "f", "g"));
Assert.assertEquals(1, (int)sync(set.removeRangeHeadByLex("c", true)));
MatcherAssert.assertThat(sync(set), Matchers.contains("d", "e", "f", "g"));
}
@Test
public void testRemoveLexRange() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");
sync(set.add("a"));
sync(set.add("b"));
sync(set.add("c"));
sync(set.add("d"));
sync(set.add("e"));
sync(set.add("f"));
sync(set.add("g"));
Assert.assertEquals(5, sync(set.removeRangeByLex("aaa", true, "g", false)).intValue());
MatcherAssert.assertThat(sync(set), Matchers.contains("a", "g"));
}
@Test
public void testLexRangeTail() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");
Assert.assertTrue(sync(set.add("a")) == 1);
Assert.assertFalse(sync(set.add("a")) == 1);
Assert.assertTrue(sync(set.add("b")) == 1);
Assert.assertTrue(sync(set.add("c")) == 1);
Assert.assertTrue(sync(set.add("d")) == 1);
Assert.assertTrue(sync(set.add("e")) == 1);
Assert.assertTrue(sync(set.add("f")) == 1);
Assert.assertTrue(sync(set.add("g")) == 1);
MatcherAssert.assertThat(sync(set.lexRangeTail("c", false)), Matchers.contains("d", "e", "f", "g"));
MatcherAssert.assertThat(sync(set.lexRangeTail("c", true)), Matchers.contains("c", "d", "e", "f", "g"));
}
@Test
public void testLexRangeHead() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");
sync(set.add("a"));
sync(set.add("b"));
sync(set.add("c"));
sync(set.add("d"));
sync(set.add("e"));
sync(set.add("f"));
sync(set.add("g"));
MatcherAssert.assertThat(sync(set.lexRangeHead("c", false)), Matchers.contains("a", "b"));
MatcherAssert.assertThat(sync(set.lexRangeHead("c", true)), Matchers.contains("a", "b", "c"));
}
@Test
public void testLexRange() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");
sync(set.add("a"));
sync(set.add("b"));
sync(set.add("c"));
sync(set.add("d"));
sync(set.add("e"));
sync(set.add("f"));
sync(set.add("g"));
MatcherAssert.assertThat(sync(set.lexRange("aaa", true, "g", false)), Matchers.contains("b", "c", "d", "e", "f"));
}
@Test
public void testLexCount() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");
sync(set.add("a"));
sync(set.add("b"));
sync(set.add("c"));
sync(set.add("d"));
sync(set.add("e"));
sync(set.add("f"));
sync(set.add("g"));
Assert.assertEquals(5, (int)sync(set.lexCount("b", true, "f", true)));
Assert.assertEquals(3, (int)sync(set.lexCount("b", false, "f", false)));
}
}
Loading…
Cancel
Save