Merge branch 'master' into 3.0.0

pull/802/merge
Nikita 9 years ago
commit 9a2ea4bb2b

@ -31,6 +31,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScoredCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -371,6 +372,16 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE, getName(), startIndex, endIndex);
}
@Override
public Collection<V> valueRangeReversed(int startIndex, int endIndex) {
return get(valueRangeReversedAsync(startIndex, endIndex));
}
@Override
public RFuture<Collection<V>> valueRangeReversedAsync(int startIndex, int endIndex) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZREVRANGE, getName(), startIndex, endIndex);
}
@Override
public Collection<ScoredEntry<V>> entryRange(int startIndex, int endIndex) {
return get(entryRangeAsync(startIndex, endIndex));
@ -381,6 +392,16 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_ENTRY, getName(), startIndex, endIndex, "WITHSCORES");
}
@Override
public Collection<ScoredEntry<V>> entryRangeReversed(int startIndex, int endIndex) {
return get(entryRangeReversedAsync(startIndex, endIndex));
}
@Override
public RFuture<Collection<ScoredEntry<V>>> entryRangeReversedAsync(int startIndex, int endIndex) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZREVRANGE_ENTRY, getName(), startIndex, endIndex, "WITHSCORES");
}
@Override
public Collection<V> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
return get(valueRangeAsync(startScore, startScoreInclusive, endScore, endScoreInclusive));
@ -478,14 +499,133 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return get(revRankAsync(o));
}
@Override
public Long count(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
return get(countAsync(startScore, startScoreInclusive, endScore, endScoreInclusive));
}
@Override
public RFuture<Long> countAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZCOUNT, getName(), startValue, endValue);
}
@Override
public int intersection(String... names) {
return get(intersectionAsync(names));
}
@Override
public RFuture<Integer> intersectionAsync(String... names) {
return intersectionAsync(Aggregate.SUM, names);
}
@Override
public int intersection(Aggregate aggregate, String... names) {
return get(intersectionAsync(aggregate, names));
}
@Override
public RFuture<Integer> intersectionAsync(Aggregate aggregate, String... names) {
List<Object> args = new ArrayList<Object>(names.length + 4);
args.add(getName());
args.add(names.length);
args.addAll(Arrays.asList(names));
args.add("AGGREGATE");
args.add(aggregate.name());
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.ZINTERSTORE_INT, args.toArray());
}
@Override
public int intersection(Map<String, Double> nameWithWeight) {
return get(intersectionAsync(nameWithWeight));
}
@Override
public RFuture<Integer> intersectionAsync(Map<String, Double> nameWithWeight) {
return intersectionAsync(Aggregate.SUM, nameWithWeight);
}
@Override
public int intersection(Aggregate aggregate, Map<String, Double> nameWithWeight) {
return get(intersectionAsync(aggregate, nameWithWeight));
}
@Override
public RFuture<Integer> intersectionAsync(Aggregate aggregate, Map<String, Double> nameWithWeight) {
List<Object> args = new ArrayList<Object>(nameWithWeight.size()*2 + 5);
args.add(getName());
args.add(nameWithWeight.size());
args.addAll(nameWithWeight.keySet());
args.add("WEIGHTS");
List<String> weights = new ArrayList<String>();
for (Double weight : nameWithWeight.values()) {
weights.add(BigDecimal.valueOf(weight).toPlainString());
}
args.addAll(weights);
args.add("AGGREGATE");
args.add(aggregate.name());
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.ZINTERSTORE_INT, args.toArray());
}
@Override
public int union(String... names) {
return get(unionAsync(names));
}
@Override
public RFuture<Integer> unionAsync(String... names) {
return unionAsync(Aggregate.SUM, names);
}
@Override
public int union(Aggregate aggregate, String... names) {
return get(unionAsync(aggregate, names));
}
@Override
public RFuture<Integer> unionAsync(Aggregate aggregate, String... names) {
List<Object> args = new ArrayList<Object>(names.length + 4);
args.add(getName());
args.add(names.length);
args.addAll(Arrays.asList(names));
args.add("AGGREGATE");
args.add(aggregate.name());
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.ZUNIONSTORE_INT, args.toArray());
}
@Override
public int union(Map<String, Double> nameWithWeight) {
return get(unionAsync(nameWithWeight));
}
@Override
public RFuture<Integer> unionAsync(Map<String, Double> nameWithWeight) {
return unionAsync(Aggregate.SUM, nameWithWeight);
}
@Override
public int union(Aggregate aggregate, Map<String, Double> nameWithWeight) {
return get(unionAsync(aggregate, nameWithWeight));
}
@Override
public RFuture<Integer> unionAsync(Aggregate aggregate, Map<String, Double> nameWithWeight) {
List<Object> args = new ArrayList<Object>(nameWithWeight.size()*2 + 5);
args.add(getName());
args.add(nameWithWeight.size());
args.addAll(nameWithWeight.keySet());
args.add("WEIGHTS");
List<String> weights = new ArrayList<String>();
for (Double weight : nameWithWeight.values()) {
weights.add(BigDecimal.valueOf(weight).toPlainString());
}
args.addAll(weights);
args.add("AGGREGATE");
args.add(aggregate.name());
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.ZUNIONSTORE_INT, args.toArray());
}
}

