|
|
|
@ -32,7 +32,6 @@ import java.util.concurrent.CompletionStage;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
import java.util.function.Function;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* <p>Distributed and concurrent implementation of {@link java.util.concurrent.BlockingQueue}.
|
|
|
|
@ -153,7 +152,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, List<V>>> pollFirstFromAnyAsync(Duration duration, int count, String... queueNames) {
|
|
|
|
|
List<String> mappedNames = Arrays.stream(queueNames).map(m -> getServiceManager().getConfig().getNameMapper().map(m)).collect(Collectors.toList());
|
|
|
|
|
List<String> mappedNames = map(queueNames);
|
|
|
|
|
List<Object> params = new ArrayList<>();
|
|
|
|
|
params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS));
|
|
|
|
|
params.add(queueNames.length + 1);
|
|
|
|
@ -172,7 +171,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Map<String, List<V>>> pollLastFromAnyAsync(Duration duration, int count, String... queueNames) {
|
|
|
|
|
List<String> mappedNames = Arrays.stream(queueNames).map(m -> getServiceManager().getConfig().getNameMapper().map(m)).collect(Collectors.toList());
|
|
|
|
|
List<String> mappedNames = map(queueNames);
|
|
|
|
|
List<Object> params = new ArrayList<>();
|
|
|
|
|
params.add(toSeconds(duration.getSeconds(), TimeUnit.SECONDS));
|
|
|
|
|
params.add(queueNames.length + 1);
|
|
|
|
@ -190,7 +189,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
|
|
|
|
|
return new CompletableFutureWrapper<>((V) null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String mappedName = getServiceManager().getConfig().getNameMapper().map(queueName);
|
|
|
|
|
String mappedName = mapName(queueName);
|
|
|
|
|
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BRPOPLPUSH, getRawName(), mappedName, toSeconds(timeout, unit));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|