Feature - added poll() methods with limit to RQueue, RDeque, RDelayedQueue objects #2391

pull/2400/head
Nikita Koksharov 5 years ago
parent 758acbe5f3
commit 5621bdce23

@ -255,6 +255,11 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
return get(readAllAsync());
}
@Override
public List<V> poll(int limit) {
return get(pollAsync(limit));
}
@Override
public RFuture<List<V>> readAllAsync() {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_LIST,
@ -265,7 +270,25 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
+ "table.insert(result, value);"
+ "end; "
+ "return result; ",
Collections.<Object>singletonList(queueName));
Collections.singletonList(queueName));
}
@Override
public RFuture<List<V>> pollAsync(int limit) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local v = redis.call('lpop', KEYS[1]);" +
"if v ~= false then " +
"redis.call('zrem', KEYS[2], v); " +
"local randomId, value = struct.unpack('dLc0', v);" +
"table.insert(result, value);" +
"else " +
"return result;" +
"end;" +
"end; " +
"return result;",
Arrays.asList(queueName, timeoutSetName), limit);
}
@Override

@ -15,7 +15,9 @@
*/
package org.redisson;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.api.RDeque;
@ -168,11 +170,41 @@ public class RedissonDeque<V> extends RedissonQueue<V> implements RDeque<V> {
return poll();
}
@Override
public RFuture<List<V>> pollFirstAsync(int limit) {
return pollAsync(limit);
}
@Override
public List<V> pollFirst(int limit) {
return poll(limit);
}
@Override
public RFuture<V> pollLastAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.RPOP, getName());
}
@Override
public List<V> pollLast(int limit) {
return get(pollLastAsync(limit));
}
@Override
public RFuture<List<V>> pollLastAsync(int limit) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('rpop', KEYS[1]);" +
"if value ~= false then " +
"table.insert(result, value);" +
"else " +
"return result;" +
"end;" +
"end; " +
"return result;",
Collections.singletonList(getName()), limit);
}
@Override
public V pollLast() {

@ -16,6 +16,7 @@
package org.redisson;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -142,6 +143,11 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
throw new UnsupportedOperationException("use offer method");
}
@Override
public RFuture<List<V>> pollAsync(int limit) {
return null;
}
@Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
@ -258,4 +264,28 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
return commandExecutor.getInterrupted(pollLastAsync(timeout, unit));
}
@Override
public List<V> poll(int limit) {
throw new UnsupportedOperationException();
}
@Override
public List<V> pollLast(int limit) {
throw new UnsupportedOperationException();
}
@Override
public List<V> pollFirst(int limit) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<List<V>> pollFirstAsync(int limit) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<List<V>> pollLastAsync(int limit) {
throw new UnsupportedOperationException();
}
}

@ -17,6 +17,7 @@ package org.redisson;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -217,6 +218,11 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
throw new UnsupportedOperationException("use offer method");
}
@Override
public RFuture<List<V>> pollAsync(int limit) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
throw new UnsupportedOperationException("use poll method");
@ -226,4 +232,9 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
public RFuture<Void> putAsync(V e) {
throw new UnsupportedOperationException("use add method");
}
@Override
public List<V> poll(int limit) {
throw new UnsupportedOperationException();
}
}

@ -15,8 +15,6 @@
*/
package org.redisson;
import java.util.NoSuchElementException;
import org.redisson.api.RFuture;
import org.redisson.api.RQueue;
import org.redisson.api.RedissonClient;
@ -24,6 +22,10 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
/**
* Distributed and concurrent implementation of {@link java.util.Queue}
*
@ -77,6 +79,27 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.LPOP, getName());
}
@Override
public List<V> poll(int limit) {
return get(pollAsync(limit));
}
@Override
public RFuture<List<V>> pollAsync(int limit) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('lpop', KEYS[1]);" +
"if value ~= false then " +
"table.insert(result, value);" +
"else " +
"return result;" +
"end;" +
"end; " +
"return result;",
Collections.singletonList(getName()), limit);
}
@Override
public V poll() {
return get(pollAsync());

@ -16,6 +16,7 @@
package org.redisson.api;
import java.util.Deque;
import java.util.List;
/**
* Distributed implementation of {@link java.util.Deque}
@ -26,5 +27,20 @@ import java.util.Deque;
*/
public interface RDeque<V> extends Deque<V>, RQueue<V>, RDequeAsync<V> {
/**
* Retrieves and removes the tail elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of tail elements
*/
List<V> pollLast(int limit);
/**
* Retrieves and removes the head elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of head elements
*/
List<V> pollFirst(int limit);
}

