|
|
|
@ -23,6 +23,7 @@ import org.redisson.client.protocol.RedisCommand;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.redisson.command.CommandAsyncExecutor;
|
|
|
|
|
import org.redisson.connection.decoder.ListDrainToDecoder;
|
|
|
|
|
import org.redisson.misc.CompletableFutureWrapper;
|
|
|
|
|
|
|
|
|
|
import java.time.Duration;
|
|
|
|
|
import java.util.*;
|
|
|
|
@ -89,6 +90,9 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<V> pollAsync(long timeout, TimeUnit unit) {
|
|
|
|
|
if (timeout < 0) {
|
|
|
|
|
return new CompletableFutureWrapper<>((V) null);
|
|
|
|
|
}
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), toSeconds(timeout, unit));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -116,6 +120,10 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String... queueNames) {
|
|
|
|
|
if (timeout < 0) {
|
|
|
|
|
return new CompletableFutureWrapper<>((V) null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return commandExecutor.pollFromAnyAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE,
|
|
|
|
|
toSeconds(timeout, unit), queueNames);
|
|
|
|
|
}
|
|
|
|
@ -160,6 +168,10 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
|
|
|
|
|
if (timeout < 0) {
|
|
|
|
|
return new CompletableFutureWrapper<>((V) null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String mappedName = commandExecutor.getConnectionManager().getConfig().getNameMapper().map(queueName);
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BRPOPLPUSH, getRawName(), mappedName, toSeconds(timeout, unit));
|
|
|
|
|
}
|
|
|
|
|