@ -22,6 +22,12 @@ import org.redisson.client.protocol.ScoredEntry;
public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<V>, RExpirable {
public enum Aggregate {
SUM, MAX, MIN
}
V pollFirst();
V pollLast();
@ -97,8 +103,12 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
Double addScore(V object, Number value);
Collection<V> valueRange(int startIndex, int endIndex);
Collection<V> valueRangeReversed(int startIndex, int endIndex);
Collection<ScoredEntry<V>> entryRange(int startIndex, int endIndex);
Collection<ScoredEntry<V>> entryRangeReversed(int startIndex, int endIndex);
Collection<V> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
@ -131,5 +141,83 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* @return
*/
Collection<V> readAll();
/**
* Intersect provided ScoredSortedSets
* and store result to current ScoredSortedSet
*
* @param names - names of ScoredSortedSet
* @return length of intersection
*/
int intersection(String... names);
/**
* Intersect provided ScoredSortedSets with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param names - names of ScoredSortedSet
* @return length of intersection
*/
int intersection(Aggregate aggregate, String... names);
/**
* Intersect provided ScoredSortedSets mapped to weight multiplier
* and store result to current ScoredSortedSet
*
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of intersection
*/
int intersection(Map<String, Double> nameWithWeight);
/**
* Intersect provided ScoredSortedSets mapped to weight multiplier
* with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of intersection
*/
int intersection(Aggregate aggregate, Map<String, Double> nameWithWeight);
/**
* Union provided ScoredSortedSets
* and store result to current ScoredSortedSet
*
* @param names - names of ScoredSortedSet
* @return length of union
*/
int union(String... names);
/**
* Union provided ScoredSortedSets with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param names - names of ScoredSortedSet
* @return length of union
*/
int union(Aggregate aggregate, String... names);
/**
* Union provided ScoredSortedSets mapped to weight multiplier
* and store result to current ScoredSortedSet
*
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of union
*/
int union(Map<String, Double> nameWithWeight);
/**
* Union provided ScoredSortedSets mapped to weight multiplier
* with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of union
*/
int union(Aggregate aggregate, Map<String, Double> nameWithWeight);
}

