From 2bd463f4894366c9ea8b5917ba789e9bae9f24fb Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 24 Feb 2020 11:51:56 +0300 Subject: [PATCH] Feature - RPriorityQueue should extends RQueue and RPriorityDeque should extends RDeque #2610 --- .../RedissonPriorityBlockingQueue.java | 4 +- .../org/redisson/RedissonPriorityDeque.java | 73 ++++++++++++++---- .../org/redisson/RedissonPriorityQueue.java | 74 +++++++++++++------ .../java/org/redisson/api/RPriorityDeque.java | 2 +- .../java/org/redisson/api/RPriorityQueue.java | 2 +- 5 files changed, 114 insertions(+), 41 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index 995a80ad9..b0b6056a2 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -72,7 +72,7 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i protected void takeAsync(RPromise result, long delay, long timeoutInMicro, RedisCommand command, Object... params) { long start = System.currentTimeMillis(); commandExecutor.getConnectionManager().getGroup().schedule(() -> { - RFuture future = pollAsync(command, params); + RFuture future = wrapLockedAsync(command, params); future.onComplete((res, e) -> { if (e != null && !(e instanceof RedisConnectionException)) { result.tryFailure(e); @@ -220,7 +220,7 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i @Override public RFuture> pollAsync(int limit) { - return pollAsync(() -> { + return wrapLockedAsync(() -> { return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, "local result = {};" + "for i = 1, ARGV[1], 1 do " + diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java index c65dc3ea3..2891cf3af 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java @@ -15,10 +15,6 @@ */ package org.redisson; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.stream.Stream; - import org.redisson.api.RFuture; import org.redisson.api.RPriorityDeque; import org.redisson.api.RedissonClient; @@ -28,6 +24,12 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListFirstObjectDecoder; import org.redisson.command.CommandExecutor; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Stream; + /** * Distributed and concurrent implementation of {@link java.util.Queue} * @@ -48,26 +50,32 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement super(codec, commandExecutor, name, redisson); } + @Override public int addFirstIfExists(V... elements) { throw new UnsupportedOperationException("use add or put method"); } + @Override public int addLastIfExists(V... elements) { throw new UnsupportedOperationException("use add or put method"); } + @Override public RFuture addFirstIfExistsAsync(V... elements) { throw new UnsupportedOperationException("use add or put method"); } + @Override public RFuture addLastIfExistsAsync(V... elements) { throw new UnsupportedOperationException("use add or put method"); } + @Override public RFuture addFirstAsync(V e) { throw new UnsupportedOperationException("use add or put method"); } + @Override public RFuture addLastAsync(V e) { throw new UnsupportedOperationException("use add or put method"); } @@ -118,7 +126,7 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement }; } -// @Override + @Override public RFuture getLastAsync() { return commandExecutor.readAsync(getName(), codec, LRANGE_SINGLE, getName(), -1, -1); } @@ -150,7 +158,7 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement throw new UnsupportedOperationException("use add or put method"); } -// @Override + @Override public RFuture peekFirstAsync() { return getAsync(0); } @@ -179,7 +187,7 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement } public RFuture pollLastAsync() { - return pollAsync(RedisCommands.RPOP, getName()); + return wrapLockedAsync(RedisCommands.RPOP, getName()); } @Override @@ -187,7 +195,7 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement return get(pollLastAsync()); } -// @Override + @Override public RFuture popAsync() { return pollAsync(); } @@ -205,8 +213,15 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement public RFuture pushAsync(V e) { throw new UnsupportedOperationException("use add or put method"); } - -// @Override + + @Override + public RFuture removeAsync(Object o, int count) { + return wrapLockedAsync(() -> { + return super.removeAsync(o, count); + }); + } + + @Override public RFuture removeFirstOccurrenceAsync(Object o) { return removeAsync(o, 1); } @@ -216,12 +231,12 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement return remove(o, 1); } -// @Override + @Override public RFuture removeFirstAsync() { return pollAsync(); } -// @Override + @Override public RFuture removeLastAsync() { return pollLastAsync(); } @@ -235,7 +250,7 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement return value; } -// @Override + @Override public RFuture removeLastOccurrenceAsync(Object o) { return removeAsync(o, -1); } @@ -250,4 +265,36 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement return toStream(descendingIterator()); } + @Override + public RFuture> pollFirstAsync(int limit) { + return pollAsync(limit); + } + + @Override + public List pollFirst(int limit) { + return poll(limit); + } + + @Override + public List pollLast(int limit) { + return get(pollLastAsync(limit)); + } + + @Override + public RFuture> pollLastAsync(int limit) { + return wrapLockedAsync(() -> { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + "local result = {};" + + "for i = 1, ARGV[1], 1 do " + + "local value = redis.call('rpop', KEYS[1]);" + + "if value ~= false then " + + "table.insert(result, value);" + + "else " + + "return result;" + + "end;" + + "end; " + + "return result;", + Collections.singletonList(getName()), limit); + }); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java index 79a219eb6..a01992e53 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java @@ -15,24 +15,7 @@ */ package org.redisson; -import java.io.ByteArrayOutputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.math.BigInteger; -import java.security.MessageDigest; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import org.redisson.api.RBucket; -import org.redisson.api.RFuture; -import org.redisson.api.RLock; -import org.redisson.api.RPriorityQueue; -import org.redisson.api.RedissonClient; +import org.redisson.api.*; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; @@ -41,6 +24,15 @@ import org.redisson.command.CommandExecutor; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.math.BigInteger; +import java.security.MessageDigest; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + /** * * @author Nikita Koksharov @@ -286,17 +278,18 @@ public class RedissonPriorityQueue extends RedissonList implements RPriori return comparator; } + @Override public RFuture pollAsync() { - return pollAsync(RedisCommands.LPOP, getName()); + return wrapLockedAsync(RedisCommands.LPOP, getName()); } - protected RFuture pollAsync(RedisCommand command, Object... params) { - return pollAsync(() -> { + protected RFuture wrapLockedAsync(RedisCommand command, Object... params) { + return wrapLockedAsync(() -> { return commandExecutor.writeAsync(getName(), codec, command, params); }); }; - protected final RFuture pollAsync(Supplier> callable) { + protected final RFuture wrapLockedAsync(Supplier> callable) { long threadId = Thread.currentThread().getId(); RPromise result = new RedissonPromise(); lock.lockAsync(threadId).onComplete((r, exc) -> { @@ -343,7 +336,7 @@ public class RedissonPriorityQueue extends RedissonList implements RPriori return getFirst(); } -// @Override + @Override public RFuture peekAsync() { return getAsync(0); } @@ -435,8 +428,9 @@ public class RedissonPriorityQueue extends RedissonList implements RPriori return get(pollLastAndOfferFirstToAsync(queueName)); } + @Override public RFuture pollLastAndOfferFirstToAsync(String queueName) { - return pollAsync(RedisCommands.RPOPLPUSH, getName(), queueName); + return wrapLockedAsync(RedisCommands.RPOPLPUSH, getName(), queueName); } @Override @@ -459,4 +453,36 @@ public class RedissonPriorityQueue extends RedissonList implements RPriori return clearExpireAsync(getName(), getComparatorKeyName()); } + @Override + public List poll(int limit) { + return get(pollAsync(limit)); + } + + @Override + public RFuture offerAsync(V e) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture addAsync(V e) { + throw new UnsupportedOperationException(); + } + + @Override + public RFuture> pollAsync(int limit) { + return wrapLockedAsync(() -> { + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + "local result = {};" + + "for i = 1, ARGV[1], 1 do " + + "local value = redis.call('lpop', KEYS[1]);" + + "if value ~= false then " + + "table.insert(result, value);" + + "else " + + "return result;" + + "end;" + + "end; " + + "return result;", + Collections.singletonList(getName()), limit); + }); + } } diff --git a/redisson/src/main/java/org/redisson/api/RPriorityDeque.java b/redisson/src/main/java/org/redisson/api/RPriorityDeque.java index 56316091f..bde98475f 100644 --- a/redisson/src/main/java/org/redisson/api/RPriorityDeque.java +++ b/redisson/src/main/java/org/redisson/api/RPriorityDeque.java @@ -25,7 +25,7 @@ import java.util.stream.Stream; * * @param value type */ -public interface RPriorityDeque extends Deque, RPriorityQueue { +public interface RPriorityDeque extends RDeque, RPriorityQueue { /** * Returns stream of elements contained in this deque in reverse order diff --git a/redisson/src/main/java/org/redisson/api/RPriorityQueue.java b/redisson/src/main/java/org/redisson/api/RPriorityQueue.java index c6d2eb548..cfeb96866 100644 --- a/redisson/src/main/java/org/redisson/api/RPriorityQueue.java +++ b/redisson/src/main/java/org/redisson/api/RPriorityQueue.java @@ -27,7 +27,7 @@ import java.util.Queue; * * @param value type */ -public interface RPriorityQueue extends Queue, RObject { +public interface RPriorityQueue extends RQueue, RObject { /** * Returns comparator used by this queue