|
|
|
@ -484,8 +484,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
return connectionManager.getServiceManager().getCfg().isUseScriptCache();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected final Object[] copy(Object[] params) {
|
|
|
|
|
List<Object> result = new ArrayList<>(params.length);
|
|
|
|
|
protected final List<Object> copy(List<Object> params) {
|
|
|
|
|
List<Object> result = new ArrayList<>(params.size());
|
|
|
|
|
for (Object object : params) {
|
|
|
|
|
if (object instanceof ByteBuf) {
|
|
|
|
|
ByteBuf b = (ByteBuf) object;
|
|
|
|
@ -495,7 +495,11 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
result.add(object);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return result.toArray();
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected final Object[] copy(Object[] params) {
|
|
|
|
|
return copy(Arrays.asList(params)).toArray();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static final AtomicBoolean EVAL_SHA_RO_SUPPORTED = new AtomicBoolean(true);
|
|
|
|
@ -537,7 +541,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
|
|
|
|
|
CompletableFuture<R> mainPromise = new CompletableFuture<>();
|
|
|
|
|
|
|
|
|
|
Object[] pps = copy(params);
|
|
|
|
|
List<Object> keysCopy = copy(keys);
|
|
|
|
|
Object[] paramsCopy = copy(params);
|
|
|
|
|
|
|
|
|
|
CompletableFuture<R> promise = new CompletableFuture<>();
|
|
|
|
|
String sha1 = getServiceManager().calcSHA(mappedScript);
|
|
|
|
@ -563,13 +568,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
if (e != null) {
|
|
|
|
|
if (e.getMessage().startsWith("ERR unknown command")) {
|
|
|
|
|
EVAL_SHA_RO_SUPPORTED.set(false);
|
|
|
|
|
RFuture<R> future = evalAsync(nodeSource, readOnlyMode, codec, evalCommandType, mappedScript, keys, noRetry, pps);
|
|
|
|
|
RFuture<R> future = evalAsync(nodeSource, readOnlyMode, codec, evalCommandType, mappedScript, keysCopy, noRetry, paramsCopy);
|
|
|
|
|
transfer(future.toCompletableFuture(), mainPromise);
|
|
|
|
|
} else if (e.getMessage().startsWith("NOSCRIPT")) {
|
|
|
|
|
RFuture<String> loadFuture = loadScript(executor.getRedisClient(), mappedScript);
|
|
|
|
|
loadFuture.whenComplete((r, ex) -> {
|
|
|
|
|
if (ex != null) {
|
|
|
|
|
free(pps);
|
|
|
|
|
free(keysCopy);
|
|
|
|
|
free(paramsCopy);
|
|
|
|
|
mainPromise.completeExceptionally(ex);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -577,8 +583,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
List<Object> newargs = new ArrayList<Object>(2 + keys.size() + params.length);
|
|
|
|
|
newargs.add(sha1);
|
|
|
|
|
newargs.add(keys.size());
|
|
|
|
|
newargs.addAll(keys);
|
|
|
|
|
newargs.addAll(Arrays.asList(pps));
|
|
|
|
|
newargs.addAll(keysCopy);
|
|
|
|
|
newargs.addAll(Arrays.asList(paramsCopy));
|
|
|
|
|
|
|
|
|
|
NodeSource ns = nodeSource;
|
|
|
|
|
if (ns.getRedisClient() == null) {
|
|
|
|
@ -589,12 +595,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
transfer(future.toCompletableFuture(), mainPromise);
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
free(pps);
|
|
|
|
|
free(keysCopy);
|
|
|
|
|
free(paramsCopy);
|
|
|
|
|
mainPromise.completeExceptionally(e);
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
free(pps);
|
|
|
|
|
free(keysCopy);
|
|
|
|
|
free(paramsCopy);
|
|
|
|
|
mainPromise.complete(res);
|
|
|
|
|
});
|
|
|
|
|
return new CompletableFutureWrapper<>(mainPromise);
|
|
|
|
@ -672,6 +680,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void free(List<Object> params) {
|
|
|
|
|
for (Object obj : params) {
|
|
|
|
|
ReferenceCountUtil.safeRelease(obj);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public <T, R> RFuture<R> readBatchedAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... keys) {
|
|
|
|
|
return executeBatchedAsync(true, codec, command, callback, keys);
|
|
|
|
|