From 13c5c95f4d6a037dfcbd73e5a26864dc497e6180 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 26 Sep 2023 08:37:49 +0300 Subject: [PATCH] Feature - RBlockingQueue.pollFromAnyWithName() method added. #333 --- .../org/redisson/RedissonBlockingDeque.java | 11 +++++ .../org/redisson/RedissonBlockingQueue.java | 16 +++++++ .../RedissonBoundedBlockingQueue.java | 12 +++++ .../RedissonPriorityBlockingDeque.java | 11 +++++ .../RedissonPriorityBlockingQueue.java | 11 +++++ .../org/redisson/RedissonTransferQueue.java | 15 +++++-- .../src/main/java/org/redisson/api/Entry.java | 45 +++++++++++++++++++ .../java/org/redisson/api/RBlockingQueue.java | 14 ++++++ .../org/redisson/api/RBlockingQueueAsync.java | 12 +++++ .../redisson/api/RBlockingQueueReactive.java | 14 ++++++ .../org/redisson/api/RBlockingQueueRx.java | 14 ++++++ .../client/protocol/RedisCommands.java | 11 +++++ .../redisson/RedissonBlockingQueueTest.java | 22 +++++++++ 13 files changed, 204 insertions(+), 4 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/Entry.java diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java index c87a349a2..707bcbf28 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.redisson.api.Entry; import org.redisson.api.RBlockingDeque; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -118,6 +119,16 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock return blockingQueue.pollFromAnyAsync(timeout, unit); } + @Override + public Entry pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + return blockingQueue.pollFromAnyWithName(timeout, queueNames); + } + + @Override + public RFuture> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) { + return blockingQueue.pollFromAnyWithNameAsync(timeout, queueNames); + } + @Override public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { return blockingQueue.pollLastAndOfferFirstToAsync(queueName, timeout, unit); diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index 11fad8708..e4661fa34 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -15,6 +15,7 @@ */ package org.redisson; +import org.redisson.api.Entry; import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -114,6 +115,21 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock return commandExecutor.getInterrupted(pollFromAnyAsync(timeout, unit, queueNames)); } + @Override + public Entry pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + return commandExecutor.getInterrupted(pollFromAnyWithNameAsync(timeout, queueNames)); + } + + @Override + public RFuture> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) { + if (timeout.toMillis() < 0) { + return new CompletableFutureWrapper<>((Entry) null); + } + + return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BLPOP_NAME, + toSeconds(timeout.toMillis(), TimeUnit.MILLISECONDS), queueNames); + } + /* * (non-Javadoc) * @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[]) diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index ae83754b6..80e50ab22 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -15,6 +15,7 @@ */ package org.redisson; +import org.redisson.api.Entry; import org.redisson.api.RBoundedBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -219,6 +220,17 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements return wrapTakeFuture(takeFuture); } + @Override + public Entry pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + return commandExecutor.getInterrupted(pollFromAnyWithNameAsync(timeout, queueNames)); + } + + @Override + public RFuture> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) { + RFuture> takeFuture = blockingQueue.pollFromAnyWithNameAsync(timeout, queueNames); + return wrapTakeFuture(takeFuture); + } + @Override public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) { return get(pollFirstFromAnyAsync(duration, count, queueNames)); diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java index 5defb0f3a..aa4613d0c 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java @@ -15,6 +15,7 @@ */ package org.redisson; +import org.redisson.api.Entry; import org.redisson.api.RFuture; import org.redisson.api.RPriorityBlockingDeque; import org.redisson.api.RedissonClient; @@ -89,6 +90,16 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i throw new UnsupportedOperationException("use poll method"); } + @Override + public Entry pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException("use poll method"); + } + + @Override + public RFuture> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) { + throw new UnsupportedOperationException("use poll method"); + } + @Override public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { throw new UnsupportedOperationException("use poll method"); diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index 0ea78bbe9..5c23a6c33 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -15,6 +15,7 @@ */ package org.redisson; +import org.redisson.api.Entry; import org.redisson.api.RFuture; import org.redisson.api.RPriorityBlockingQueue; import org.redisson.api.RedissonClient; @@ -130,6 +131,11 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i throw new UnsupportedOperationException("use poll method"); } + @Override + public Entry pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException("use poll method"); + } + @Override public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { throw new UnsupportedOperationException("use poll method"); @@ -263,6 +269,11 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i throw new UnsupportedOperationException("use poll method"); } + @Override + public RFuture> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) { + throw new UnsupportedOperationException("use poll method"); + } + @Override public RFuture putAsync(V e) { throw new UnsupportedOperationException("use add method"); diff --git a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java index 71ae95cf6..8d7dfc768 100644 --- a/redisson/src/main/java/org/redisson/RedissonTransferQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonTransferQueue.java @@ -18,10 +18,7 @@ package org.redisson; import io.netty.buffer.ByteBuf; import io.netty.util.Timeout; import io.netty.util.concurrent.ImmediateEventExecutor; -import org.redisson.api.RFuture; -import org.redisson.api.RRemoteService; -import org.redisson.api.RTransferQueue; -import org.redisson.api.RemoteInvocationOptions; +import org.redisson.api.*; import org.redisson.api.annotation.RRemoteAsync; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; @@ -605,6 +602,16 @@ public class RedissonTransferQueue extends RedissonExpirable implements RTran throw new UnsupportedOperationException(); } + @Override + public Entry pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture> pollFromAnyWithNameAsync(Duration timeout, String... queueNames) { + throw new UnsupportedOperationException(); + } + @Override public Map> pollFirstFromAny(Duration duration, int count, String... queueNames) throws InterruptedException { throw new UnsupportedOperationException(); diff --git a/redisson/src/main/java/org/redisson/api/Entry.java b/redisson/src/main/java/org/redisson/api/Entry.java new file mode 100644 index 000000000..3a02de444 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/Entry.java @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +/** + * + * @author Nikita Koksharov + * + */ +public class Entry { + + private K key; + private V value; + + public Entry() { + + } + + public Entry(K key, V value) { + this.key = key; + this.value = value; + } + + public V getValue() { + return value; + } + + public K getKey() { + return key; + } + +} \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java index 9bb1c06a8..7b87ddc49 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java @@ -46,6 +46,20 @@ public interface RBlockingQueue extends BlockingQueue, RQueue, RBlockin */ V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException; + /** + * Retrieves and removes first available head element of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueNames queue names. Queue name itself is always included + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @return the head of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + Entry pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException; + /** * Retrieves and removes first available head elements of any queue, * waiting up to the specified wait time if necessary for an element to become available diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java index 3b41bf0ad..1602b0df4 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java @@ -45,6 +45,18 @@ public interface RBlockingQueueAsync extends RQueueAsync { */ RFuture pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames); + /** + * Retrieves and removes first available head element of any queue in async mode, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueNames - queue names. Queue name itself is always included + * @param timeout how long to wait before giving up + * @return Future object with the head of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + */ + RFuture> pollFromAnyWithNameAsync(Duration timeout, String... queueNames); + /** * Retrieves and removes first available head elements of any queue, * waiting up to the specified wait time if necessary for an element to become available diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java index 665d27bb7..2909d63f3 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java @@ -47,6 +47,20 @@ public interface RBlockingQueueReactive extends RQueueReactive { */ Mono pollFromAny(long timeout, TimeUnit unit, String... queueNames); + /** + * Retrieves and removes first available head element of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueNames queue names. Queue name itself is always included + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @return the head of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + Mono> pollFromAnyWithName(Duration timeout, String... queueNames); + /** * Retrieves and removes first available head elements of any queue, * waiting up to the specified wait time if necessary for an element to become available diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java index 029632255..f1416a828 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java @@ -49,6 +49,20 @@ public interface RBlockingQueueRx extends RQueueRx { */ Maybe pollFromAny(long timeout, TimeUnit unit, String... queueNames); + /** + * Retrieves and removes first available head element of any queue, + * waiting up to the specified wait time if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueNames queue names. Queue name itself is always included + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @return the head of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ + Maybe> pollFromAnyWithName(Duration timeout, String... queueNames) throws InterruptedException; + /** * Retrieves and removes first available head elements of any queue, * waiting up to the specified wait time if necessary for an element to become available diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index e62fe8f34..6b3a87c42 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -289,6 +289,17 @@ public interface RedisCommands { RedisCommand BZPOPMIN_VALUE = new RedisCommand("BZPOPMIN", new ScoredSortedSetPolledObjectDecoder()); RedisCommand BZPOPMAX_VALUE = new RedisCommand("BZPOPMAX", new ScoredSortedSetPolledObjectDecoder()); + RedisCommand>> BLPOP_NAME = new RedisCommand<>("BLPOP", + new ListObjectDecoder(0) { + @Override + public Object decode(List parts, State state) { + if (parts.isEmpty()) { + return null; + } + return new org.redisson.api.Entry<>(parts.get(0), parts.get(1)); + } + }); + RedisCommand>> BLMPOP = new RedisCommand<>("BLMPOP", new ListMultiDecoder2( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) { diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 4599d8b74..50f723265 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -6,6 +6,7 @@ import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.RedisRunner.RedisProcess; +import org.redisson.api.Entry; import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; @@ -508,6 +509,27 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { Assertions.assertTrue(System.currentTimeMillis() - s > 2000); } + @Test + public void testPollFromAnyWithName() throws InterruptedException { + final RBlockingQueue queue1 = redisson.getBlockingQueue("queue:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RBlockingQueue queue2 = redisson.getBlockingQueue("queue:pollany1"); + RBlockingQueue queue3 = redisson.getBlockingQueue("queue:pollany2"); + Assertions.assertDoesNotThrow(() -> { + queue3.put(2); + queue1.put(1); + queue2.put(3); + }); + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + Entry r = queue1.pollFromAnyWithName(Duration.ofSeconds(4), "queue:pollany1", "queue:pollany2"); + + assertThat(r.getKey()).isEqualTo("queue:pollany2"); + assertThat(r.getValue()).isEqualTo(2); + Assertions.assertTrue(System.currentTimeMillis() - s > 2000); + } + @Test public void testPollFirstFromAny() throws InterruptedException { // Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0);