Feature - RBlockingQueue.pollFromAnyWithName() method added. #333

pull/5341/head
Nikita Koksharov 1 year ago
parent d6df3075df
commit 13c5c95f4d

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.redisson.api.Entry;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -118,6 +119,16 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
return blockingQueue.pollFromAnyAsync(timeout, unit);
}
@Override
public Entry<String, V> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
return blockingQueue.pollFromAnyWithName(timeout, queueNames);
}
@Override
public RFuture<Entry<String, V>> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) {
return blockingQueue.pollFromAnyWithNameAsync(timeout, queueNames);
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
return blockingQueue.pollLastAndOfferFirstToAsync(queueName, timeout, unit);

@ -15,6 +15,7 @@
*/
package org.redisson;
import org.redisson.api.Entry;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -114,6 +115,21 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
return commandExecutor.getInterrupted(pollFromAnyAsync(timeout, unit, queueNames));
}
@Override
public Entry<String, V> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
return commandExecutor.getInterrupted(pollFromAnyWithNameAsync(timeout, queueNames));
}
@Override
public RFuture<Entry<String, V>> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) {
if (timeout.toMillis() < 0) {
return new CompletableFutureWrapper<>((Entry) null);
}
return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BLPOP_NAME,
toSeconds(timeout.toMillis(), TimeUnit.MILLISECONDS), queueNames);
}
/*
* (non-Javadoc)
* @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[])

@ -15,6 +15,7 @@
*/
package org.redisson;
import org.redisson.api.Entry;
import org.redisson.api.RBoundedBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -219,6 +220,17 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
return wrapTakeFuture(takeFuture);
}
@Override
public Entry<String, V> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
return commandExecutor.getInterrupted(pollFromAnyWithNameAsync(timeout, queueNames));
}
@Override
public RFuture<Entry<String, V>> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) {
RFuture<Entry<String, V>> takeFuture = blockingQueue.pollFromAnyWithNameAsync(timeout, queueNames);
return wrapTakeFuture(takeFuture);
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) {
return get(pollFirstFromAnyAsync(duration, count, queueNames));

@ -15,6 +15,7 @@
*/
package org.redisson;
import org.redisson.api.Entry;
import org.redisson.api.RFuture;
import org.redisson.api.RPriorityBlockingDeque;
import org.redisson.api.RedissonClient;
@ -89,6 +90,16 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
throw new UnsupportedOperationException("use poll method");
}
@Override
public Entry<String, V> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Entry<String, V>> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");

@ -15,6 +15,7 @@
*/
package org.redisson;
import org.redisson.api.Entry;
import org.redisson.api.RFuture;
import org.redisson.api.RPriorityBlockingQueue;
import org.redisson.api.RedissonClient;
@ -130,6 +131,11 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
throw new UnsupportedOperationException("use poll method");
}
@Override
public Entry<String, V> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
@ -263,6 +269,11 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Entry<String, V>> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Void> putAsync(V e) {
throw new UnsupportedOperationException("use add method");

@ -18,10 +18,7 @@ package org.redisson;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RTransferQueue;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.*;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
@ -605,6 +602,16 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
throw new UnsupportedOperationException();
}
@Override
public Entry<String, V> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Entry<String, V>> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) {
throw new UnsupportedOperationException();
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException();

@ -0,0 +1,45 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
*
* @author Nikita Koksharov
*
*/
public class Entry<K, V> {
private K key;
private V value;
public Entry() {
}
public Entry(K key, V value) {
this.key = key;
this.value = value;
}
public V getValue() {
return value;
}
public K getKey() {
return key;
}
}

@ -46,6 +46,20 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
*/
V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available head element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueNames queue names. Queue name itself is always included
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Entry<String, V> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available

@ -45,6 +45,18 @@ public interface RBlockingQueueAsync<V> extends RQueueAsync<V> {
*/
RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames);
/**
* Retrieves and removes first available head element of <b>any</b> queue in async mode,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueNames - queue names. Queue name itself is always included
* @param timeout how long to wait before giving up
* @return Future object with the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
*/
RFuture<Entry<String, V>> pollFromAnyWithNameAsync(Duration timeout, String... queueNames);
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available

@ -47,6 +47,20 @@ public interface RBlockingQueueReactive<V> extends RQueueReactive<V> {
*/
Mono<V> pollFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Retrieves and removes first available head element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueNames queue names. Queue name itself is always included
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Mono<Entry<String, V>> pollFromAnyWithName(Duration timeout, String... queueNames);
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available

@ -49,6 +49,20 @@ public interface RBlockingQueueRx<V> extends RQueueRx<V> {
*/
Maybe<V> pollFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Retrieves and removes first available head element of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueNames queue names. Queue name itself is always included
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Maybe<Entry<String, V>> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available

@ -289,6 +289,17 @@ public interface RedisCommands {
RedisCommand<Object> BZPOPMIN_VALUE = new RedisCommand<Object>("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Object> BZPOPMAX_VALUE = new RedisCommand<Object>("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Map<String, List<Object>>> BLPOP_NAME = new RedisCommand<>("BLPOP",
new ListObjectDecoder(0) {
@Override
public Object decode(List parts, State state) {
if (parts.isEmpty()) {
return null;
}
return new org.redisson.api.Entry<>(parts.get(0), parts.get(1));
}
});
RedisCommand<Map<String, List<Object>>> BLMPOP = new RedisCommand<>("BLMPOP",
new ListMultiDecoder2(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) {

@ -6,6 +6,7 @@ import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.Entry;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
@ -508,6 +509,27 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
Assertions.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test
public void testPollFromAnyWithName() throws InterruptedException {
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
Assertions.assertDoesNotThrow(() -> {
queue3.put(2);
queue1.put(1);
queue2.put(3);
});
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
Entry<String, Integer> r = queue1.pollFromAnyWithName(Duration.ofSeconds(4), "queue:pollany1", "queue:pollany2");
assertThat(r.getKey()).isEqualTo("queue:pollany2");
assertThat(r.getValue()).isEqualTo(2);
Assertions.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test
public void testPollFirstFromAny() throws InterruptedException {
// Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0);

Loading…
Cancel
Save