From 069644288ed366de9aff06e254f93328891aea9a Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 14 Feb 2022 12:49:13 +0300 Subject: [PATCH] Feature - added RScoredSortedSet.pollFirstFromAny() pollLastFromAny() methods with timeout and count #4116 --- .../org/redisson/RedissonScoredSortedSet.java | 38 ++++++++++++++++- .../org/redisson/api/RScoredSortedSet.java | 34 +++++++++++++-- .../redisson/api/RScoredSortedSetAsync.java | 36 ++++++++++++++-- .../api/RScoredSortedSetReactive.java | 36 ++++++++++++++-- .../org/redisson/api/RScoredSortedSetRx.java | 36 ++++++++++++++-- .../redisson/RedissonScoredSortedSetTest.java | 42 +++++++++++++++++++ 6 files changed, 206 insertions(+), 16 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index df9a6371a..c6c57723c 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -151,6 +151,24 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BZPOPMIN_VALUE, toSeconds(timeout, unit), queueNames); } + @Override + public List pollFirstFromAny(Duration duration, int count, String... queueNames) { + return get(pollFirstFromAnyAsync(duration, count, queueNames)); + } + + @Override + public RFuture> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { + List params = new ArrayList<>(); + params.add(duration.getSeconds()); + params.add(queueNames.length + 1); + params.add(getName()); + params.addAll(Arrays.asList(queueNames)); + params.add("MIN"); + params.add("COUNT"); + params.add(count); + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_SINGLE_LIST, params.toArray()); + } + @Override public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) { return get(pollLastFromAnyAsync(timeout, unit, queueNames)); @@ -160,7 +178,25 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc public RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BZPOPMAX_VALUE, toSeconds(timeout, unit), queueNames); } - + + @Override + public List pollLastFromAny(Duration duration, int count, String... queueNames) { + return get(pollLastFromAnyAsync(duration, count, queueNames)); + } + + @Override + public RFuture> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) { + List params = new ArrayList<>(); + params.add(duration.getSeconds()); + params.add(queueNames.length + 1); + params.add(getName()); + params.addAll(Arrays.asList(queueNames)); + params.add("MAX"); + params.add("COUNT"); + params.add(count); + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_SINGLE_LIST, params.toArray()); + } + @Override public V pollLast(long timeout, TimeUnit unit) { return get(pollLastAsync(timeout, unit)); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index 3fde6036e..846db7eaa 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -59,7 +59,7 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< *

