Feature - added RBlockingQueue.pollFirstFromAny() and pollLastFromAny() methods. #4169

pull/4226/head
Nikita Koksharov 3 years ago
parent dbab0c756a
commit a3cd031834

@ -17,6 +17,8 @@ package org.redisson;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -126,7 +128,27 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
return blockingQueue.pollLastAndOfferFirstTo(queueName, timeout, unit);
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
return blockingQueue.pollFirstFromAny(duration, count, queueNames);
}
@Override
public Map<String, List<V>> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
return blockingQueue.pollLastFromAny(duration, count, queueNames);
}
@Override
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
return blockingQueue.pollFirstFromAnyAsync(duration, count, queueNames);
}
@Override
public RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) {
return blockingQueue.pollLastFromAnyAsync(duration, count, queueNames);
}
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));

@ -24,6 +24,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -109,10 +110,44 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
*/
@Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
List<String> names = new ArrayList<>(Arrays.asList(queueNames));
names.add(0, getRawName());
return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE,
toSeconds(timeout, unit), names.toArray(new String[]{}));
toSeconds(timeout, unit), queueNames);
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
return commandExecutor.getInterrupted(pollFirstFromAnyAsync(duration, count, queueNames));
}
@Override
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
List<Object> params = new ArrayList<>();
params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS));
params.add(queueNames.length + 1);
params.add(getRawName());
params.addAll(Arrays.asList(queueNames));
params.add("LEFT");
params.add("COUNT");
params.add(count);
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLMPOP, params.toArray());
}
@Override
public Map<String, List<V>> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
return commandExecutor.getInterrupted(pollLastFromAnyAsync(duration, count, queueNames));
}
@Override
public RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) {
List<Object> params = new ArrayList<>();
params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS));
params.add(queueNames.length + 1);
params.add(getRawName());
params.addAll(Arrays.asList(queueNames));
params.add("RIGHT");
params.add("COUNT");
params.add(count);
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLMPOP, params.toArray());
}
@Override

@ -27,10 +27,8 @@ import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedissonPromise;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
@ -44,16 +42,16 @@ import java.util.function.Consumer;
*/
public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements RBoundedBlockingQueue<V> {
private final CommandAsyncExecutor commandExecutor;
private final RedissonBlockingQueue<V> blockingQueue;
protected RedissonBoundedBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name, redisson);
this.commandExecutor = commandExecutor;
blockingQueue = new RedissonBlockingQueue<V>(commandExecutor, name, redisson);
}
protected RedissonBoundedBlockingQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name, redisson);
this.commandExecutor = commandExecutor;
blockingQueue = new RedissonBlockingQueue<V>(commandExecutor, name, redisson);
}
private String getSemaphoreName() {
@ -115,11 +113,11 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public RFuture<V> takeAsync() {
RFuture<V> takeFuture = commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0);
RFuture<V> takeFuture = blockingQueue.takeAsync();
return wrapTakeFuture(takeFuture);
}
private RFuture<V> wrapTakeFuture(RFuture<V> takeFuture) {
private <V> RFuture<V> wrapTakeFuture(RFuture<V> takeFuture) {
CompletableFuture<V> f = takeFuture.toCompletableFuture().thenCompose(res -> {
if (res == null) {
return CompletableFuture.completedFuture(null);
@ -186,7 +184,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public RFuture<V> pollAsync(long timeout, TimeUnit unit) {
RFuture<V> takeFuture = commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), toSeconds(timeout, unit));
RFuture<V> takeFuture = blockingQueue.pollAsync(timeout, unit);
return wrapTakeFuture(takeFuture);
}
@ -214,10 +212,32 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
*/
@Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
RFuture<V> takeFuture = commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, toSeconds(timeout, unit), queueNames);
RFuture<V> takeFuture = blockingQueue.pollFromAnyAsync(timeout, unit, queueNames);
return wrapTakeFuture(takeFuture);
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) {
return get(pollFirstFromAnyAsync(duration, count, queueNames));
}
@Override
public Map<String, List<V>> pollLastFromAny(Duration duration, int count, String... queueNames) {
return get(pollLastFromAnyAsync(duration, count, queueNames));
}
@Override
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
RFuture<Map<String, List<V>>> future = blockingQueue.pollFirstFromAnyAsync(duration, count, queueNames);
return wrapTakeFuture(future);
}
@Override
public RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) {
RFuture<Map<String, List<V>>> future = blockingQueue.pollLastFromAnyAsync(duration, count, queueNames);
return wrapTakeFuture(future);
}
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
@ -240,7 +260,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
RFuture<V> takeFuture = commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BRPOPLPUSH, getRawName(), queueName, unit.toSeconds(timeout));
RFuture<V> takeFuture = blockingQueue.pollLastAndOfferFirstToAsync(queueName, timeout, unit);
return wrapTakeFuture(takeFuture);
}
@ -266,7 +286,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
}
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder((Collection<Object>) c)),
return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('del', KEYS[1]); " +
"if #vals > 0 then "
@ -294,7 +314,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
String channelName = RedissonSemaphore.getChannelName(getSemaphoreName());
return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder((Collection<Object>) c)),
return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +

