Feature - RPriorityQueue should extends RQueue and RPriorityDeque should extends RDeque #2610

pull/2611/head
Nikita Koksharov 5 years ago
parent ddfcc61bf9
commit 2bd463f489

@ -72,7 +72,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
protected <T> void takeAsync(RPromise<V> result, long delay, long timeoutInMicro, RedisCommand<T> command, Object... params) {
long start = System.currentTimeMillis();
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
RFuture<V> future = pollAsync(command, params);
RFuture<V> future = wrapLockedAsync(command, params);
future.onComplete((res, e) -> {
if (e != null && !(e instanceof RedisConnectionException)) {
result.tryFailure(e);
@ -220,7 +220,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
@Override
public RFuture<List<V>> pollAsync(int limit) {
return pollAsync(() -> {
return wrapLockedAsync(() -> {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +

@ -15,10 +15,6 @@
*/
package org.redisson;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
import org.redisson.api.RFuture;
import org.redisson.api.RPriorityDeque;
import org.redisson.api.RedissonClient;
@ -28,6 +24,12 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.command.CommandExecutor;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
/**
* Distributed and concurrent implementation of {@link java.util.Queue}
*
@ -48,26 +50,32 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
super(codec, commandExecutor, name, redisson);
}
@Override
public int addFirstIfExists(V... elements) {
throw new UnsupportedOperationException("use add or put method");
}
@Override
public int addLastIfExists(V... elements) {
throw new UnsupportedOperationException("use add or put method");
}
@Override
public RFuture<Integer> addFirstIfExistsAsync(V... elements) {
throw new UnsupportedOperationException("use add or put method");
}
@Override
public RFuture<Integer> addLastIfExistsAsync(V... elements) {
throw new UnsupportedOperationException("use add or put method");
}
@Override
public RFuture<Void> addFirstAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
@Override
public RFuture<Void> addLastAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
@ -118,7 +126,7 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
};
}
// @Override
@Override
public RFuture<V> getLastAsync() {
return commandExecutor.readAsync(getName(), codec, LRANGE_SINGLE, getName(), -1, -1);
}
@ -150,7 +158,7 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
throw new UnsupportedOperationException("use add or put method");
}
// @Override
@Override
public RFuture<V> peekFirstAsync() {
return getAsync(0);
}
@ -179,7 +187,7 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
}
public RFuture<V> pollLastAsync() {
return pollAsync(RedisCommands.RPOP, getName());
return wrapLockedAsync(RedisCommands.RPOP, getName());
}
@Override
@ -187,7 +195,7 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
return get(pollLastAsync());
}
// @Override
@Override
public RFuture<V> popAsync() {
return pollAsync();
}
@ -205,8 +213,15 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
public RFuture<Void> pushAsync(V e) {
throw new UnsupportedOperationException("use add or put method");
}
// @Override
@Override
public RFuture<Boolean> removeAsync(Object o, int count) {
return wrapLockedAsync(() -> {
return super.removeAsync(o, count);
});
}
@Override
public RFuture<Boolean> removeFirstOccurrenceAsync(Object o) {
return removeAsync(o, 1);
}
@ -216,12 +231,12 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
return remove(o, 1);
}
// @Override
@Override
public RFuture<V> removeFirstAsync() {
return pollAsync();
}
// @Override
@Override
public RFuture<V> removeLastAsync() {
return pollLastAsync();
}
@ -235,7 +250,7 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
return value;
}
// @Override
@Override
public RFuture<Boolean> removeLastOccurrenceAsync(Object o) {
return removeAsync(o, -1);
}
@ -250,4 +265,36 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
return toStream(descendingIterator());
}
@Override
public RFuture<List<V>> pollFirstAsync(int limit) {
return pollAsync(limit);
}
@Override
public List<V> pollFirst(int limit) {
return poll(limit);
}
@Override
public List<V> pollLast(int limit) {
return get(pollLastAsync(limit));
}
@Override
public RFuture<List<V>> pollLastAsync(int limit) {
return wrapLockedAsync(() -> {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('rpop', KEYS[1]);" +
"if value ~= false then " +
"table.insert(result, value);" +
"else " +
"return result;" +
"end;" +
"end; " +
"return result;",
Collections.singletonList(getName()), limit);
});
}
}

@ -15,24 +15,7 @@
*/
package org.redisson;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPriorityQueue;
import org.redisson.api.RedissonClient;
import org.redisson.api.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
@ -41,6 +24,15 @@ import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
*
* @author Nikita Koksharov
@ -286,17 +278,18 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
return comparator;
}
@Override
public RFuture<V> pollAsync() {
return pollAsync(RedisCommands.LPOP, getName());
return wrapLockedAsync(RedisCommands.LPOP, getName());
}
protected <T> RFuture<V> pollAsync(RedisCommand<T> command, Object... params) {
return pollAsync(() -> {
protected <T> RFuture<V> wrapLockedAsync(RedisCommand<T> command, Object... params) {
return wrapLockedAsync(() -> {
return commandExecutor.writeAsync(getName(), codec, command, params);
});
};
protected final <T, R> RFuture<R> pollAsync(Supplier<RFuture<R>> callable) {
protected final <T, R> RFuture<R> wrapLockedAsync(Supplier<RFuture<R>> callable) {
long threadId = Thread.currentThread().getId();
RPromise<R> result = new RedissonPromise<R>();
lock.lockAsync(threadId).onComplete((r, exc) -> {
@ -343,7 +336,7 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
return getFirst();
}
// @Override
@Override
public RFuture<V> peekAsync() {
return getAsync(0);
}
@ -435,8 +428,9 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
return get(pollLastAndOfferFirstToAsync(queueName));
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName) {
return pollAsync(RedisCommands.RPOPLPUSH, getName(), queueName);
return wrapLockedAsync(RedisCommands.RPOPLPUSH, getName(), queueName);
}
@Override
@ -459,4 +453,36 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
return clearExpireAsync(getName(), getComparatorKeyName());
}
@Override
public List<V> poll(int limit) {
return get(pollAsync(limit));
}
@Override
public RFuture<Boolean> offerAsync(V e) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> addAsync(V e) {
throw new UnsupportedOperationException();
}
@Override
public RFuture<List<V>> pollAsync(int limit) {
return wrapLockedAsync(() -> {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('lpop', KEYS[1]);" +
"if value ~= false then " +
"table.insert(result, value);" +
"else " +
"return result;" +
"end;" +
"end; " +
"return result;",
Collections.singletonList(getName()), limit);
});
}
}

@ -25,7 +25,7 @@ import java.util.stream.Stream;
*
* @param <V> value type
*/
public interface RPriorityDeque<V> extends Deque<V>, RPriorityQueue<V> {
public interface RPriorityDeque<V> extends RDeque<V>, RPriorityQueue<V> {
/**
* Returns stream of elements contained in this deque in reverse order

@ -27,7 +27,7 @@ import java.util.Queue;
*
* @param <V> value type
*/
public interface RPriorityQueue<V> extends Queue<V>, RObject {
public interface RPriorityQueue<V> extends RQueue<V>, RObject {
/**
* Returns comparator used by this queue

Loading…
Cancel
Save