|
|
|
@ -659,7 +659,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <T, R> RFuture<R> evalBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, String script, List<Object> keys, SlotCallback<T, R> callback) {
|
|
|
|
|
if (!connectionManager.isClusterMode()) {
|
|
|
|
|
if (!getServiceManager().getCfg().isClusterConfig()) {
|
|
|
|
|
Object[] keysArray = callback.createKeys(null, keys);
|
|
|
|
|
Object[] paramsArray = callback.createParams(Collections.emptyList());
|
|
|
|
|
if (readOnly) {
|
|
|
|
@ -740,7 +740,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object[] keys) {
|
|
|
|
|
if (!connectionManager.isClusterMode()) {
|
|
|
|
|
if (!getServiceManager().getCfg().isClusterConfig()) {
|
|
|
|
|
Object[] params = callback.createParams(Arrays.asList(keys));
|
|
|
|
|
if (readOnly) {
|
|
|
|
|
return readAsync((String) null, codec, command, params);
|
|
|
|
@ -881,7 +881,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
@Override
|
|
|
|
|
public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<?> command, long secondsTimeout, String... queueNames) {
|
|
|
|
|
List<String> mappedNames = Arrays.stream(queueNames).map(m -> connectionManager.getServiceManager().getConfig().getNameMapper().map(m)).collect(Collectors.toList());
|
|
|
|
|
if (connectionManager.isClusterMode() && queueNames.length > 0) {
|
|
|
|
|
if (getServiceManager().getCfg().isClusterConfig() && queueNames.length > 0) {
|
|
|
|
|
AtomicReference<Iterator<String>> ref = new AtomicReference<>();
|
|
|
|
|
List<String> names = new ArrayList<>();
|
|
|
|
|
names.add(name);
|
|
|
|
|