|
|
|
@ -589,9 +589,32 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
NodeSource source = getNodeSource(key);
|
|
|
|
|
return async(false, source, codec, command, params, false, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private final AtomicBoolean sortRoSupported = new AtomicBoolean(true);
|
|
|
|
|
|
|
|
|
|
public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec,
|
|
|
|
|
RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
|
|
|
|
|
if (readOnlyMode && command.getName().equals("SORT") && !sortRoSupported.get()) {
|
|
|
|
|
readOnlyMode = false;
|
|
|
|
|
} else if (readOnlyMode && command.getName().equals("SORT") && sortRoSupported.get()) {
|
|
|
|
|
RedisCommand cmd = new RedisCommand("SORT_RO", command.getReplayMultiDecoder());
|
|
|
|
|
CompletableFuture<R> mainPromise = createPromise();
|
|
|
|
|
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, cmd, params, mainPromise,
|
|
|
|
|
ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
|
|
|
|
|
executor.execute();
|
|
|
|
|
CompletableFuture<R> result = new CompletableFuture<>();
|
|
|
|
|
mainPromise.whenComplete((r, e) -> {
|
|
|
|
|
if (e != null && e.getMessage().startsWith("ERR unknown command")) {
|
|
|
|
|
sortRoSupported.set(false);
|
|
|
|
|
RFuture<R> future = async(false, source, codec, command, params, ignoreRedirect, noRetry);
|
|
|
|
|
transfer(future.toCompletableFuture(), result);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
transfer(mainPromise, result);
|
|
|
|
|
});
|
|
|
|
|
return new CompletableFutureWrapper<>(result);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CompletableFuture<R> mainPromise = createPromise();
|
|
|
|
|
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise,
|
|
|
|
|
ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
|
|
|
|
|