From 960c02386e12ef2d4a87fd383e5d993d5d505bc0 Mon Sep 17 00:00:00 2001 From: seakider Date: Mon, 10 Feb 2025 21:34:31 +0800 Subject: [PATCH 1/3] Feature - RBlockingQueue.pollLastFromAnyWithName() Signed-off-by: seakider --- .../org/redisson/RedissonBlockingDeque.java | 10 +++++++ .../org/redisson/RedissonBlockingQueue.java | 14 ++++++++++ .../RedissonBoundedBlockingQueue.java | 11 ++++++++ .../RedissonPriorityBlockingDeque.java | 10 +++++++ .../RedissonPriorityBlockingQueue.java | 10 +++++++ .../org/redisson/RedissonTransferQueue.java | 10 +++++++ .../java/org/redisson/api/RBlockingQueue.java | 14 ++++++++++ .../org/redisson/api/RBlockingQueueAsync.java | 12 +++++++++ .../client/protocol/RedisCommands.java | 11 ++++++++ .../redisson/RedissonBlockingQueueTest.java | 27 +++++++++++++++++++ 10 files changed, 129 insertions(+) diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java index cd8ceb71f..df1a3b5fe 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java @@ -151,6 +151,11 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock return blockingQueue.pollLastFromAny(duration, count, queueNames); } + @Override + public Entry pollLastFromAnyWithName(Duration duration, String... queueNames) throws InterruptedException { + return blockingQueue.pollLastFromAnyWithName(duration, queueNames); + } + @Override public RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { return blockingQueue.pollFirstFromAnyAsync(duration, count, queueNames); @@ -161,6 +166,11 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock return blockingQueue.pollLastFromAnyAsync(duration, count, queueNames); } + @Override + public RFuture> pollLastFromAnyWithNameAsync(Duration duration, String... queueNames) { + return blockingQueue.pollLastFromAnyWithNameAsync(duration, queueNames); + } + @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index 869087324..b827e826c 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -183,6 +183,20 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLMPOP, params.toArray()); } + @Override + public Entry pollLastFromAnyWithName(Duration duration, String... queueNames) throws InterruptedException { + return commandExecutor.getInterrupted(pollLastFromAnyWithNameAsync(duration, queueNames)); + } + + @Override + public RFuture> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) { + if (timeout.toMillis() < 0) { + return new CompletableFutureWrapper<>((Entry) null); + } + return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BRPOP_NAME, + toSeconds(timeout.toMillis(), TimeUnit.MILLISECONDS), queueNames); + } + @Override public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { if (timeout < 0) { diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index 7baec34f8..700fda40d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -235,6 +235,17 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements return wrapTakeFuture(takeFuture); } + @Override + public Entry pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + return commandExecutor.getInterrupted(pollLastFromAnyWithNameAsync(timeout, queueNames)); + } + + @Override + public RFuture> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) { + RFuture> takeFuture = blockingQueue.pollLastFromAnyWithNameAsync(timeout, queueNames); + return wrapTakeFuture(takeFuture); + } + @Override public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) { return get(pollFirstFromAnyAsync(duration, count, queueNames)); diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java index d38bd7c14..2b2c80359 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java @@ -101,6 +101,16 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i throw new UnsupportedOperationException("use poll method"); } + @Override + public Entry pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public RFuture> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) { + throw new UnsupportedOperationException("use poll method"); + } + @Override public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { throw new UnsupportedOperationException("use poll method"); diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index 790c5543c..76d39ea78 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -156,6 +156,11 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i throw new UnsupportedOperationException("use poll method"); } + @Override + public Entry pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException("use poll method"); + } + @Override public RFuture>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) { throw new UnsupportedOperationException("use poll method"); @@ -291,6 +296,11 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i throw new UnsupportedOperationException("use poll method"); } + @Override + public RFuture> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) { + throw new UnsupportedOperationException("use poll method"); + } + @Override public RFuture putAsync(V e) { throw new UnsupportedOperationException("use add method"); diff --git a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java index 3525624b0..a58e7489a 100644 --- a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java @@ -611,6 +611,16 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran throw new UnsupportedOperationException(); } + @Override + public Entry pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames) { + throw new UnsupportedOperationException(); + } + @Override public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { throw new UnsupportedOperationException(); diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java index 31ca70b47..b75710e49 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java @@ -92,6 +92,20 @@ public interface RBlockingQueue extends BlockingQueue, RQueue, RBlockin */ Map> pollLastFromAny(Duration duration, int count, String... queueNames) throws InterruptedException; + /** + * Retrieves and removes first available tail element of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueNames queue names. Queue name itself is always included + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @return the tail of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + Entry pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException; + /** * Retrieves and removes last available tail element of this queue and adds it at the head of queueName, * waiting up to the specified wait time if necessary for an element to become available. diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java index 568fd2454..42297b331 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java @@ -87,6 +87,18 @@ public interface RBlockingQueueAsync extends RQueueAsync { */ RFuture>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames); + /** + * Retrieves and removes first available tail element of any queue in async mode, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueNames - queue names. Queue name itself is always included + * @param timeout how long to wait before giving up + * @return Future object with the tail of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + */ + RFuture> pollLastFromAnyWithNameAsync(Duration timeout, String... queueNames); + /** * Removes at most the given number of available elements from * this queue and adds them to the given collection in async mode. A failure diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index db4a8dacf..9d0a9d4ca 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -309,6 +309,17 @@ public interface RedisCommands { } }); + RedisCommand>> BRPOP_NAME = new RedisCommand<>("BRPOP", + new ListObjectDecoder(0) { + @Override + public Object decode(List parts, State state) { + if (parts.isEmpty()) { + return null; + } + return new org.redisson.api.Entry<>(parts.get(0), parts.get(1)); + } + }); + RedisCommand>> BLMPOP = new RedisCommand<>("BLMPOP", new ListMultiDecoder2( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) { diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 50d2d8cc4..78aac3fc5 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -425,6 +425,33 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { Assertions.assertTrue(System.currentTimeMillis() - s > 2000); } + @Test + public void testPollLastFromAnyWithName() throws InterruptedException { + RBlockingQueue queue1 = redisson.getBlockingQueue("queue:polLast"); + RBlockingQueue queue2 = redisson.getBlockingQueue("queue:polLast1"); + RBlockingQueue queue3 = redisson.getBlockingQueue("queue:polLast2"); + queue3.put(1); + queue3.put(2); + queue3.put(3); + queue1.put(4); + queue1.put(5); + queue1.put(6); + queue2.put(7); + + Entry r = queue1.pollLastFromAnyWithName(Duration.ofSeconds(4), "queue:polLast1", "queue:polpolLast2"); + assertThat(r.getKey()).isEqualTo("queue:polLast"); + assertThat(r.getValue()).isEqualTo(6); + + Entry r2 = queue2.pollLastFromAnyWithName(Duration.ofSeconds(4), "queue:polLast", "queue:polLast2"); + assertThat(r2.getKey()).isEqualTo("queue:polLast1"); + assertThat(r2.getValue()).isEqualTo(7); + + Entry r3 = queue2.pollLastFromAnyWithName(Duration.ofSeconds(4), "queue:polLast2", "queue:polLast"); + assertThat(r3.getKey()).isEqualTo("queue:polLast2"); + assertThat(r3.getValue()).isEqualTo(3); + + } + @Test public void testPollFirstFromAny() throws InterruptedException { RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); From 02e7bece3c1141ce8405587e54f8ad957ed9b19d Mon Sep 17 00:00:00 2001 From: seakider Date: Mon, 10 Feb 2025 21:52:22 +0800 Subject: [PATCH 2/3] Feature - RBlockingQueue.pollLastFromAnyWithName() Signed-off-by: seakider --- .../main/java/org/redisson/client/protocol/RedisCommands.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 9d0a9d4ca..0e66d95b6 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -298,7 +298,7 @@ public interface RedisCommands { RedisCommand BZPOPMIN_VALUE = new RedisCommand("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder()); RedisCommand BZPOPMAX_VALUE = new RedisCommand("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder()); - RedisCommand>> BLPOP_NAME = new RedisCommand<>("BLPOP", + RedisCommand> BLPOP_NAME = new RedisCommand<>("BLPOP", new ListObjectDecoder(0) { @Override public Object decode(List parts, State state) { @@ -309,7 +309,7 @@ public interface RedisCommands { } }); - RedisCommand>> BRPOP_NAME = new RedisCommand<>("BRPOP", + RedisCommand> BRPOP_NAME = new RedisCommand<>("BRPOP", new ListObjectDecoder(0) { @Override public Object decode(List parts, State state) { From 7a4e0f423988051546251ae7dfc1e8d3a2ac8e44 Mon Sep 17 00:00:00 2001 From: seakider Date: Mon, 10 Feb 2025 22:18:16 +0800 Subject: [PATCH 3/3] Feature - RBlockingQueue.pollLastFromAnyWithName() Signed-off-by: seakider --- .../org/redisson/api/RBlockingQueueReactive.java | 15 +++++++++++++++ .../java/org/redisson/api/RBlockingQueueRx.java | 14 ++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java index 48ff2ba01..89fdebef9 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java @@ -61,6 +61,21 @@ public interface RBlockingQueueReactive extends RQueueReactive { */ Mono> pollFromAnyWithName(Duration timeout, String... queueNames); + /** + * Retrieves and removes first available tail element of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueNames queue names. Queue name itself is always included + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @return the tail of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + Mono> pollLastFromAnyWithName(Duration timeout, String... queueNames); + + /** * Retrieves and removes first available head elements of any queue, * waiting up to the specified wait time if necessary for an element to become available diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java index b5f9f7824..69188b92a 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java @@ -63,6 +63,20 @@ public interface RBlockingQueueRx extends RQueueRx { */ Maybe> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException; + /** + * Retrieves and removes first available tail element of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueNames queue names. Queue name itself is always included + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @return the tail of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + Maybe> pollLastFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException; + /** * Retrieves and removes first available head elements of any queue, * waiting up to the specified wait time if necessary for an element to become available