Merge pull request #6426 from seakider/feature_pollLastWithName

Feature poll last with name
pull/6430/head
Nikita Koksharov 2 weeks ago committed by GitHub
commit 679632f3f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -151,6 +151,11 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
return blockingQueue.pollLastFromAny(duration, count, queueNames);
}
@Override
public Entry<String, V> pollLastFromAnyWithName(Duration duration, String... queueNames) throws InterruptedException {
return blockingQueue.pollLastFromAnyWithName(duration, queueNames);
}
@Override
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
return blockingQueue.pollFirstFromAnyAsync(duration, count, queueNames);
@ -161,6 +166,11 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
return blockingQueue.pollLastFromAnyAsync(duration, count, queueNames);
}
@Override
public RFuture<Entry<String, V>> pollLastFromAnyWithNameAsync(Duration duration, String... queueNames) {
return blockingQueue.pollLastFromAnyWithNameAsync(duration, queueNames);
}
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));

@ -183,6 +183,20 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLMPOP, params.toArray());
}
@Override
public Entry<String, V> pollLastFromAnyWithName(Duration duration, String... queueNames) throws InterruptedException {
return commandExecutor.getInterrupted(pollLastFromAnyWithNameAsync(duration, queueNames));
}
@Override
public RFuture<Entry<String, V>> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) {
if (timeout.toMillis() < 0) {
return new CompletableFutureWrapper<>((Entry) null);
}
return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BRPOP_NAME,
toSeconds(timeout.toMillis(), TimeUnit.MILLISECONDS), queueNames);
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
if (timeout < 0) {

@ -235,6 +235,17 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
return wrapTakeFuture(takeFuture);
}
@Override
public Entry<String, V> pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
return commandExecutor.getInterrupted(pollLastFromAnyWithNameAsync(timeout, queueNames));
}
@Override
public RFuture<Entry<String, V>> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) {
RFuture<Entry<String, V>> takeFuture = blockingQueue.pollLastFromAnyWithNameAsync(timeout, queueNames);
return wrapTakeFuture(takeFuture);
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) {
return get(pollFirstFromAnyAsync(duration, count, queueNames));

@ -101,6 +101,16 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
throw new UnsupportedOperationException("use poll method");
}
@Override
public Entry<String, V> pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Entry<String, V>> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");

@ -156,6 +156,11 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
throw new UnsupportedOperationException("use poll method");
}
@Override
public Entry<String, V> pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
@ -291,6 +296,11 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Entry<String, V>> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Void> putAsync(V e) {
throw new UnsupportedOperationException("use add method");

@ -611,6 +611,16 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
throw new UnsupportedOperationException();
}
@Override
public Entry<String, V> pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Entry<String, V>> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) {
throw new UnsupportedOperationException();
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException();

@ -92,6 +92,20 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
*/
Map<String, List<V>> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available tail element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueNames queue names. Queue name itself is always included
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @return the tail of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Entry<String, V> pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes last available tail element of this queue and adds it at the head of <code>queueName</code>,
* waiting up to the specified wait time if necessary for an element to become available.

@ -87,6 +87,18 @@ public interface RBlockingQueueAsync<V> extends RQueueAsync<V> {
*/
RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames);
/**
* Retrieves and removes first available tail element of <b>any</b> queue in async mode,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueNames - queue names. Queue name itself is always included
* @param timeout how long to wait before giving up
* @return Future object with the tail of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
*/
RFuture<Entry<String, V>> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames);
/**
* Removes at most the given number of available elements from
* this queue and adds them to the given collection in async mode. A failure

@ -61,6 +61,21 @@ public interface RBlockingQueueReactive<V> extends RQueueReactive<V> {
*/
Mono<Entry<String, V>> pollFromAnyWithName(Duration timeout, String... queueNames);
/**
* Retrieves and removes first available tail element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueNames queue names. Queue name itself is always included
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @return the tail of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Mono<Entry<String, V>> pollLastFromAnyWithName(Duration timeout, String... queueNames);
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available

@ -63,6 +63,20 @@ public interface RBlockingQueueRx<V> extends RQueueRx<V> {
*/
Maybe<Entry<String, V>> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available tail element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueNames queue names. Queue name itself is always included
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @return the tail of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Maybe<Entry<String, V>> pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available

@ -301,7 +301,18 @@ public interface RedisCommands {
RedisCommand<Object> BZPOPMIN_VALUE = new RedisCommand<Object>("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Object> BZPOPMAX_VALUE = new RedisCommand<Object>("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Map<String, List<Object>>> BLPOP_NAME = new RedisCommand<>("BLPOP",
RedisCommand<org.redisson.api.Entry<String, Object>> BLPOP_NAME = new RedisCommand<>("BLPOP",
new ListObjectDecoder(0) {
@Override
public Object decode(List parts, State state) {
if (parts.isEmpty()) {
return null;
}
return new org.redisson.api.Entry<>(parts.get(0), parts.get(1));
}
});
RedisCommand<org.redisson.api.Entry<String, Object>> BRPOP_NAME = new RedisCommand<>("BRPOP",
new ListObjectDecoder(0) {
@Override
public Object decode(List parts, State state) {

@ -425,6 +425,33 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
Assertions.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test
public void testPollLastFromAnyWithName() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:polLast");
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:polLast1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:polLast2");
queue3.put(1);
queue3.put(2);
queue3.put(3);
queue1.put(4);
queue1.put(5);
queue1.put(6);
queue2.put(7);
Entry<String, Integer> r = queue1.pollLastFromAnyWithName(Duration.ofSeconds(4), "queue:polLast1", "queue:polpolLast2");
assertThat(r.getKey()).isEqualTo("queue:polLast");
assertThat(r.getValue()).isEqualTo(6);
Entry<String, Integer> r2 = queue2.pollLastFromAnyWithName(Duration.ofSeconds(4), "queue:polLast", "queue:polLast2");
assertThat(r2.getKey()).isEqualTo("queue:polLast1");
assertThat(r2.getValue()).isEqualTo(7);
Entry<String, Integer> r3 = queue2.pollLastFromAnyWithName(Duration.ofSeconds(4), "queue:polLast2", "queue:polLast");
assertThat(r3.getKey()).isEqualTo("queue:polLast2");
assertThat(r3.getValue()).isEqualTo(3);
}
@Test
public void testPollFirstFromAny() throws InterruptedException {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");

Loading…
Cancel
Save