@ -18,6 +18,7 @@ package org.redisson.api;
import java.util.Collection;
import java.util.Map;
import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.client.protocol.ScoredEntry;
public interface RScoredSortedSetAsync<V> extends RExpirableAsync {
@ -77,8 +78,12 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync {
RFuture<Double> addScoreAsync(V object, Number value);
RFuture<Collection<V>> valueRangeAsync(int startIndex, int endIndex);
RFuture<Collection<V>> valueRangeReversedAsync(int startIndex, int endIndex);
RFuture<Collection<ScoredEntry<V>>> entryRangeAsync(int startIndex, int endIndex);
RFuture<Collection<ScoredEntry<V>>> entryRangeReversedAsync(int startIndex, int endIndex);
RFuture<Collection<V>> valueRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
@ -111,5 +116,83 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync {
* @return
*/
RFuture<Collection<V>> readAllAsync();
/**
* Intersect provided ScoredSortedSets
* and store result to current ScoredSortedSet
*
* @param names - names of ScoredSortedSet
* @return length of intersection
*/
RFuture<Integer> intersectionAsync(String... names);
/**
* Intersect provided ScoredSortedSets with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param names - names of ScoredSortedSet
* @return length of intersection
*/
RFuture<Integer> intersectionAsync(Aggregate aggregate, String... names);
/**
* Intersect provided ScoredSortedSets mapped to weight multiplier
* and store result to current ScoredSortedSet
*
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of intersection
*/
RFuture<Integer> intersectionAsync(Map<String, Double> nameWithWeight);
/**
* Intersect provided ScoredSortedSets mapped to weight multiplier
* with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of intersection
*/
RFuture<Integer> intersectionAsync(Aggregate aggregate, Map<String, Double> nameWithWeight);
/**
* Union provided ScoredSortedSets
* and store result to current ScoredSortedSet
*
* @param names - names of ScoredSortedSet
* @return length of union
*/
RFuture<Integer> unionAsync(String... names);
/**
* Union provided ScoredSortedSets with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param names - names of ScoredSortedSet
* @return length of union
*/
RFuture<Integer> unionAsync(Aggregate aggregate, String... names);
/**
* Union provided ScoredSortedSets mapped to weight multiplier
* and store result to current ScoredSortedSet
*
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of union
*/
RFuture<Integer> unionAsync(Map<String, Double> nameWithWeight);
/**
* Union provided ScoredSortedSets mapped to weight multiplier
* with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of union
*/
RFuture<Integer> unionAsync(Aggregate aggregate, Map<String, Double> nameWithWeight);
}

@ -86,6 +86,8 @@ public interface RedisCommands {
RedisStrictCommand<Void> ASKING = new RedisStrictCommand<Void>("ASKING", new VoidReplayConvertor());
RedisStrictCommand<Void> READONLY = new RedisStrictCommand<Void>("READONLY", new VoidReplayConvertor());
RedisStrictCommand<Integer> ZUNIONSTORE_INT = new RedisStrictCommand<Integer>("ZUNIONSTORE", new IntegerReplayConvertor());
RedisStrictCommand<Integer> ZINTERSTORE_INT = new RedisStrictCommand<Integer>("ZINTERSTORE", new IntegerReplayConvertor());
RedisCommand<Boolean> ZADD_BOOL = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3);
RedisCommand<Boolean> ZADD_NX_BOOL = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 4);
RedisCommand<Boolean> ZADD_BOOL_RAW = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor());
@ -109,7 +111,9 @@ public interface RedisCommands {
RedisCommand<List<Object>> ZRANGEBYLEX = new RedisCommand<List<Object>>("ZRANGEBYLEX", new ObjectListReplayDecoder<Object>());
RedisCommand<Set<Object>> ZRANGEBYSCORE = new RedisCommand<Set<Object>>("ZRANGEBYSCORE", new ObjectSetReplayDecoder<Object>());
RedisCommand<List<Object>> ZRANGEBYSCORE_LIST = new RedisCommand<List<Object>>("ZRANGEBYSCORE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZREVRANGE = new RedisCommand<List<Object>>("ZREVRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZREVRANGEBYSCORE = new RedisCommand<List<Object>>("ZREVRANGEBYSCORE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZREVRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZREVRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());

@ -114,7 +114,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (RFuture<Void> future : masterFuture.getNow()) {
future.awaitUninterruptibly();
if (!future.isSuccess()) {
continue;
lastException = masterFuture.cause();
}
}
}

@ -542,6 +542,21 @@ public class RedissonScoredSortedSetTest extends BaseTest {
Collection<Integer> vals = set.valueRange(0, -1);
assertThat(vals).containsExactly(1, 2, 3, 4, 5);
}
@Test
public void testValueRangeReversed() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
set.add(0, 1);
set.add(1, 2);
set.add(2, 3);
set.add(3, 4);
set.add(4, 5);
set.add(4, 5);
Collection<Integer> vals = set.valueRangeReversed(0, -1);
assertThat(vals).containsExactly(5, 4, 3, 2, 1);
}
@Test
public void testEntryRange() {
@ -559,6 +574,26 @@ public class RedissonScoredSortedSetTest extends BaseTest {
new ScoredEntry<Integer>(40D, 4),
new ScoredEntry<Integer>(50D, 5));
}
@Test
public void testEntryRangeReversed() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
set.add(10, 1);
set.add(20, 2);
set.add(30, 3);
set.add(40, 4);
set.add(50, 5);
Collection<ScoredEntry<Integer>> vals = set.entryRangeReversed(0, -1);
assertThat(vals).containsExactly(
new ScoredEntry<Integer>(50D, 5),
new ScoredEntry<Integer>(40D, 4),
new ScoredEntry<Integer>(30D, 3),
new ScoredEntry<Integer>(20D, 2),
new ScoredEntry<Integer>(10D, 1)
);
}
@Test
public void testLexSortedSet() {
@ -754,4 +789,89 @@ public class RedissonScoredSortedSetTest extends BaseTest {
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
}
@Test
public void testIntersection() {
RScoredSortedSet<String> set1 = redisson.getScoredSortedSet("simple1");
set1.add(1, "one");
set1.add(2, "two");
RScoredSortedSet<String> set2 = redisson.getScoredSortedSet("simple2");
set2.add(1, "one");
set2.add(2, "two");
set2.add(3, "three");
RScoredSortedSet<String> out = redisson.getScoredSortedSet("out");
assertThat(out.intersection(set1.getName(), set2.getName())).isEqualTo(2);
assertThat(out.readAll()).containsOnly("one", "two");
assertThat(out.getScore("one")).isEqualTo(2);
assertThat(out.getScore("two")).isEqualTo(4);
}
@Test
public void testIntersectionWithWeight() {
RScoredSortedSet<String> set1 = redisson.getScoredSortedSet("simple1");
set1.add(1, "one");
set1.add(2, "two");
RScoredSortedSet<String> set2 = redisson.getScoredSortedSet("simple2");
set2.add(1, "one");
set2.add(2, "two");
set2.add(3, "three");
RScoredSortedSet<String> out = redisson.getScoredSortedSet("out");
Map<String, Double> nameWithWeight = new HashMap<>();
nameWithWeight.put(set1.getName(), 2D);
nameWithWeight.put(set2.getName(), 3D);
assertThat(out.intersection(nameWithWeight)).isEqualTo(2);
assertThat(out.readAll()).containsOnly("one", "two");
assertThat(out.getScore("one")).isEqualTo(5);
assertThat(out.getScore("two")).isEqualTo(10);
}
@Test
public void testUnion() {
RScoredSortedSet<String> set1 = redisson.getScoredSortedSet("simple1");
set1.add(1, "one");
set1.add(2, "two");
RScoredSortedSet<String> set2 = redisson.getScoredSortedSet("simple2");
set2.add(1, "one");
set2.add(2, "two");
set2.add(3, "three");
RScoredSortedSet<String> out = redisson.getScoredSortedSet("out");
assertThat(out.union(set1.getName(), set2.getName())).isEqualTo(3);
assertThat(out.readAll()).containsOnly("one", "two", "three");
assertThat(out.getScore("one")).isEqualTo(2);
assertThat(out.getScore("two")).isEqualTo(4);
assertThat(out.getScore("three")).isEqualTo(3);
}
@Test
public void testUnionWithWeight() {
RScoredSortedSet<String> set1 = redisson.getScoredSortedSet("simple1");
set1.add(1, "one");
set1.add(2, "two");
RScoredSortedSet<String> set2 = redisson.getScoredSortedSet("simple2");
set2.add(1, "one");
set2.add(2, "two");
set2.add(3, "three");
RScoredSortedSet<String> out = redisson.getScoredSortedSet("out");
Map<String, Double> nameWithWeight = new HashMap<>();
nameWithWeight.put(set1.getName(), 2D);
nameWithWeight.put(set2.getName(), 3D);
assertThat(out.union(nameWithWeight)).isEqualTo(3);
assertThat(out.readAll()).containsOnly("one", "two", "three");
assertThat(out.getScore("one")).isEqualTo(5);
assertThat(out.getScore("two")).isEqualTo(10);
assertThat(out.getScore("three")).isEqualTo(9);
}
}

Loading…
Cancel
Save