@ -15,6 +15,8 @@
*/
package org.redisson.api;
import java.util.List;
/**
* Distributed async implementation of {@link java.util.Deque}
*
@ -144,4 +146,19 @@ public interface RDequeAsync<V> extends RQueueAsync<V> {
*/
RFuture<Boolean> offerFirstAsync(V e);
/**
* Retrieves and removes the head elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of head elements
*/
RFuture<List<V>> pollFirstAsync(int limit);
/**
* Retrieves and removes the tail elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of tail elements
*/
RFuture<List<V>> pollLastAsync(int limit);
}

@ -93,6 +93,22 @@ public interface RDequeReactive<V> extends RQueueReactive<V> {
*/
Mono<V> pollFirst();
/**
* Retrieves and removes the tail elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of tail elements
*/
Flux<V> pollLast(int limit);
/**
* Retrieves and removes the head elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of head elements
*/
Flux<V> pollFirst(int limit);
/**
* Returns element at the tail of this deque
* or <code>null</code> if there are no elements in deque.

@ -95,6 +95,22 @@ public interface RDequeRx<V> extends RQueueRx<V> {
*/
Maybe<V> pollFirst();
/**
* Retrieves and removes the tail elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of tail elements
*/
Flowable<V> pollLast(int limit);
/**
* Retrieves and removes the head elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of head elements
*/
Flowable<V> pollFirst(int limit);
/**
* Returns element at the tail of this deque
* or <code>null</code> if there are no elements in deque.

@ -42,5 +42,13 @@ public interface RQueue<V> extends Queue<V>, RExpirable, RQueueAsync<V> {
* @return elements
*/
List<V> readAll();
/**
* Retrieves and removes the head elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of head elements
*/
List<V> poll(int limit);
}

@ -66,5 +66,13 @@ public interface RQueueAsync<V> extends RCollectionAsync<V> {
* @return elements
*/
RFuture<List<V>> readAllAsync();
/**
* Retrieves and removes the head elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of head elements
*/
RFuture<List<V>> pollAsync(int limit);
}

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@ -42,6 +43,14 @@ public interface RQueueReactive<V> extends RCollectionReactive<V> {
*/
Mono<V> poll();
/**
* Retrieves and removes the head elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of head elements
*/
Flux<V> poll(int limit);
/**
* Inserts the specified element into this queue.
*

@ -15,11 +15,12 @@
*/
package org.redisson.api;
import java.util.List;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import java.util.List;
/**
* RxJava2 interface for Queue object
*
@ -43,6 +44,14 @@ public interface RQueueRx<V> extends RCollectionRx<V> {
*/
Maybe<V> poll();
/**
* Retrieves and removes the head elements of this queue.
* Elements amount limited by <code>limit</code> param.
*
* @return list of head elements
*/
Flowable<V> poll(int limit);
/**
* Inserts the specified element into this queue.
*

@ -197,9 +197,33 @@ public class RedissonDelayedQueueTest extends BaseTest {
dealyedQueue.destroy();
}
@Test
public void testPollLimited() throws InterruptedException {
RBlockingQueue<String> queue = redisson.getBlockingQueue("test");
RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue);
dealyedQueue.offer("1", 1, TimeUnit.SECONDS);
dealyedQueue.offer("2", 2, TimeUnit.SECONDS);
dealyedQueue.offer("3", 3, TimeUnit.SECONDS);
dealyedQueue.offer("4", 4, TimeUnit.SECONDS);
assertThat(dealyedQueue.poll(3)).containsExactly("1", "2", "3");
assertThat(dealyedQueue.poll(2)).containsExactly("4");
assertThat(dealyedQueue.poll(2)).isEmpty();
Thread.sleep(3000);
assertThat(queue.isEmpty()).isTrue();
assertThat(queue.poll()).isNull();
assertThat(queue.poll()).isNull();
dealyedQueue.destroy();
}
@Test
public void testDealyedQueuePoll() throws InterruptedException {
public void testPoll() throws InterruptedException {
RBlockingQueue<String> queue = redisson.getBlockingQueue("test");
RDelayedQueue<String> dealyedQueue = redisson.getDelayedQueue(queue);

@ -2,6 +2,8 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import org.junit.Assert;
@ -13,6 +15,18 @@ public class RedissonQueueTest extends BaseTest {
<T> RQueue<T> getQueue() {
return redisson.getQueue("queue");
}
@Test
public void testPollLimited() {
RQueue<Integer> queue = getQueue();
queue.addAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
List<Integer> elements = queue.poll(3);
assertThat(elements).containsExactly(1, 2, 3);
List<Integer> elements2 = queue.poll(10);
assertThat(elements2).containsExactly(4, 5, 6, 7);
List<Integer> elements3 = queue.poll(5);
assertThat(elements3).isEmpty();
}
@Test
public void testAddOffer() {

Loading…
Cancel
Save