diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java index c575cc065..68a944ad1 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java @@ -17,6 +17,8 @@ package org.redisson; import java.time.Duration; import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -126,7 +128,27 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { return blockingQueue.pollLastAndOfferFirstTo(queueName, timeout, unit); } - + + @Override + public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + return blockingQueue.pollFirstFromAny(duration, count, queueNames); + } + + @Override + public Map> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + return blockingQueue.pollLastFromAny(duration, count, queueNames); + } + + @Override + public RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { + return blockingQueue.pollFirstFromAnyAsync(duration, count, queueNames); + } + + @Override + public RFuture>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) { + return blockingQueue.pollLastFromAnyAsync(duration, count, queueNames); + } + @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index 0da2aa542..8644d81fe 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -24,6 +24,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.ListDrainToDecoder; +import java.time.Duration; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -109,10 +110,44 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock */ @Override public RFuture pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { - List names = new ArrayList<>(Arrays.asList(queueNames)); - names.add(0, getRawName()); return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, - toSeconds(timeout, unit), names.toArray(new String[]{})); + toSeconds(timeout, unit), queueNames); + } + + @Override + public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + return commandExecutor.getInterrupted(pollFirstFromAnyAsync(duration, count, queueNames)); + } + + @Override + public RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { + List params = new ArrayList<>(); + params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS)); + params.add(queueNames.length + 1); + params.add(getRawName()); + params.addAll(Arrays.asList(queueNames)); + params.add("LEFT"); + params.add("COUNT"); + params.add(count); + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLMPOP, params.toArray()); + } + + @Override + public Map> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + return commandExecutor.getInterrupted(pollLastFromAnyAsync(duration, count, queueNames)); + } + + @Override + public RFuture>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) { + List params = new ArrayList<>(); + params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS)); + params.add(queueNames.length + 1); + params.add(getRawName()); + params.addAll(Arrays.asList(queueNames)); + params.add("RIGHT"); + params.add("COUNT"); + params.add(count); + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLMPOP, params.toArray()); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index f87c3b0a1..51571f25d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -27,10 +27,8 @@ import org.redisson.connection.decoder.ListDrainToDecoder; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.RedissonPromise; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; +import java.time.Duration; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; @@ -44,16 +42,16 @@ import java.util.function.Consumer; */ public class RedissonBoundedBlockingQueue extends RedissonQueue implements RBoundedBlockingQueue { - private final CommandAsyncExecutor commandExecutor; - + private final RedissonBlockingQueue blockingQueue; + protected RedissonBoundedBlockingQueue(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(commandExecutor, name, redisson); - this.commandExecutor = commandExecutor; + blockingQueue = new RedissonBlockingQueue(commandExecutor, name, redisson); } protected RedissonBoundedBlockingQueue(Codec codec, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(codec, commandExecutor, name, redisson); - this.commandExecutor = commandExecutor; + blockingQueue = new RedissonBlockingQueue(commandExecutor, name, redisson); } private String getSemaphoreName() { @@ -115,11 +113,11 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public RFuture takeAsync() { - RFuture takeFuture = commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0); + RFuture takeFuture = blockingQueue.takeAsync(); return wrapTakeFuture(takeFuture); } - private RFuture wrapTakeFuture(RFuture takeFuture) { + private RFuture wrapTakeFuture(RFuture takeFuture) { CompletableFuture f = takeFuture.toCompletableFuture().thenCompose(res -> { if (res == null) { return CompletableFuture.completedFuture(null); @@ -186,7 +184,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public RFuture pollAsync(long timeout, TimeUnit unit) { - RFuture takeFuture = commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), toSeconds(timeout, unit)); + RFuture takeFuture = blockingQueue.pollAsync(timeout, unit); return wrapTakeFuture(takeFuture); } @@ -214,10 +212,32 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements */ @Override public RFuture pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { - RFuture takeFuture = commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, toSeconds(timeout, unit), queueNames); + RFuture takeFuture = blockingQueue.pollFromAnyAsync(timeout, unit, queueNames); return wrapTakeFuture(takeFuture); } + @Override + public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) { + return get(pollFirstFromAnyAsync(duration, count, queueNames)); + } + + @Override + public Map> pollLastFromAny(Duration duration, int count, String... queueNames) { + return get(pollLastFromAnyAsync(duration, count, queueNames)); + } + + @Override + public RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { + RFuture>> future = blockingQueue.pollFirstFromAnyAsync(duration, count, queueNames); + return wrapTakeFuture(future); + } + + @Override + public RFuture>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) { + RFuture>> future = blockingQueue.pollLastFromAnyAsync(duration, count, queueNames); + return wrapTakeFuture(future); + } + @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); @@ -240,7 +260,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { - RFuture takeFuture = commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BRPOPLPUSH, getRawName(), queueName, unit.toSeconds(timeout)); + RFuture takeFuture = blockingQueue.pollLastAndOfferFirstToAsync(queueName, timeout, unit); return wrapTakeFuture(takeFuture); } @@ -266,7 +286,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements } String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); - return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder((Collection) c)), + return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c)), "local vals = redis.call('lrange', KEYS[1], 0, -1); " + "redis.call('del', KEYS[1]); " + "if #vals > 0 then " @@ -294,7 +314,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements String channelName = RedissonSemaphore.getChannelName(getSemaphoreName()); - return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder((Collection) c)), + return commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand("EVAL", new ListDrainToDecoder(c)), "local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" + "local vals = redis.call('lrange', KEYS[1], 0, elemNum); " + "redis.call('ltrim', KEYS[1], elemNum + 1, -1); " + diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java index 72ad6e169..3f0faf689 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java @@ -28,6 +28,7 @@ import org.redisson.misc.RedissonPromise; import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -88,6 +89,26 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i throw new UnsupportedOperationException("use poll method"); } + @Override + public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public Map> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public RFuture>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) { + throw new UnsupportedOperationException("use poll method"); + } + @Override public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { return blockingQueue.pollLastAndOfferFirstToAsync(queueName, timeout, unit); diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index f4f3eb47a..216b858c3 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -27,9 +27,11 @@ import org.redisson.connection.decoder.ListDrainToDecoder; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -128,6 +130,26 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i throw new UnsupportedOperationException("use poll method"); } + @Override + public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public Map> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public RFuture>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) { + throw new UnsupportedOperationException("use poll method"); + } + @Override public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { RPromise result = new RedissonPromise(); diff --git a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java index 958516635..9abd611aa 100644 --- a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java @@ -37,6 +37,7 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.redisson.remote.RemoteServiceRequest; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; import java.util.function.Consumer; @@ -607,6 +608,26 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran throw new UnsupportedOperationException(); } + @Override + public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public Map> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) { + throw new UnsupportedOperationException(); + } + @Override public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException(); diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java index 4290761d6..ca50df87b 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java @@ -15,6 +15,9 @@ */ package org.redisson.api; +import java.time.Duration; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -43,6 +46,36 @@ public interface RBlockingQueue extends BlockingQueue, RQueue, RBlockin */ V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException; + /** + * Retrieves and removes first available head elements of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the head elements + */ + Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException; + + /** + * Retrieves and removes first available tail elements of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the tail elements + */ + Map> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException; + /** * Retrieves and removes last available tail element of this queue and adds it at the head of queueName, * waiting up to the specified wait time if necessary for an element to become available. diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java index 512863538..7923d3ac8 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java @@ -15,7 +15,10 @@ */ package org.redisson.api; +import java.time.Duration; import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -42,6 +45,36 @@ public interface RBlockingQueueAsync extends RQueueAsync { */ RFuture pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames); + /** + * Retrieves and removes first available head elements of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the head elements + */ + RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames); + + /** + * Retrieves and removes first available tail elements of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the tail elements + */ + RFuture>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames); + /** * Removes at most the given number of available elements from * this queue and adds them to the given collection in async mode. A failure diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java index 63d8cfac0..4f72c05ff 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java @@ -15,7 +15,10 @@ */ package org.redisson.api; +import java.time.Duration; import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import reactor.core.publisher.Flux; @@ -44,6 +47,36 @@ public interface RBlockingQueueReactive extends RQueueReactive { */ Mono pollFromAny(long timeout, TimeUnit unit, String... queueNames); + /** + * Retrieves and removes first available head elements of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the head elements + */ + Mono>> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException; + + /** + * Retrieves and removes first available tail elements of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the tail elements + */ + Mono>> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException; + /** * Removes at most the given number of available elements from * this queue and adds them to the given collection in async mode. A failure diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java index e261a86ca..5a022bd5b 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java @@ -15,7 +15,10 @@ */ package org.redisson.api; +import java.time.Duration; import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.core.Completable; @@ -46,6 +49,36 @@ public interface RBlockingQueueRx extends RQueueRx { */ Maybe pollFromAny(long timeout, TimeUnit unit, String... queueNames); + /** + * Retrieves and removes first available head elements of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the head elements + */ + Maybe>> pollFirstFromAny(Duration duration, int count, String... queueNames); + + /** + * Retrieves and removes first available tail elements of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @param queueNames name of queues + * @return the tail elements + */ + Maybe>> pollLastFromAny(Duration duration, int count, String... queueNames); + /** * Removes at most the given number of available elements from * this queue and adds them to the given collection in async mode. A failure diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 34be5c971..122534641 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -246,9 +246,28 @@ public interface RedisCommands { RedisCommand BZPOPMIN_VALUE = new RedisCommand("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder()); RedisCommand BZPOPMAX_VALUE = new RedisCommand("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder()); + RedisCommand>> BLMPOP = new RedisCommand<>("BLMPOP", + new ListMultiDecoder2( + new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) { + @Override + public Object decode(List parts, State state) { + return Collections.singletonMap(parts.get(0), parts.get(1)); + } + }, + new CodecDecoder(), + new CodecDecoder() { + @Override + public Decoder getDecoder(Codec codec, int paramNum, State state) { + if ((paramNum + 1) % 2 == 0) { + return DoubleCodec.INSTANCE.getValueDecoder(); + } + return codec.getValueDecoder(); + } + })); + Set BLOCKING_COMMAND_NAMES = new HashSet( Arrays.asList(BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName(), - BLPOP.getName(), BRPOP.getName(), BLMOVE.getName(), BZMPOP_SINGLE_LIST.getName())); + BLPOP.getName(), BRPOP.getName(), BLMOVE.getName(), BZMPOP_SINGLE_LIST.getName(), BLMPOP.getName())); RedisCommand PFADD = new RedisCommand("PFADD", new BooleanReplayConvertor()); RedisStrictCommand PFCOUNT = new RedisStrictCommand("PFCOUNT"); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index e19a808a7..f63273f67 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -116,7 +116,7 @@ public interface CommandAsyncExecutor { RFuture async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand command, Object[] params, boolean ignoreRedirect, boolean noRetry); - RFuture pollFromAnyAsync(String name, Codec codec, RedisCommand command, long secondsTimeout, String... queueNames); + RFuture pollFromAnyAsync(String name, Codec codec, RedisCommand command, long secondsTimeout, String... queueNames); ByteBuf encode(Codec codec, Object value); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 6182e7afe..a7595cbbf 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -721,7 +721,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } @Override - public RFuture pollFromAnyAsync(String name, Codec codec, RedisCommand command, long secondsTimeout, String... queueNames) { + public RFuture pollFromAnyAsync(String name, Codec codec, RedisCommand command, long secondsTimeout, String... queueNames) { if (connectionManager.isClusterMode() && queueNames.length > 0) { AtomicReference> ref = new AtomicReference<>(); List names = new ArrayList<>(); @@ -741,7 +741,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private CompletionStage poll(Codec codec, AtomicReference> ref, - List names, AtomicLong counter, RedisCommand command) { + List names, AtomicLong counter, RedisCommand command) { if (ref.get().hasNext()) { String currentName = ref.get().next(); RFuture future = writeAsync(currentName, codec, command, currentName, 1); diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 1f69bb8c9..49e571d21 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -2,6 +2,7 @@ package org.redisson; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.RedisProcess; @@ -13,10 +14,7 @@ import org.redisson.connection.balancer.RandomLoadBalancer; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -25,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS.l; public class RedissonBlockingQueueTest extends RedissonQueueTest { @@ -512,6 +511,55 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { Assertions.assertTrue(System.currentTimeMillis() - s > 2000); } + @Test + public void testPollFirstFromAny() throws InterruptedException { +// Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0); + + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + RBlockingQueue queue2 = redisson.getBlockingQueue("queue:pollany1"); + RBlockingQueue queue3 = redisson.getBlockingQueue("queue:pollany2"); + Assertions.assertDoesNotThrow(() -> { + queue3.put(1); + queue3.put(2); + queue3.put(3); + queue1.put(4); + queue1.put(5); + queue1.put(6); + queue2.put(7); + queue2.put(8); + queue2.put(9); + }); + + Map> res = queue1.pollFirstFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2"); + assertThat(res.get("queue:pollany")).containsExactly(4, 5); + queue1.clear(); + Map> res2 = queue1.pollFirstFromAny(Duration.ofSeconds(4), 2); + assertThat(res2).isNull(); + } + + @Test + public void testPollLastFromAny() throws InterruptedException { + Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0); + + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + RBlockingQueue queue2 = redisson.getBlockingQueue("queue:pollany1"); + RBlockingQueue queue3 = redisson.getBlockingQueue("queue:pollany2"); + Assertions.assertDoesNotThrow(() -> { + queue3.put(1); + queue3.put(2); + queue3.put(3); + queue1.put(4); + queue1.put(5); + queue1.put(6); + queue2.put(7); + queue2.put(8); + queue2.put(9); + }); + + Map> res = queue1.pollLastFromAny(Duration.ofSeconds(4), 2, "queue:pollany1", "queue:pollany2"); + assertThat(res.get("queue:pollany")).containsExactly(6, 5); + } + @Test public void testTake() throws InterruptedException { RBlockingQueue queue1 = getQueue();