|
|
|
@ -46,6 +46,7 @@ import java.util.*;
|
|
|
|
|
import java.util.Map.Entry;
|
|
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
@ -482,6 +483,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
}
|
|
|
|
|
return result.toArray();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private AtomicBoolean evalShaROSupported = new AtomicBoolean(true);
|
|
|
|
|
|
|
|
|
|
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType,
|
|
|
|
|
String script, List<Object> keys, boolean noRetry, Object... params) {
|
|
|
|
@ -489,10 +492,15 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
CompletableFuture<R> mainPromise = new CompletableFuture<>();
|
|
|
|
|
|
|
|
|
|
Object[] pps = copy(params);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CompletableFuture<R> promise = new CompletableFuture<>();
|
|
|
|
|
String sha1 = calcSHA(script);
|
|
|
|
|
RedisCommand cmd = new RedisCommand(evalCommandType, "EVALSHA");
|
|
|
|
|
RedisCommand cmd;
|
|
|
|
|
if (readOnlyMode && evalShaROSupported.get()) {
|
|
|
|
|
cmd = new RedisCommand(evalCommandType, "EVALSHA_RO");
|
|
|
|
|
} else {
|
|
|
|
|
cmd = new RedisCommand(evalCommandType, "EVALSHA");
|
|
|
|
|
}
|
|
|
|
|
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
|
|
|
|
|
args.add(sha1);
|
|
|
|
|
args.add(keys.size());
|
|
|
|
@ -506,7 +514,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
|
|
|
|
|
promise.whenComplete((res, e) -> {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
if (e.getMessage().startsWith("NOSCRIPT")) {
|
|
|
|
|
if (e.getMessage().startsWith("ERR unknown command")) {
|
|
|
|
|
evalShaROSupported.set(false);
|
|
|
|
|
free(pps);
|
|
|
|
|
RFuture<R> future = evalAsync(nodeSource, readOnlyMode, codec, evalCommandType, script, keys, noRetry, params);
|
|
|
|
|
transfer(future.toCompletableFuture(), mainPromise);
|
|
|
|
|
} else if (e.getMessage().startsWith("NOSCRIPT")) {
|
|
|
|
|
RFuture<String> loadFuture = loadScript(executor.getRedisClient(), script);
|
|
|
|
|
loadFuture.whenComplete((r, ex) -> {
|
|
|
|
|
if (ex != null) {
|
|
|
|
@ -515,7 +528,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RedisCommand command = new RedisCommand(evalCommandType, "EVALSHA");
|
|
|
|
|
List<Object> newargs = new ArrayList<Object>(2 + keys.size() + params.length);
|
|
|
|
|
newargs.add(sha1);
|
|
|
|
|
newargs.add(keys.size());
|
|
|
|
@ -527,14 +539,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
ns = new NodeSource(nodeSource, executor.getRedisClient());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RFuture<R> future = async(readOnlyMode, ns, codec, command, newargs.toArray(), false, noRetry);
|
|
|
|
|
future.whenComplete((re, exс) -> {
|
|
|
|
|
if (exс != null) {
|
|
|
|
|
mainPromise.completeExceptionally(exс);
|
|
|
|
|
} else {
|
|
|
|
|
mainPromise.complete(re);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
RFuture<R> future = async(readOnlyMode, ns, codec, cmd, newargs.toArray(), false, noRetry);
|
|
|
|
|
transfer(future.toCompletableFuture(), mainPromise);
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
free(pps);
|
|
|
|
|