diff --git a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java index bdcb0e94e..d412c6cee 100644 --- a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java @@ -255,6 +255,11 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay return get(readAllAsync()); } + @Override + public List poll(int limit) { + return get(pollAsync(limit)); + } + @Override public RFuture> readAllAsync() { return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_LIST, @@ -265,7 +270,25 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay + "table.insert(result, value);" + "end; " + "return result; ", - Collections.singletonList(queueName)); + Collections.singletonList(queueName)); + } + + @Override + public RFuture> pollAsync(int limit) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + "local result = {};" + + "for i = 1, ARGV[1], 1 do " + + "local v = redis.call('lpop', KEYS[1]);" + + "if v ~= false then " + + "redis.call('zrem', KEYS[2], v); " + + "local randomId, value = struct.unpack('dLc0', v);" + + "table.insert(result, value);" + + "else " + + "return result;" + + "end;" + + "end; " + + "return result;", + Arrays.asList(queueName, timeoutSetName), limit); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonDeque.java b/redisson/src/main/java/org/redisson/RedissonDeque.java index 8634692d1..1d39416c5 100644 --- a/redisson/src/main/java/org/redisson/RedissonDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonDeque.java @@ -15,7 +15,9 @@ */ package org.redisson; +import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; import org.redisson.api.RDeque; @@ -168,11 +170,41 @@ public class RedissonDeque extends RedissonQueue implements RDeque { return poll(); } + @Override + public RFuture> pollFirstAsync(int limit) { + return pollAsync(limit); + } + + @Override + public List pollFirst(int limit) { + return poll(limit); + } + @Override public RFuture pollLastAsync() { return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOP, getName()); } + @Override + public List pollLast(int limit) { + return get(pollLastAsync(limit)); + } + + @Override + public RFuture> pollLastAsync(int limit) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + "local result = {};" + + "for i = 1, ARGV[1], 1 do " + + "local value = redis.call('rpop', KEYS[1]);" + + "if value ~= false then " + + "table.insert(result, value);" + + "else " + + "return result;" + + "end;" + + "end; " + + "return result;", + Collections.singletonList(getName()), limit); + } @Override public V pollLast() { diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java index 06ba4140b..08a3ac083 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java @@ -16,6 +16,7 @@ package org.redisson; import java.util.Collection; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -142,6 +143,11 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i throw new UnsupportedOperationException("use offer method"); } + @Override + public RFuture> pollAsync(int limit) { + return null; + } + @Override public RFuture pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { throw new UnsupportedOperationException("use poll method"); @@ -258,4 +264,28 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i return commandExecutor.getInterrupted(pollLastAsync(timeout, unit)); } + @Override + public List poll(int limit) { + throw new UnsupportedOperationException(); + } + + @Override + public List pollLast(int limit) { + throw new UnsupportedOperationException(); + } + + @Override + public List pollFirst(int limit) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture> pollFirstAsync(int limit) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture> pollLastAsync(int limit) { + throw new UnsupportedOperationException(); + } } \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index dd099bf9f..111ba790b 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -17,6 +17,7 @@ package org.redisson; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -217,6 +218,11 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i throw new UnsupportedOperationException("use offer method"); } + @Override + public RFuture> pollAsync(int limit) { + throw new UnsupportedOperationException(); + } + @Override public RFuture pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) { throw new UnsupportedOperationException("use poll method"); @@ -226,4 +232,9 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i public RFuture putAsync(V e) { throw new UnsupportedOperationException("use add method"); } + + @Override + public List poll(int limit) { + throw new UnsupportedOperationException(); + } } \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/RedissonQueue.java b/redisson/src/main/java/org/redisson/RedissonQueue.java index 42d1c0e84..0ced69c82 100644 --- a/redisson/src/main/java/org/redisson/RedissonQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonQueue.java @@ -15,8 +15,6 @@ */ package org.redisson; -import java.util.NoSuchElementException; - import org.redisson.api.RFuture; import org.redisson.api.RQueue; import org.redisson.api.RedissonClient; @@ -24,6 +22,10 @@ import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + /** * Distributed and concurrent implementation of {@link java.util.Queue} * @@ -77,6 +79,27 @@ public class RedissonQueue extends RedissonList implements RQueue { return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPOP, getName()); } + @Override + public List poll(int limit) { + return get(pollAsync(limit)); + } + + @Override + public RFuture> pollAsync(int limit) { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + "local result = {};" + + "for i = 1, ARGV[1], 1 do " + + "local value = redis.call('lpop', KEYS[1]);" + + "if value ~= false then " + + "table.insert(result, value);" + + "else " + + "return result;" + + "end;" + + "end; " + + "return result;", + Collections.singletonList(getName()), limit); + } + @Override public V poll() { return get(pollAsync()); diff --git a/redisson/src/main/java/org/redisson/api/RDeque.java b/redisson/src/main/java/org/redisson/api/RDeque.java index a10adf33e..c2d2f5a28 100644 --- a/redisson/src/main/java/org/redisson/api/RDeque.java +++ b/redisson/src/main/java/org/redisson/api/RDeque.java @@ -16,6 +16,7 @@ package org.redisson.api; import java.util.Deque; +import java.util.List; /** * Distributed implementation of {@link java.util.Deque} @@ -26,5 +27,20 @@ import java.util.Deque; */ public interface RDeque extends Deque, RQueue, RDequeAsync { + /** + * Retrieves and removes the tail elements of this queue. + * Elements amount limited by limit param. + * + * @return list of tail elements + */ + List pollLast(int limit); + + /** + * Retrieves and removes the head elements of this queue. + * Elements amount limited by limit param. + * + * @return list of head elements + */ + List pollFirst(int limit); } diff --git a/redisson/src/main/java/org/redisson/api/RDequeAsync.java b/redisson/src/main/java/org/redisson/api/RDequeAsync.java index 727bb4281..1b5e3df57 100644 --- a/redisson/src/main/java/org/redisson/api/RDequeAsync.java +++ b/redisson/src/main/java/org/redisson/api/RDequeAsync.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import java.util.List; + /** * Distributed async implementation of {@link java.util.Deque} * @@ -144,4 +146,19 @@ public interface RDequeAsync extends RQueueAsync { */ RFuture offerFirstAsync(V e); + /** + * Retrieves and removes the head elements of this queue. + * Elements amount limited by limit param. + * + * @return list of head elements + */ + RFuture> pollFirstAsync(int limit); + + /** + * Retrieves and removes the tail elements of this queue. + * Elements amount limited by limit param. + * + * @return list of tail elements + */ + RFuture> pollLastAsync(int limit); } diff --git a/redisson/src/main/java/org/redisson/api/RDequeReactive.java b/redisson/src/main/java/org/redisson/api/RDequeReactive.java index 5b30d30f0..b58152a78 100644 --- a/redisson/src/main/java/org/redisson/api/RDequeReactive.java +++ b/redisson/src/main/java/org/redisson/api/RDequeReactive.java @@ -93,6 +93,22 @@ public interface RDequeReactive extends RQueueReactive { */ Mono pollFirst(); + /** + * Retrieves and removes the tail elements of this queue. + * Elements amount limited by limit param. + * + * @return list of tail elements + */ + Flux pollLast(int limit); + + /** + * Retrieves and removes the head elements of this queue. + * Elements amount limited by limit param. + * + * @return list of head elements + */ + Flux pollFirst(int limit); + /** * Returns element at the tail of this deque * or null if there are no elements in deque. diff --git a/redisson/src/main/java/org/redisson/api/RDequeRx.java b/redisson/src/main/java/org/redisson/api/RDequeRx.java index 609302564..5fdb36af3 100644 --- a/redisson/src/main/java/org/redisson/api/RDequeRx.java +++ b/redisson/src/main/java/org/redisson/api/RDequeRx.java @@ -95,6 +95,22 @@ public interface RDequeRx extends RQueueRx { */ Maybe pollFirst(); + /** + * Retrieves and removes the tail elements of this queue. + * Elements amount limited by limit param. + * + * @return list of tail elements + */ + Flowable pollLast(int limit); + + /** + * Retrieves and removes the head elements of this queue. + * Elements amount limited by limit param. + * + * @return list of head elements + */ + Flowable pollFirst(int limit); + /** * Returns element at the tail of this deque * or null if there are no elements in deque. diff --git a/redisson/src/main/java/org/redisson/api/RQueue.java b/redisson/src/main/java/org/redisson/api/RQueue.java index 003ddc82f..b4eddb64d 100644 --- a/redisson/src/main/java/org/redisson/api/RQueue.java +++ b/redisson/src/main/java/org/redisson/api/RQueue.java @@ -42,5 +42,13 @@ public interface RQueue extends Queue, RExpirable, RQueueAsync { * @return elements */ List readAll(); + + /** + * Retrieves and removes the head elements of this queue. + * Elements amount limited by limit param. + * + * @return list of head elements + */ + List poll(int limit); } diff --git a/redisson/src/main/java/org/redisson/api/RQueueAsync.java b/redisson/src/main/java/org/redisson/api/RQueueAsync.java index ff1eb96cb..bd49610a9 100644 --- a/redisson/src/main/java/org/redisson/api/RQueueAsync.java +++ b/redisson/src/main/java/org/redisson/api/RQueueAsync.java @@ -66,5 +66,13 @@ public interface RQueueAsync extends RCollectionAsync { * @return elements */ RFuture> readAllAsync(); - + + /** + * Retrieves and removes the head elements of this queue. + * Elements amount limited by limit param. + * + * @return list of head elements + */ + RFuture> pollAsync(int limit); + } diff --git a/redisson/src/main/java/org/redisson/api/RQueueReactive.java b/redisson/src/main/java/org/redisson/api/RQueueReactive.java index 4b7958ead..7fa6180c1 100644 --- a/redisson/src/main/java/org/redisson/api/RQueueReactive.java +++ b/redisson/src/main/java/org/redisson/api/RQueueReactive.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.util.List; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -42,6 +43,14 @@ public interface RQueueReactive extends RCollectionReactive { */ Mono poll(); + /** + * Retrieves and removes the head elements of this queue. + * Elements amount limited by limit param. + * + * @return list of head elements + */ + Flux poll(int limit); + /** * Inserts the specified element into this queue. * diff --git a/redisson/src/main/java/org/redisson/api/RQueueRx.java b/redisson/src/main/java/org/redisson/api/RQueueRx.java index a825f6be2..c17b51ae4 100644 --- a/redisson/src/main/java/org/redisson/api/RQueueRx.java +++ b/redisson/src/main/java/org/redisson/api/RQueueRx.java @@ -15,11 +15,12 @@ */ package org.redisson.api; -import java.util.List; - +import io.reactivex.Flowable; import io.reactivex.Maybe; import io.reactivex.Single; +import java.util.List; + /** * RxJava2 interface for Queue object * @@ -43,6 +44,14 @@ public interface RQueueRx extends RCollectionRx { */ Maybe poll(); + /** + * Retrieves and removes the head elements of this queue. + * Elements amount limited by limit param. + * + * @return list of head elements + */ + Flowable poll(int limit); + /** * Inserts the specified element into this queue. * diff --git a/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java b/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java index 6a48a8b9c..d67314741 100644 --- a/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonDelayedQueueTest.java @@ -197,9 +197,33 @@ public class RedissonDelayedQueueTest extends BaseTest { dealyedQueue.destroy(); } + + @Test + public void testPollLimited() throws InterruptedException { + RBlockingQueue queue = redisson.getBlockingQueue("test"); + RDelayedQueue dealyedQueue = redisson.getDelayedQueue(queue); + + dealyedQueue.offer("1", 1, TimeUnit.SECONDS); + dealyedQueue.offer("2", 2, TimeUnit.SECONDS); + dealyedQueue.offer("3", 3, TimeUnit.SECONDS); + dealyedQueue.offer("4", 4, TimeUnit.SECONDS); + + assertThat(dealyedQueue.poll(3)).containsExactly("1", "2", "3"); + assertThat(dealyedQueue.poll(2)).containsExactly("4"); + assertThat(dealyedQueue.poll(2)).isEmpty(); + + Thread.sleep(3000); + assertThat(queue.isEmpty()).isTrue(); + + assertThat(queue.poll()).isNull(); + assertThat(queue.poll()).isNull(); + + dealyedQueue.destroy(); + } + @Test - public void testDealyedQueuePoll() throws InterruptedException { + public void testPoll() throws InterruptedException { RBlockingQueue queue = redisson.getBlockingQueue("test"); RDelayedQueue dealyedQueue = redisson.getDelayedQueue(queue); diff --git a/redisson/src/test/java/org/redisson/RedissonQueueTest.java b/redisson/src/test/java/org/redisson/RedissonQueueTest.java index 4e210c8f4..59c05f0a2 100644 --- a/redisson/src/test/java/org/redisson/RedissonQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonQueueTest.java @@ -2,6 +2,8 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; +import java.util.Arrays; +import java.util.List; import java.util.NoSuchElementException; import org.junit.Assert; @@ -13,6 +15,18 @@ public class RedissonQueueTest extends BaseTest { RQueue getQueue() { return redisson.getQueue("queue"); } + + @Test + public void testPollLimited() { + RQueue queue = getQueue(); + queue.addAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + List elements = queue.poll(3); + assertThat(elements).containsExactly(1, 2, 3); + List elements2 = queue.poll(10); + assertThat(elements2).containsExactly(4, 5, 6, 7); + List elements3 = queue.poll(5); + assertThat(elements3).isEmpty(); + } @Test public void testAddOffer() {