diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 025905803..b00e22521 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -866,7 +866,65 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc args.add(aggregate.name()); return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.ZINTERSTORE_INT, args.toArray()); } - + + @Override + public Collection readIntersection(String... names) { + return get(readIntersectionAsync(names)); + } + + @Override + public RFuture> readIntersectionAsync(String... names) { + return readIntersectionAsync(Aggregate.SUM, names); + } + + @Override + public Collection readIntersection(Aggregate aggregate, String... names) { + return get(readIntersectionAsync(aggregate, names)); + } + + @Override + public RFuture> readIntersectionAsync(Aggregate aggregate, String... names) { + List args = new ArrayList(names.length + 4); + args.add(names.length + 1); + args.add(getName()); + args.addAll(Arrays.asList(names)); + args.add("AGGREGATE"); + args.add(aggregate.name()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZINTER, args.toArray()); + } + + @Override + public Collection readIntersection(Map nameWithWeight) { + return get(readIntersectionAsync(nameWithWeight)); + } + + @Override + public RFuture> readIntersectionAsync(Map nameWithWeight) { + return readIntersectionAsync(Aggregate.SUM, nameWithWeight); + } + + @Override + public Collection readIntersection(Aggregate aggregate, Map nameWithWeight) { + return get(readIntersectionAsync(aggregate, nameWithWeight)); + } + + @Override + public RFuture> readIntersectionAsync(Aggregate aggregate, Map nameWithWeight) { + List args = new ArrayList(nameWithWeight.size()*2 + 5); + args.add(nameWithWeight.size() + 1); + args.add(getName()); + args.addAll(nameWithWeight.keySet()); + args.add("WEIGHTS"); + List weights = new ArrayList(); + for (Double weight : nameWithWeight.values()) { + weights.add(BigDecimal.valueOf(weight).toPlainString()); + } + args.addAll(weights); + args.add("AGGREGATE"); + args.add(aggregate.name()); + return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZINTER, args.toArray()); + } + @Override public int union(String... names) { return get(unionAsync(names)); @@ -942,8 +1000,9 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc @Override public RFuture> readUnionAsync(Aggregate aggregate, String... names) { - List args = new ArrayList(names.length + 4); - args.add(names.length); + List args = new ArrayList<>(names.length + 4); + args.add(names.length + 1); + args.add(getName()); args.addAll(Arrays.asList(names)); args.add("AGGREGATE"); args.add(aggregate.name()); @@ -968,7 +1027,8 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc @Override public RFuture> readUnionAsync(Aggregate aggregate, Map nameWithWeight) { List args = new ArrayList(nameWithWeight.size()*2 + 5); - args.add(nameWithWeight.size()); + args.add(nameWithWeight.size() + 1); + args.add(getName()); args.addAll(nameWithWeight.keySet()); args.add("WEIGHTS"); List weights = new ArrayList(); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 93d898afc..71405ba17 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -766,6 +766,53 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ int intersection(Aggregate aggregate, Map nameWithWeight); + /** + * Intersect provided ScoredSortedSets + * with current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param names - names of ScoredSortedSet + * @return result of intersection + */ + Collection readIntersection(String... names); + + /** + * Intersect provided ScoredSortedSets with current ScoredSortedSet using defined aggregation method + * without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param aggregate - score aggregation mode + * @param names - names of ScoredSortedSet + * @return result of intersection + */ + Collection readIntersection(Aggregate aggregate, String... names); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return result of intersection + */ + Collection readIntersection(Map nameWithWeight); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with current ScoredSortedSet using defined aggregation method + * without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param aggregate - score aggregation mode + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return result of intersection + */ + Collection readIntersection(Aggregate aggregate, Map nameWithWeight); + /** * Union provided ScoredSortedSets * and store result to current ScoredSortedSet @@ -808,28 +855,34 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< /** * Union ScoredSortedSets specified by name with current ScoredSortedSet * without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param names - names of ScoredSortedSet - * @return length of union + * @return result of union */ Collection readUnion(String... names); /** * Union ScoredSortedSets specified by name with defined aggregation method * and current ScoredSortedSet without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param aggregate - score aggregation mode * @param names - names of ScoredSortedSet - * @return length of union + * @return result of union */ Collection readUnion(Aggregate aggregate, String... names); /** * Union provided ScoredSortedSets mapped to weight multiplier * and current ScoredSortedSet without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier - * @return length of union + * @return result of union */ Collection readUnion(Map nameWithWeight); @@ -837,10 +890,12 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< * Union provided ScoredSortedSets mapped to weight multiplier * with defined aggregation method * and current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. * * @param aggregate - score aggregation mode * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier - * @return length of union + * @return result of union */ Collection readUnion(Aggregate aggregate, Map nameWithWeight); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index 1c849505d..aa405f949 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -633,6 +633,53 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn */ RFuture intersectionAsync(Aggregate aggregate, Map nameWithWeight); + /** + * Intersect provided ScoredSortedSets + * with current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param names - names of ScoredSortedSet + * @return result of intersection + */ + RFuture> readIntersectionAsync(String... names); + + /** + * Intersect provided ScoredSortedSets with current ScoredSortedSet using defined aggregation method + * without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param aggregate - score aggregation mode + * @param names - names of ScoredSortedSet + * @return result of intersection + */ + RFuture> readIntersectionAsync(Aggregate aggregate, String... names); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return result of intersection + */ + RFuture> readIntersectionAsync(Map nameWithWeight); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with current ScoredSortedSet using defined aggregation method + * without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param aggregate - score aggregation mode + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return result of intersection + */ + RFuture> readIntersectionAsync(Aggregate aggregate, Map nameWithWeight); + /** * Union provided ScoredSortedSets * and store result to current ScoredSortedSet @@ -675,28 +722,34 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn /** * Union ScoredSortedSets specified by name with current ScoredSortedSet * without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param names - names of ScoredSortedSet - * @return length of union + * @return result of union */ RFuture> readUnionAsync(String... names); /** * Union ScoredSortedSets specified by name with defined aggregation method * and current ScoredSortedSet without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param aggregate - score aggregation mode * @param names - names of ScoredSortedSet - * @return length of union + * @return result of union */ RFuture> readUnionAsync(Aggregate aggregate, String... names); /** * Union provided ScoredSortedSets mapped to weight multiplier * and current ScoredSortedSet without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier - * @return length of union + * @return result of union */ RFuture> readUnionAsync(Map nameWithWeight); @@ -704,10 +757,12 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn * Union provided ScoredSortedSets mapped to weight multiplier * with defined aggregation method * and current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. * * @param aggregate - score aggregation mode * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier - * @return length of union + * @return result of union */ RFuture> readUnionAsync(Aggregate aggregate, Map nameWithWeight); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java index c6fc59a52..6ebea73a2 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java @@ -656,6 +656,53 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab */ Mono intersection(Aggregate aggregate, Map nameWithWeight); + /** + * Intersect provided ScoredSortedSets + * with current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param names - names of ScoredSortedSet + * @return result of intersection + */ + Mono> readIntersection(String... names); + + /** + * Intersect provided ScoredSortedSets with current ScoredSortedSet using defined aggregation method + * without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param aggregate - score aggregation mode + * @param names - names of ScoredSortedSet + * @return result of intersection + */ + Mono> readIntersection(Aggregate aggregate, String... names); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return result of intersection + */ + Mono> readIntersection(Map nameWithWeight); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with current ScoredSortedSet using defined aggregation method + * without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param aggregate - score aggregation mode + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return result of intersection + */ + Mono> readIntersection(Aggregate aggregate, Map nameWithWeight); + /** * Union provided ScoredSortedSets * and store result to current ScoredSortedSet @@ -698,28 +745,34 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab /** * Union ScoredSortedSets specified by name with current ScoredSortedSet * without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param names - names of ScoredSortedSet - * @return length of union + * @return result of union */ Mono> readUnion(String... names); /** * Union ScoredSortedSets specified by name with defined aggregation method * and current ScoredSortedSet without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param aggregate - score aggregation mode * @param names - names of ScoredSortedSet - * @return length of union + * @return result of union */ Mono> readUnion(Aggregate aggregate, String... names); /** * Union provided ScoredSortedSets mapped to weight multiplier * and current ScoredSortedSet without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier - * @return length of union + * @return result of union */ Mono> readUnion(Map nameWithWeight); @@ -727,10 +780,12 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab * Union provided ScoredSortedSets mapped to weight multiplier * with defined aggregation method * and current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. * * @param aggregate - score aggregation mode * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier - * @return length of union + * @return result of union */ Mono> readUnion(Aggregate aggregate, Map nameWithWeight); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java index c4e7cd0f2..15c5b823f 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java @@ -658,6 +658,53 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> */ Single intersection(Aggregate aggregate, Map nameWithWeight); + /** + * Intersect provided ScoredSortedSets + * with current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param names - names of ScoredSortedSet + * @return result of intersection + */ + Single> readIntersection(String... names); + + /** + * Intersect provided ScoredSortedSets with current ScoredSortedSet using defined aggregation method + * without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param aggregate - score aggregation mode + * @param names - names of ScoredSortedSet + * @return result of intersection + */ + Single> readIntersection(Aggregate aggregate, String... names); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return result of intersection + */ + Single> readIntersection(Map nameWithWeight); + + /** + * Intersect provided ScoredSortedSets mapped to weight multiplier + * with current ScoredSortedSet using defined aggregation method + * without state change + *

+ * Requires Redis 6.2.0 and higher. + * + * @param aggregate - score aggregation mode + * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier + * @return result of intersection + */ + Single> readIntersection(Aggregate aggregate, Map nameWithWeight); + /** * Union provided ScoredSortedSets * and store result to current ScoredSortedSet @@ -700,28 +747,34 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> /** * Union ScoredSortedSets specified by name with current ScoredSortedSet * without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param names - names of ScoredSortedSet - * @return length of union + * @return result of union */ Single> readUnion(String... names); /** * Union ScoredSortedSets specified by name with defined aggregation method * and current ScoredSortedSet without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param aggregate - score aggregation mode * @param names - names of ScoredSortedSet - * @return length of union + * @return result of union */ Single> readUnion(Aggregate aggregate, String... names); /** * Union provided ScoredSortedSets mapped to weight multiplier * and current ScoredSortedSet without state change. + *

+ * Requires Redis 6.2.0 and higher. * * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier - * @return length of union + * @return result of union */ Single> readUnion(Map nameWithWeight); @@ -729,10 +782,12 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> * Union provided ScoredSortedSets mapped to weight multiplier * with defined aggregation method * and current ScoredSortedSet without state change + *

+ * Requires Redis 6.2.0 and higher. * * @param aggregate - score aggregation mode * @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier - * @return length of union + * @return result of union */ Single> readUnion(Aggregate aggregate, Map nameWithWeight); diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 91e4755e4..82508e766 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -95,6 +95,7 @@ public interface RedisCommands { RedisStrictCommand READONLY = new RedisStrictCommand("READONLY", new VoidReplayConvertor()); RedisCommand> ZUNION = new RedisCommand<>("ZUNION", new ObjectListReplayDecoder<>()); + RedisCommand> ZINTER = new RedisCommand<>("ZINTER", new ObjectListReplayDecoder<>()); RedisStrictCommand ZUNIONSTORE_INT = new RedisStrictCommand("ZUNIONSTORE", new IntegerReplayConvertor()); RedisStrictCommand ZINTERSTORE_INT = new RedisStrictCommand("ZINTERSTORE", new IntegerReplayConvertor()); RedisCommand ZADD_BOOL = new RedisCommand("ZADD", new BooleanAmountReplayConvertor()); diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index e64c8de2f..679b33a29 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -1307,6 +1307,22 @@ public class RedissonScoredSortedSetTest extends BaseTest { assertThat(set.revRank("three")).isEqualTo(0); } + @Test + public void testReadIntersection() { + RScoredSortedSet set1 = redisson.getScoredSortedSet("simple1"); + set1.add(1, "one"); + set1.add(2, "two"); + set1.add(2, "four"); + + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + set2.add(1, "one"); + set2.add(2, "two"); + set2.add(3, "three"); + + RScoredSortedSet out = redisson.getScoredSortedSet("simple1"); + assertThat(out.readIntersection(set1.getName(), set2.getName())).containsOnly("one", "two"); + } + @Test public void testIntersection() { RScoredSortedSet set1 = redisson.getScoredSortedSet("simple1"); @@ -1376,7 +1392,7 @@ public class RedissonScoredSortedSetTest extends BaseTest { set2.add(2, "two"); set2.add(3, "three"); - RScoredSortedSet out = redisson.getScoredSortedSet("out"); + RScoredSortedSet out = redisson.getScoredSortedSet("simple1"); assertThat(out.readUnion(set1.getName(), set2.getName())).containsOnly("one", "two", "three", "four"); }