* Requires Redis 5.0.0 and higher. * - * @param queueNames - names of queue + * @param queueNames name of queues * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the @@ -67,7 +67,21 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< * @return the tail element, or {@code null} if all sorted sets are empty */ V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames); - + + /** + * Removes and returns first available tail elements of any sorted set, + * waiting up to the specified wait time if necessary for elements to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the tail elements + */ + List pollLastFromAny(Duration duration, int count, String... queueNames); + /** * Removes and returns first available head element of any sorted set, * waiting up to the specified wait time if necessary for an element to become available @@ -75,7 +89,7 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< *

* Requires Redis 5.0.0 and higher. * - * @param queueNames - names of queue + * @param queueNames name of queues * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the @@ -84,6 +98,20 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames); + /** + * Removes and returns first available head elements of any sorted set, + * waiting up to the specified wait time if necessary for elements to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the head elements + */ + List pollFirstFromAny(Duration duration, int count, String... queueNames); + /** * Removes and returns the head element waiting if necessary for an element to become available. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index 7d7ddad1e..97c742140 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -40,7 +40,7 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn *

* Requires Redis 5.0.0 and higher. * - * @param queueNames - names of queue + * @param queueNames name of queues * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the @@ -48,7 +48,21 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn * @return the tail element, or {@code null} if all sorted sets are empty */ RFuture pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames); - + + /** + * Removes and returns first available tail elements of any sorted set, + * waiting up to the specified wait time if necessary for elements to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the tail elements + */ + RFuture> pollLastFromAnyAsync(Duration duration, int count, String... queueNames); + /** * Removes and returns first available head element of any sorted set, * waiting up to the specified wait time if necessary for an element to become available @@ -56,7 +70,7 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn *

* Requires Redis 5.0.0 and higher. * - * @param queueNames - names of queue + * @param queueNames name of queues * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the @@ -65,7 +79,21 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn * */ RFuture pollFirstFromAnyAsync(long timeout, TimeUnit unit, String... queueNames); - + + /** + * Removes and returns first available head elements of any sorted set, + * waiting up to the specified wait time if necessary for elements to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the head elements + */ + RFuture> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames); + /** * Removes and returns the head element or {@code null} if this sorted set is empty. *

diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java index 78f29eacf..ead16634b 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java @@ -43,7 +43,7 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab *

* Requires Redis 5.0.0 and higher. * - * @param queueNames - names of queue + * @param queueNames name of queues * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the @@ -51,7 +51,21 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab * @return the tail element, or {@code null} if all sorted sets are empty */ Mono pollLastFromAny(long timeout, TimeUnit unit, String... queueNames); - + + /** + * Removes and returns first available tail elements of any sorted set, + * waiting up to the specified wait time if necessary for elements to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the tail elements + */ + Mono> pollLastFromAny(Duration duration, int count, String... queueNames); + /** * Removes and returns first available head element of any sorted set, * waiting up to the specified wait time if necessary for an element to become available @@ -59,7 +73,7 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab *

* Requires Redis 5.0.0 and higher. * - * @param queueNames - names of queue + * @param queueNames name of queues * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the @@ -68,7 +82,21 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab * */ Mono pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames); - + + /** + * Removes and returns first available head elements of any sorted set, + * waiting up to the specified wait time if necessary for elements to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the head elements + */ + Mono> pollFirstFromAny(Duration duration, int count, String... queueNames); + /** * Removes and returns the head element or {@code null} if this sorted set is empty. *

diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java index 748f4c931..e97751ac8 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java @@ -44,7 +44,7 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> *

* Requires Redis 5.0.0 and higher. * - * @param queueNames - names of queue + * @param queueNames name of queues * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the @@ -52,7 +52,21 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> * @return the tail element, or {@code null} if all sorted sets are empty */ Maybe pollLastFromAny(long timeout, TimeUnit unit, String... queueNames); - + + /** + * Removes and returns first available tail elements of any sorted set, + * waiting up to the specified wait time if necessary for elements to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the tail elements + */ + Single> pollLastFromAny(Duration duration, int count, String... queueNames); + /** * Removes and returns first available head element of any sorted set, * waiting up to the specified wait time if necessary for an element to become available @@ -60,7 +74,7 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> *

* Requires Redis 5.0.0 and higher. * - * @param queueNames - names of queue + * @param queueNames name of queues * @param timeout how long to wait before giving up, in units of * {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the @@ -69,7 +83,21 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> * */ Maybe pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames); - + + /** + * Removes and returns first available head elements of any sorted set, + * waiting up to the specified wait time if necessary for elements to become available + * in any of defined sorted sets including this one. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the head elements + */ + Single> pollFirstFromAny(Duration duration, int count, String... queueNames); + /** * Removes and returns the head element or {@code null} if this sorted set is empty. *

diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index 41bcc3660..0f1fb6ea0 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -79,6 +79,48 @@ public class RedissonScoredSortedSetTest extends BaseTest { Assertions.assertTrue(System.currentTimeMillis() - s > 2000); } + @Test + public void testPollFirstFromAnyCount() { +// Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0); + + RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue1.add(0.1, 1); + queue1.add(0.2, 2); + queue1.add(0.3, 3); + queue2.add(0.4, 4); + queue2.add(0.5, 5); + queue2.add(0.6, 6); + queue3.add(0.7, 7); + queue3.add(0.8, 8); + queue3.add(0.9, 9); + + List elements = queue1.pollFirstFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2"); + assertThat(elements).containsExactly(1, 2); + } + + @Test + public void testPollLastFromAnyCount() { +// Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0); + + RScoredSortedSet queue1 = redisson.getScoredSortedSet("queue:pollany"); + RScoredSortedSet queue2 = redisson.getScoredSortedSet("queue:pollany1"); + RScoredSortedSet queue3 = redisson.getScoredSortedSet("queue:pollany2"); + queue1.add(0.1, 1); + queue1.add(0.2, 2); + queue1.add(0.3, 3); + queue2.add(0.4, 4); + queue2.add(0.5, 5); + queue2.add(0.6, 6); + queue3.add(0.7, 7); + queue3.add(0.8, 8); + queue3.add(0.9, 9); + + List elements = queue1.pollLastFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2"); + assertThat(elements).containsExactly(3, 2); + } + @Test public void testPollLastFromAny() throws InterruptedException { Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("5.0.0") > 0);