|
|
|
@ -72,6 +72,10 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected <T> void takeAsync(CompletableFuture<V> result, long delay, long timeoutInMicro, RedisCommand<T> command, Object... params) {
|
|
|
|
|
if (timeoutInMicro < 0) {
|
|
|
|
|
result.complete(null);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
long start = System.currentTimeMillis();
|
|
|
|
|
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
|
|
|
|
|
RFuture<V> future = wrapLockedAsync(command, params);
|
|
|
|
@ -115,9 +119,6 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RFuture<V> pollAsync(long timeout, TimeUnit unit) {
|
|
|
|
|
if (timeout < 0) {
|
|
|
|
|
return new CompletableFutureWrapper<>((V) null);
|
|
|
|
|
}
|
|
|
|
|
CompletableFuture<V> result = new CompletableFuture<V>();
|
|
|
|
|
takeAsync(result, 0, unit.toMicros(timeout), RedisCommands.LPOP, getRawName());
|
|
|
|
|
return new CompletableFutureWrapper<>(result);
|
|
|
|
@ -155,9 +156,6 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
|
|
|
|
|
if (timeout < 0) {
|
|
|
|
|
return new CompletableFutureWrapper<>((V) null);
|
|
|
|
|
}
|
|
|
|
|
CompletableFuture<V> result = new CompletableFuture<V>();
|
|
|
|
|
takeAsync(result, 0, unit.toMicros(timeout), RedisCommands.RPOPLPUSH, getRawName(), queueName);
|
|
|
|
|
return new CompletableFutureWrapper<>(result);
|
|
|
|
|