@ -28,6 +28,7 @@ import org.redisson.misc.RedissonPromise;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -88,6 +89,26 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
throw new UnsupportedOperationException("use poll method");
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
}
@Override
public Map<String, List<V>> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
return blockingQueue.pollLastAndOfferFirstToAsync(queueName, timeout, unit);

@ -27,9 +27,11 @@ import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -128,6 +130,26 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
throw new UnsupportedOperationException("use poll method");
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
}
@Override
public Map<String, List<V>> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
RPromise<V> result = new RedissonPromise<V>();

@ -37,6 +37,7 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RemoteServiceRequest;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
@ -607,6 +608,26 @@ public class RedissonTransferQueue<V> extends RedissonExpirable implements RTran
throw new UnsupportedOperationException();
}
@Override
public Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public Map<String, List<V>> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) {
throw new UnsupportedOperationException();
}
@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();

@ -15,6 +15,9 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -43,6 +46,36 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
*/
V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the head elements
*/
Map<String, List<V>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available tail elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the tail elements
*/
Map<String, List<V>> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes last available tail element of this queue and adds it at the head of <code>queueName</code>,
* waiting up to the specified wait time if necessary for an element to become available.

@ -15,7 +15,10 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@ -42,6 +45,36 @@ public interface RBlockingQueueAsync<V> extends RQueueAsync<V> {
*/
RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames);
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the head elements
*/
RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames);
/**
* Retrieves and removes first available tail elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the tail elements
*/
RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames);
/**
* Removes at most the given number of available elements from
* this queue and adds them to the given collection in async mode. A failure

@ -15,7 +15,10 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
@ -44,6 +47,36 @@ public interface RBlockingQueueReactive<V> extends RQueueReactive<V> {
*/
Mono<V> pollFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the head elements
*/
Mono<Map<String, List<V>>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException;
/**
* Retrieves and removes first available tail elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the tail elements
*/
Mono<Map<String, List<V>>> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException;
/**
* Removes at most the given number of available elements from
* this queue and adds them to the given collection in async mode. A failure

@ -15,7 +15,10 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.Completable;
@ -46,6 +49,36 @@ public interface RBlockingQueueRx<V> extends RQueueRx<V> {
*/
Maybe<V> pollFromAny(long timeout, TimeUnit unit, String... queueNames);
/**
* Retrieves and removes first available head elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the head elements
*/
Maybe<Map<String, List<V>>> pollFirstFromAny(Duration duration, int count, String... queueNames);
/**
* Retrieves and removes first available tail elements of <b>any</b> queue,
* waiting up to the specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @param queueNames name of queues
* @return the tail elements
*/
Maybe<Map<String, List<V>>> pollLastFromAny(Duration duration, int count, String... queueNames);
/**
* Removes at most the given number of available elements from
* this queue and adds them to the given collection in async mode. A failure

@ -246,9 +246,28 @@ public interface RedisCommands {
RedisCommand<Object> BZPOPMIN_VALUE = new RedisCommand<Object>("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Object> BZPOPMAX_VALUE = new RedisCommand<Object>("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder());
RedisCommand<Map<String, List<Object>>> BLMPOP = new RedisCommand<>("BLMPOP",
new ListMultiDecoder2(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) {
@Override
public Object decode(List parts, State state) {
return Collections.singletonMap(parts.get(0), parts.get(1));
}
},
new CodecDecoder(),
new CodecDecoder() {
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
if ((paramNum + 1) % 2 == 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return codec.getValueDecoder();
}
}));
Set<String> BLOCKING_COMMAND_NAMES = new HashSet<String>(
Arrays.asList(BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName(),
BLPOP.getName(), BRPOP.getName(), BLMOVE.getName(), BZMPOP_SINGLE_LIST.getName()));
BLPOP.getName(), BRPOP.getName(), BLMOVE.getName(), BZMPOP_SINGLE_LIST.getName(), BLMPOP.getName()));
RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor());
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");

@ -116,7 +116,7 @@ public interface CommandAsyncExecutor {
<V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec,
RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry);
<V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String... queueNames);
<V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<?> command, long secondsTimeout, String... queueNames);
ByteBuf encode(Codec codec, Object value);

@ -721,7 +721,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
@Override
public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String... queueNames) {
public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<?> command, long secondsTimeout, String... queueNames) {
if (connectionManager.isClusterMode() && queueNames.length > 0) {
AtomicReference<Iterator<String>> ref = new AtomicReference<>();
List<String> names = new ArrayList<>();
@ -741,7 +741,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private <V> CompletionStage<V> poll(Codec codec, AtomicReference<Iterator<String>> ref,
List<String> names, AtomicLong counter, RedisCommand<Object> command) {
List<String> names, AtomicLong counter, RedisCommand<?> command) {
if (ref.get().hasNext()) {
String currentName = ref.get().next();
RFuture<V> future = writeAsync(currentName, codec, command, currentName, 1);

@ -2,6 +2,7 @@ package org.redisson;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
@ -13,10 +14,7 @@ import org.redisson.connection.balancer.RandomLoadBalancer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -25,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS.l;
public class RedissonBlockingQueueTest extends RedissonQueueTest {
@ -512,6 +511,55 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
Assertions.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test
public void testPollFirstFromAny() throws InterruptedException {
// Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
Assertions.assertDoesNotThrow(() -> {
queue3.put(1);
queue3.put(2);
queue3.put(3);
queue1.put(4);
queue1.put(5);
queue1.put(6);
queue2.put(7);
queue2.put(8);
queue2.put(9);
});
Map<String, List<Integer>> res = queue1.pollFirstFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2");
assertThat(res.get("queue:pollany")).containsExactly(4, 5);
queue1.clear();
Map<String, List<Integer>> res2 = queue1.pollFirstFromAny(Duration.ofSeconds(4), 2);
assertThat(res2).isNull();
}
@Test
public void testPollLastFromAny() throws InterruptedException {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
Assertions.assertDoesNotThrow(() -> {
queue3.put(1);
queue3.put(2);
queue3.put(3);
queue1.put(4);
queue1.put(5);
queue1.put(6);
queue2.put(7);
queue2.put(8);
queue2.put(9);
});
Map<String, List<Integer>> res = queue1.pollLastFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2");
assertThat(res.get("queue:pollany")).containsExactly(6, 5);
}
@Test
public void testTake() throws InterruptedException {
RBlockingQueue<Integer> queue1 = getQueue();

Loading…
Cancel
Save