Feature - added RScoredSortedSet.pollFirstFromAny() pollLastFromAny() methods with timeout and count #4116

pull/4162/head
Nikita Koksharov 3 years ago
parent 0014642ac3
commit 069644288e

@ -151,6 +151,24 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BZPOPMIN_VALUE, toSeconds(timeout, unit), queueNames); return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BZPOPMIN_VALUE, toSeconds(timeout, unit), queueNames);
} }
@Override
public List<V> pollFirstFromAny(Duration duration, int count, String... queueNames) {
return get(pollFirstFromAnyAsync(duration, count, queueNames));
}
@Override
public RFuture<List<V>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
List<Object> params = new ArrayList<>();
params.add(duration.getSeconds());
params.add(queueNames.length + 1);
params.add(getName());
params.addAll(Arrays.asList(queueNames));
params.add("MIN");
params.add("COUNT");
params.add(count);
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_SINGLE_LIST, params.toArray());
}
@Override @Override
public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) { public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) {
return get(pollLastFromAnyAsync(timeout, unit, queueNames)); return get(pollLastFromAnyAsync(timeout, unit, queueNames));
@ -160,7 +178,25 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BZPOPMAX_VALUE, toSeconds(timeout, unit), queueNames); return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BZPOPMAX_VALUE, toSeconds(timeout, unit), queueNames);
} }
@Override
public List<V> pollLastFromAny(Duration duration, int count, String... queueNames) {
return get(pollLastFromAnyAsync(duration, count, queueNames));
}
@Override
public RFuture<List<V>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) {
List<Object> params = new ArrayList<>();
params.add(duration.getSeconds());
params.add(queueNames.length + 1);
params.add(getName());
params.addAll(Arrays.asList(queueNames));
params.add("MAX");
params.add("COUNT");
params.add(count);
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_SINGLE_LIST, params.toArray());
}
@Override @Override
public V pollLast(long timeout, TimeUnit unit) { public V pollLast(long timeout, TimeUnit unit) {
return get(pollLastAsync(timeout, unit)); return get(pollLastAsync(timeout, unit));

@ -59,7 +59,7 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* <p> * <p>
* Requires <b>Redis 5.0.0 and higher.</b> * Requires <b>Redis 5.0.0 and higher.</b>
* *
* @param queueNames - names of queue * @param queueNames name of queues
* @param timeout how long to wait before giving up, in units of * @param timeout how long to wait before giving up, in units of
* {@code unit} * {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the * @param unit a {@code TimeUnit} determining how to interpret the
@ -67,7 +67,21 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* @return the tail element, or {@code null} if all sorted sets are empty * @return the tail element, or {@code null} if all sorted sets are empty
*/ */
V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames); V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Removes and returns first available tail elements of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for elements to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the tail elements
*/
List<V> pollLastFromAny(Duration duration, int count, String... queueNames);
/** /**
* Removes and returns first available head element of <b>any</b> sorted set, * Removes and returns first available head element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available * waiting up to the specified wait time if necessary for an element to become available
@ -75,7 +89,7 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* <p> * <p>
* Requires <b>Redis 5.0.0 and higher.</b> * Requires <b>Redis 5.0.0 and higher.</b>
* *
* @param queueNames - names of queue * @param queueNames name of queues
* @param timeout how long to wait before giving up, in units of * @param timeout how long to wait before giving up, in units of
* {@code unit} * {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the * @param unit a {@code TimeUnit} determining how to interpret the
@ -84,6 +98,20 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/ */
V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames); V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Removes and returns first available head elements of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for elements to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the head elements
*/
List<V> pollFirstFromAny(Duration duration, int count, String... queueNames);
/** /**
* Removes and returns the head element waiting if necessary for an element to become available. * Removes and returns the head element waiting if necessary for an element to become available.
* *

@ -40,7 +40,7 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
* <p> * <p>
* Requires <b>Redis 5.0.0 and higher.</b> * Requires <b>Redis 5.0.0 and higher.</b>
* *
* @param queueNames - names of queue * @param queueNames name of queues
* @param timeout how long to wait before giving up, in units of * @param timeout how long to wait before giving up, in units of
* {@code unit} * {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the * @param unit a {@code TimeUnit} determining how to interpret the
@ -48,7 +48,21 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
* @return the tail element, or {@code null} if all sorted sets are empty * @return the tail element, or {@code null} if all sorted sets are empty
*/ */
RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames); RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String... queueNames);
/**
* Removes and returns first available tail elements of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for elements to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the tail elements
*/
RFuture<List<V>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames);
/** /**
* Removes and returns first available head element of <b>any</b> sorted set, * Removes and returns first available head element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available * waiting up to the specified wait time if necessary for an element to become available
@ -56,7 +70,7 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
* <p> * <p>
* Requires <b>Redis 5.0.0 and higher.</b> * Requires <b>Redis 5.0.0 and higher.</b>
* *
* @param queueNames - names of queue * @param queueNames name of queues
* @param timeout how long to wait before giving up, in units of * @param timeout how long to wait before giving up, in units of
* {@code unit} * {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the * @param unit a {@code TimeUnit} determining how to interpret the
@ -65,7 +79,21 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
* *
*/ */
RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String... queueNames); RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String... queueNames);
/**
* Removes and returns first available head elements of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for elements to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the head elements
*/
RFuture<List<V>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames);
/** /**
* Removes and returns the head element or {@code null} if this sorted set is empty. * Removes and returns the head element or {@code null} if this sorted set is empty.
* <p> * <p>

@ -43,7 +43,7 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
* <p> * <p>
* Requires <b>Redis 5.0.0 and higher.</b> * Requires <b>Redis 5.0.0 and higher.</b>
* *
* @param queueNames - names of queue * @param queueNames name of queues
* @param timeout how long to wait before giving up, in units of * @param timeout how long to wait before giving up, in units of
* {@code unit} * {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the * @param unit a {@code TimeUnit} determining how to interpret the
@ -51,7 +51,21 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
* @return the tail element, or {@code null} if all sorted sets are empty * @return the tail element, or {@code null} if all sorted sets are empty
*/ */
Mono<V> pollLastFromAny(long timeout, TimeUnit unit, String... queueNames); Mono<V> pollLastFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Removes and returns first available tail elements of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for elements to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the tail elements
*/
Mono<List<V>> pollLastFromAny(Duration duration, int count, String... queueNames);
/** /**
* Removes and returns first available head element of <b>any</b> sorted set, * Removes and returns first available head element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available * waiting up to the specified wait time if necessary for an element to become available
@ -59,7 +73,7 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
* <p> * <p>
* Requires <b>Redis 5.0.0 and higher.</b> * Requires <b>Redis 5.0.0 and higher.</b>
* *
* @param queueNames - names of queue * @param queueNames name of queues
* @param timeout how long to wait before giving up, in units of * @param timeout how long to wait before giving up, in units of
* {@code unit} * {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the * @param unit a {@code TimeUnit} determining how to interpret the
@ -68,7 +82,21 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
* *
*/ */
Mono<V> pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames); Mono<V> pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Removes and returns first available head elements of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for elements to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the head elements
*/
Mono<List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames);
/** /**
* Removes and returns the head element or {@code null} if this sorted set is empty. * Removes and returns the head element or {@code null} if this sorted set is empty.
* <p> * <p>

@ -44,7 +44,7 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
* <p> * <p>
* Requires <b>Redis 5.0.0 and higher.</b> * Requires <b>Redis 5.0.0 and higher.</b>
* *
* @param queueNames - names of queue * @param queueNames name of queues
* @param timeout how long to wait before giving up, in units of * @param timeout how long to wait before giving up, in units of
* {@code unit} * {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the * @param unit a {@code TimeUnit} determining how to interpret the
@ -52,7 +52,21 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
* @return the tail element, or {@code null} if all sorted sets are empty * @return the tail element, or {@code null} if all sorted sets are empty
*/ */
Maybe<V> pollLastFromAny(long timeout, TimeUnit unit, String... queueNames); Maybe<V> pollLastFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Removes and returns first available tail elements of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for elements to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the tail elements
*/
Single<List<V>> pollLastFromAny(Duration duration, int count, String... queueNames);
/** /**
* Removes and returns first available head element of <b>any</b> sorted set, * Removes and returns first available head element of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for an element to become available * waiting up to the specified wait time if necessary for an element to become available
@ -60,7 +74,7 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
* <p> * <p>
* Requires <b>Redis 5.0.0 and higher.</b> * Requires <b>Redis 5.0.0 and higher.</b>
* *
* @param queueNames - names of queue * @param queueNames name of queues
* @param timeout how long to wait before giving up, in units of * @param timeout how long to wait before giving up, in units of
* {@code unit} * {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the * @param unit a {@code TimeUnit} determining how to interpret the
@ -69,7 +83,21 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
* *
*/ */
Maybe<V> pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames); Maybe<V> pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Removes and returns first available head elements of <b>any</b> sorted set,
* waiting up to the specified wait time if necessary for elements to become available
* in any of defined sorted sets <b>including</b> this one.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the head elements
*/
Single<List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames);
/** /**
* Removes and returns the head element or {@code null} if this sorted set is empty. * Removes and returns the head element or {@code null} if this sorted set is empty.
* <p> * <p>

@ -79,6 +79,48 @@ public class RedissonScoredSortedSetTest extends BaseTest {
Assertions.assertTrue(System.currentTimeMillis() - s > 2000); Assertions.assertTrue(System.currentTimeMillis() - s > 2000);
} }
@Test
public void testPollFirstFromAnyCount() {
// Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0);
RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");
RScoredSortedSet<Integer> queue2 = redisson.getScoredSortedSet("queue:pollany1");
RScoredSortedSet<Integer> queue3 = redisson.getScoredSortedSet("queue:pollany2");
queue1.add(0.1, 1);
queue1.add(0.2, 2);
queue1.add(0.3, 3);
queue2.add(0.4, 4);
queue2.add(0.5, 5);
queue2.add(0.6, 6);
queue3.add(0.7, 7);
queue3.add(0.8, 8);
queue3.add(0.9, 9);
List<Integer> elements = queue1.pollFirstFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2");
assertThat(elements).containsExactly(1, 2);
}
@Test
public void testPollLastFromAnyCount() {
// Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0);
RScoredSortedSet<Integer> queue1 = redisson.getScoredSortedSet("queue:pollany");
RScoredSortedSet<Integer> queue2 = redisson.getScoredSortedSet("queue:pollany1");
RScoredSortedSet<Integer> queue3 = redisson.getScoredSortedSet("queue:pollany2");
queue1.add(0.1, 1);
queue1.add(0.2, 2);
queue1.add(0.3, 3);
queue2.add(0.4, 4);
queue2.add(0.5, 5);
queue2.add(0.6, 6);
queue3.add(0.7, 7);
queue3.add(0.8, 8);
queue3.add(0.9, 9);
List<Integer> elements = queue1.pollLastFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2");
assertThat(elements).containsExactly(3, 2);
}
@Test @Test
public void testPollLastFromAny() throws InterruptedException { public void testPollLastFromAny() throws InterruptedException {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("5.0.0") > 0); Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("5.0.0") > 0);

Loading…
Cancel
Save