diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 12cf4f771..21d39c465 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -31,6 +31,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RScoredSortedSet; import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.ScoredCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -371,6 +372,16 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE, getName(), startIndex, endIndex); } + @Override + public Collection valueRangeReversed(int startIndex, int endIndex) { + return get(valueRangeReversedAsync(startIndex, endIndex)); + } + + @Override + public RFuture> valueRangeReversedAsync(int startIndex, int endIndex) { + return commandExecutor.readAsync(getName(), codec, RedisCommands.ZREVRANGE, getName(), startIndex, endIndex); + } + @Override public Collection> entryRange(int startIndex, int endIndex) { return get(entryRangeAsync(startIndex, endIndex)); @@ -381,6 +392,16 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_ENTRY, getName(), startIndex, endIndex, "WITHSCORES"); } + @Override + public Collection> entryRangeReversed(int startIndex, int endIndex) { + return get(entryRangeReversedAsync(startIndex, endIndex)); + } + + @Override + public RFuture>> entryRangeReversedAsync(int startIndex, int endIndex) { + return commandExecutor.readAsync(getName(), codec, RedisCommands.ZREVRANGE_ENTRY, getName(), startIndex, endIndex, "WITHSCORES"); + } + @Override public Collection valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) { return get(valueRangeAsync(startScore, startScoreInclusive, endScore, endScoreInclusive)); @@ -478,14 +499,133 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return get(revRankAsync(o)); } + @Override public Long count(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) { return get(countAsync(startScore, startScoreInclusive, endScore, endScoreInclusive)); } + @Override public RFuture countAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) { String startValue = value(startScore, startScoreInclusive); String endValue = value(endScore, endScoreInclusive); return commandExecutor.readAsync(getName(), codec, RedisCommands.ZCOUNT, getName(), startValue, endValue); } + @Override + public int intersection(String... names) { + return get(intersectionAsync(names)); + } + + @Override + public RFuture intersectionAsync(String... names) { + return intersectionAsync(Aggregate.SUM, names); + } + + @Override + public int intersection(Aggregate aggregate, String... names) { + return get(intersectionAsync(aggregate, names)); + } + + @Override + public RFuture intersectionAsync(Aggregate aggregate, String... names) { + List args = new ArrayList(names.length + 4); + args.add(getName()); + args.add(names.length); + args.addAll(Arrays.asList(names)); + args.add("AGGREGATE"); + args.add(aggregate.name()); + return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.ZINTERSTORE_INT, args.toArray()); + } + + @Override + public int intersection(Map nameWithWeight) { + return get(intersectionAsync(nameWithWeight)); + } + + @Override + public RFuture intersectionAsync(Map nameWithWeight) { + return intersectionAsync(Aggregate.SUM, nameWithWeight); + } + + @Override + public int intersection(Aggregate aggregate, Map nameWithWeight) { + return get(intersectionAsync(aggregate, nameWithWeight)); + } + + @Override + public RFuture intersectionAsync(Aggregate aggregate, Map nameWithWeight) { + List args = new ArrayList(nameWithWeight.size()*2 + 5); + args.add(getName()); + args.add(nameWithWeight.size()); + args.addAll(nameWithWeight.keySet()); + args.add("WEIGHTS"); + List weights = new ArrayList(); + for (Double weight : nameWithWeight.values()) { + weights.add(BigDecimal.valueOf(weight).toPlainString()); + } + args.addAll(weights); + args.add("AGGREGATE"); + args.add(aggregate.name()); + return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.ZINTERSTORE_INT, args.toArray()); + } + + @Override + public int union(String... names) { + return get(unionAsync(names)); + } + + @Override + public RFuture unionAsync(String... names) { + return unionAsync(Aggregate.SUM, names); + } + + @Override + public int union(Aggregate aggregate, String... names) { + return get(unionAsync(aggregate, names)); + } + + @Override + public RFuture unionAsync(Aggregate aggregate, String... names) { + List args = new ArrayList(names.length + 4); + args.add(getName()); + args.add(names.length); + args.addAll(Arrays.asList(names)); + args.add("AGGREGATE"); + args.add(aggregate.name()); + return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.ZUNIONSTORE_INT, args.toArray()); + } + + @Override + public int union(Map nameWithWeight) { + return get(unionAsync(nameWithWeight)); + } + + @Override + public RFuture unionAsync(Map nameWithWeight) { + return unionAsync(Aggregate.SUM, nameWithWeight); + } + + @Override + public int union(Aggregate aggregate, Map nameWithWeight) { + return get(unionAsync(aggregate, nameWithWeight)); + } + + @Override + public RFuture unionAsync(Aggregate aggregate, Map nameWithWeight) { + List args = new ArrayList(nameWithWeight.size()*2 + 5); + args.add(getName()); + args.add(nameWithWeight.size()); + args.addAll(nameWithWeight.keySet()); + args.add("WEIGHTS"); + List weights = new ArrayList(); + for (Double weight : nameWithWeight.values()) { + weights.add(BigDecimal.valueOf(weight).toPlainString()); + } + args.addAll(weights); + args.add("AGGREGATE"); + args.add(aggregate.name()); + return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.ZUNIONSTORE_INT, args.toArray()); + } + + } diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index ec2d0df92..4d5d3e716 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -22,6 +22,12 @@ import org.redisson.client.protocol.ScoredEntry; public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable, RExpirable { + public enum Aggregate { + + SUM, MAX, MIN + + } + V pollFirst(); V pollLast(); @@ -97,8 +103,12 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< Double addScore(V object, Number value); Collection valueRange(int startIndex, int endIndex); + + Collection valueRangeReversed(int startIndex, int endIndex); Collection> entryRange(int startIndex, int endIndex); + + Collection> entryRangeReversed(int startIndex, int endIndex); Collection valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive); @@ -131,5 +141,83 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< * @return */ Collection readAll(); + + /** + * Intersect provided ScoredSortedSets + * and store result to current ScoredSortedSet + * + * @param names - names of ScoredSortedSet + * @return length of intersection + */ + int intersection(String... names); + + /** + * Intersect provided ScoredSortedSets with defined aggregation method + * and store result to current ScoredSortedSet + * + * @param aggregate - score aggregation mode + * @param names - names of ScoredSortedSet + * @return length of intersection + */ + int intersection(Aggregate aggregate, String... names); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * and store result to current ScoredSortedSet + * + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return length of intersection + */ + int intersection(Map nameWithWeight); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with defined aggregation method + * and store result to current ScoredSortedSet + * + * @param aggregate - score aggregation mode + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return length of intersection + */ + int intersection(Aggregate aggregate, Map nameWithWeight); + + /** + * Union provided ScoredSortedSets + * and store result to current ScoredSortedSet + * + * @param names - names of ScoredSortedSet + * @return length of union + */ + int union(String... names); + + /** + * Union provided ScoredSortedSets with defined aggregation method + * and store result to current ScoredSortedSet + * + * @param aggregate - score aggregation mode + * @param names - names of ScoredSortedSet + * @return length of union + */ + int union(Aggregate aggregate, String... names); + + /** + * Union provided ScoredSortedSets mapped to weight multiplier + * and store result to current ScoredSortedSet + * + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return length of union + */ + int union(Map nameWithWeight); + + /** + * Union provided ScoredSortedSets mapped to weight multiplier + * with defined aggregation method + * and store result to current ScoredSortedSet + * + * @param aggregate - score aggregation mode + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return length of union + */ + int union(Aggregate aggregate, Map nameWithWeight); } diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index 990cb45be..f4649fcbd 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Collection; import java.util.Map; +import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.client.protocol.ScoredEntry; public interface RScoredSortedSetAsync extends RExpirableAsync { @@ -77,8 +78,12 @@ public interface RScoredSortedSetAsync extends RExpirableAsync { RFuture addScoreAsync(V object, Number value); RFuture> valueRangeAsync(int startIndex, int endIndex); + + RFuture> valueRangeReversedAsync(int startIndex, int endIndex); RFuture>> entryRangeAsync(int startIndex, int endIndex); + + RFuture>> entryRangeReversedAsync(int startIndex, int endIndex); RFuture> valueRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive); @@ -111,5 +116,83 @@ public interface RScoredSortedSetAsync extends RExpirableAsync { * @return */ RFuture> readAllAsync(); + + /** + * Intersect provided ScoredSortedSets + * and store result to current ScoredSortedSet + * + * @param names - names of ScoredSortedSet + * @return length of intersection + */ + RFuture intersectionAsync(String... names); + + /** + * Intersect provided ScoredSortedSets with defined aggregation method + * and store result to current ScoredSortedSet + * + * @param aggregate - score aggregation mode + * @param names - names of ScoredSortedSet + * @return length of intersection + */ + RFuture intersectionAsync(Aggregate aggregate, String... names); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * and store result to current ScoredSortedSet + * + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return length of intersection + */ + RFuture intersectionAsync(Map nameWithWeight); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with defined aggregation method + * and store result to current ScoredSortedSet + * + * @param aggregate - score aggregation mode + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return length of intersection + */ + RFuture intersectionAsync(Aggregate aggregate, Map nameWithWeight); + + /** + * Union provided ScoredSortedSets + * and store result to current ScoredSortedSet + * + * @param names - names of ScoredSortedSet + * @return length of union + */ + RFuture unionAsync(String... names); + + /** + * Union provided ScoredSortedSets with defined aggregation method + * and store result to current ScoredSortedSet + * + * @param aggregate - score aggregation mode + * @param names - names of ScoredSortedSet + * @return length of union + */ + RFuture unionAsync(Aggregate aggregate, String... names); + + /** + * Union provided ScoredSortedSets mapped to weight multiplier + * and store result to current ScoredSortedSet + * + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return length of union + */ + RFuture unionAsync(Map nameWithWeight); + + /** + * Union provided ScoredSortedSets mapped to weight multiplier + * with defined aggregation method + * and store result to current ScoredSortedSet + * + * @param aggregate - score aggregation mode + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return length of union + */ + RFuture unionAsync(Aggregate aggregate, Map nameWithWeight); } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index f29885278..0c1ea7833 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -86,6 +86,8 @@ public interface RedisCommands { RedisStrictCommand ASKING = new RedisStrictCommand("ASKING", new VoidReplayConvertor()); RedisStrictCommand READONLY = new RedisStrictCommand("READONLY", new VoidReplayConvertor()); + RedisStrictCommand ZUNIONSTORE_INT = new RedisStrictCommand("ZUNIONSTORE", new IntegerReplayConvertor()); + RedisStrictCommand ZINTERSTORE_INT = new RedisStrictCommand("ZINTERSTORE", new IntegerReplayConvertor()); RedisCommand ZADD_BOOL = new RedisCommand("ZADD", new BooleanAmountReplayConvertor(), 3); RedisCommand ZADD_NX_BOOL = new RedisCommand("ZADD", new BooleanAmountReplayConvertor(), 4); RedisCommand ZADD_BOOL_RAW = new RedisCommand("ZADD", new BooleanAmountReplayConvertor()); @@ -109,7 +111,9 @@ public interface RedisCommands { RedisCommand> ZRANGEBYLEX = new RedisCommand>("ZRANGEBYLEX", new ObjectListReplayDecoder()); RedisCommand> ZRANGEBYSCORE = new RedisCommand>("ZRANGEBYSCORE", new ObjectSetReplayDecoder()); RedisCommand> ZRANGEBYSCORE_LIST = new RedisCommand>("ZRANGEBYSCORE", new ObjectListReplayDecoder()); + RedisCommand> ZREVRANGE = new RedisCommand>("ZREVRANGE", new ObjectListReplayDecoder()); RedisCommand> ZREVRANGEBYSCORE = new RedisCommand>("ZREVRANGEBYSCORE", new ObjectListReplayDecoder()); + RedisCommand>> ZREVRANGE_ENTRY = new RedisCommand>>("ZREVRANGE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZRANGE_ENTRY = new RedisCommand>>("ZRANGE", new ScoredSortedSetReplayDecoder()); RedisCommand>> ZRANGEBYSCORE_ENTRY = new RedisCommand>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder()); diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index c292a153b..02e321387 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -114,7 +114,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { for (RFuture future : masterFuture.getNow()) { future.awaitUninterruptibly(); if (!future.isSuccess()) { - continue; + lastException = masterFuture.cause(); } } } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index 22176b6dc..181f431da 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -542,6 +542,21 @@ public class RedissonScoredSortedSetTest extends BaseTest { Collection vals = set.valueRange(0, -1); assertThat(vals).containsExactly(1, 2, 3, 4, 5); } + + @Test + public void testValueRangeReversed() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + set.add(0, 1); + set.add(1, 2); + set.add(2, 3); + set.add(3, 4); + set.add(4, 5); + set.add(4, 5); + + Collection vals = set.valueRangeReversed(0, -1); + assertThat(vals).containsExactly(5, 4, 3, 2, 1); + } + @Test public void testEntryRange() { @@ -559,6 +574,26 @@ public class RedissonScoredSortedSetTest extends BaseTest { new ScoredEntry(40D, 4), new ScoredEntry(50D, 5)); } + + @Test + public void testEntryRangeReversed() { + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + set.add(10, 1); + set.add(20, 2); + set.add(30, 3); + set.add(40, 4); + set.add(50, 5); + + Collection> vals = set.entryRangeReversed(0, -1); + assertThat(vals).containsExactly( + new ScoredEntry(50D, 5), + new ScoredEntry(40D, 4), + new ScoredEntry(30D, 3), + new ScoredEntry(20D, 2), + new ScoredEntry(10D, 1) + ); + } + @Test public void testLexSortedSet() { @@ -754,4 +789,89 @@ public class RedissonScoredSortedSetTest extends BaseTest { Assert.assertTrue(new Double(112.3).compareTo(res2) == 0); } + @Test + public void testIntersection() { + RScoredSortedSet set1 = redisson.getScoredSortedSet("simple1"); + set1.add(1, "one"); + set1.add(2, "two"); + + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + set2.add(1, "one"); + set2.add(2, "two"); + set2.add(3, "three"); + + RScoredSortedSet out = redisson.getScoredSortedSet("out"); + assertThat(out.intersection(set1.getName(), set2.getName())).isEqualTo(2); + + assertThat(out.readAll()).containsOnly("one", "two"); + assertThat(out.getScore("one")).isEqualTo(2); + assertThat(out.getScore("two")).isEqualTo(4); + } + + @Test + public void testIntersectionWithWeight() { + RScoredSortedSet set1 = redisson.getScoredSortedSet("simple1"); + set1.add(1, "one"); + set1.add(2, "two"); + + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + set2.add(1, "one"); + set2.add(2, "two"); + set2.add(3, "three"); + + RScoredSortedSet out = redisson.getScoredSortedSet("out"); + Map nameWithWeight = new HashMap<>(); + nameWithWeight.put(set1.getName(), 2D); + nameWithWeight.put(set2.getName(), 3D); + assertThat(out.intersection(nameWithWeight)).isEqualTo(2); + + assertThat(out.readAll()).containsOnly("one", "two"); + assertThat(out.getScore("one")).isEqualTo(5); + assertThat(out.getScore("two")).isEqualTo(10); + } + + @Test + public void testUnion() { + RScoredSortedSet set1 = redisson.getScoredSortedSet("simple1"); + set1.add(1, "one"); + set1.add(2, "two"); + + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + set2.add(1, "one"); + set2.add(2, "two"); + set2.add(3, "three"); + + RScoredSortedSet out = redisson.getScoredSortedSet("out"); + assertThat(out.union(set1.getName(), set2.getName())).isEqualTo(3); + + assertThat(out.readAll()).containsOnly("one", "two", "three"); + assertThat(out.getScore("one")).isEqualTo(2); + assertThat(out.getScore("two")).isEqualTo(4); + assertThat(out.getScore("three")).isEqualTo(3); + } + + @Test + public void testUnionWithWeight() { + RScoredSortedSet set1 = redisson.getScoredSortedSet("simple1"); + set1.add(1, "one"); + set1.add(2, "two"); + + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + set2.add(1, "one"); + set2.add(2, "two"); + set2.add(3, "three"); + + RScoredSortedSet out = redisson.getScoredSortedSet("out"); + Map nameWithWeight = new HashMap<>(); + nameWithWeight.put(set1.getName(), 2D); + nameWithWeight.put(set2.getName(), 3D); + assertThat(out.union(nameWithWeight)).isEqualTo(3); + + assertThat(out.readAll()).containsOnly("one", "two", "three"); + assertThat(out.getScore("one")).isEqualTo(5); + assertThat(out.getScore("two")).isEqualTo(10); + assertThat(out.getScore("three")).isEqualTo(9); + } + + }