Feature - eval() and evalSha() methods added to RScript object for execution on all nodes. #5946

pull/6051/head
Nikita Koksharov 8 months ago
parent 588d520d67
commit 6ad7fe7911

@ -26,6 +26,7 @@ import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -264,4 +265,79 @@ public class RedissonScript implements RScript {
return commandExecutor.get(evalAsync(key, mode, luaScript, returnType, keys, values));
}
@Override
public <R> R eval(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values) {
return commandExecutor.get(evalAsync(mode, luaScript, returnType, resultMapper, values));
}
@Override
public <R> RFuture<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values) {
List<Object> args = new ArrayList<>();
args.add(luaScript);
args.add(0);
for (Object object : values) {
args.add(commandExecutor.encode(codec, object));
}
List<CompletableFuture<R>> futures;
if (mode == Mode.READ_ONLY) {
futures = commandExecutor.readAllAsync(codec, returnType.getCommand(), args.toArray());
} else {
futures = commandExecutor.writeAllAsync(codec, returnType.getCommand(), args.toArray());
}
CompletableFuture<Void> r = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> res = r.thenApply(v -> {
List<R> l = futures.stream().map(f -> f.join()).collect(Collectors.toList());
return resultMapper.apply(l);
});
return new CompletableFutureWrapper<>(res);
}
@Override
public <R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values) {
return commandExecutor.get(evalShaAsync(mode, shaDigest, returnType, resultMapper, values));
}
@Override
public <R> RFuture<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values) {
List<Object> args = new ArrayList<>();
args.add(shaDigest);
args.add(0);
for (Object object : values) {
args.add(commandExecutor.encode(codec, object));
}
if (mode == Mode.READ_ONLY && commandExecutor.isEvalShaROSupported()) {
RedisCommand cmd = new RedisCommand(returnType.getCommand(), "EVALSHA_RO");
List<CompletableFuture<R>> futures = commandExecutor.readAllAsync(codec, cmd, args.toArray());
CompletableFuture<Void> r = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> rr = r.handle((res, e) -> {
if (e != null) {
if (e.getMessage().startsWith("ERR unknown command")) {
commandExecutor.setEvalShaROSupported(false);
return evalShaAsync(mode, shaDigest, returnType, resultMapper, values);
}
CompletableFuture<R> ex = new CompletableFuture<>();
ex.completeExceptionally(e);
return ex;
}
List<R> l = futures.stream().map(f -> f.join()).collect(Collectors.toList());
R result = resultMapper.apply(l);
return CompletableFuture.completedFuture(result);
}).thenCompose(ff -> ff);
return new CompletableFutureWrapper<>(rr);
}
List<CompletableFuture<R>> futures = commandExecutor.readAllAsync(codec, returnType.getCommand(), args.toArray());
CompletableFuture<Void> r = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<R> res = r.thenApply(v -> {
List<R> l = futures.stream().map(f -> f.join()).collect(Collectors.toList());
return resultMapper.apply(l);
});
return new CompletableFutureWrapper<>(res);
}
}

@ -18,10 +18,12 @@ package org.redisson.api;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
/**
* Interface for Redis Script feature
* API for Redis Lua scripts execution.
*
* @author Nikita Koksharov
*
@ -74,6 +76,21 @@ public interface RScript extends RScriptAsync {
*/
<R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script stored in Redis scripts cache by SHA-1 digest <code>shaDigest</code>.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> R evalSha(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script stored in Redis scripts cache by SHA-1 digest
*
@ -102,17 +119,32 @@ public interface RScript extends RScriptAsync {
/**
* Executes Lua script
*
* @param <R> - type of result
* @param key - used to locate Redis node in Cluster which stores cached Lua script
* @param key - used to locate Redis node in Cluster which stores cached Lua script
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> R eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> R eval(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script
*

@ -18,7 +18,9 @@ package org.redisson.api;
import org.redisson.api.RScript.Mode;
import org.redisson.api.RScript.ReturnType;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
/**
* Async interface for Redis Script feature
@ -47,6 +49,21 @@ public interface RScriptAsync {
* @return result object
*/
<R> RFuture<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script stored in Redis scripts cache by SHA-1 digest <code>shaDigest</code>.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> RFuture<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script stored in Redis scripts cache by SHA-1 digest
@ -86,6 +103,21 @@ public interface RScriptAsync {
*/
<R> RFuture<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> RFuture<R> evalAsync(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script
*

@ -19,7 +19,9 @@ import org.redisson.api.RScript.Mode;
import org.redisson.api.RScript.ReturnType;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
/**
* Reactive interface for Redis Script feature
@ -62,7 +64,7 @@ public interface RScriptReactive {
* @return result object
*/
<R> Mono<R> evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes Lua script stored in Redis scripts cache by SHA-1 digest
*
@ -74,6 +76,21 @@ public interface RScriptReactive {
*/
<R> Mono<R> evalSha(Mode mode, String shaDigest, ReturnType returnType);
/**
* Executes a Lua script stored in Redis scripts cache by SHA-1 digest <code>shaDigest</code>.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> Mono<R> evalSha(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script
*
@ -112,6 +129,21 @@ public interface RScriptReactive {
*/
<R> Mono<R> eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> Mono<R> eval(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Loads Lua script into Redis scripts cache and returns its SHA-1 digest
*

@ -15,7 +15,9 @@
*/
package org.redisson.api;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import org.redisson.api.RScript.Mode;
import org.redisson.api.RScript.ReturnType;
@ -65,7 +67,7 @@ public interface RScriptRx {
* @return result object
*/
<R> Maybe<R> evalSha(String key, Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes Lua script stored in Redis scripts cache by SHA-1 digest
*
@ -77,6 +79,21 @@ public interface RScriptRx {
*/
<R> Maybe<R> evalSha(Mode mode, String shaDigest, ReturnType returnType);
/**
* Executes a Lua script stored in Redis scripts cache by SHA-1 digest <code>shaDigest</code>.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param shaDigest - SHA-1 digest
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> Maybe<R> evalSha(Mode mode, String shaDigest, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Executes Lua script
*
@ -115,6 +132,21 @@ public interface RScriptRx {
*/
<R> Maybe<R> eval(String key, Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes a Lua script.
* The script is executed over all Redis master or slave nodes in cluster depending on <code>mode</code> value.
* <code>resultMapper</code> function reduces all results from Redis nodes into one.
*
* @param mode - execution mode
* @param luaScript - lua script
* @param returnType - return type
* @param resultMapper - function for reducing multiple results into one
* @param values - values available through ARGV param in script
* @return result object
* @param <R> - type of result
*/
<R> Maybe<R> eval(Mode mode, String luaScript, ReturnType returnType, Function<Collection<R>, R> resultMapper, Object... values);
/**
* Loads Lua script into Redis scripts cache and returns its SHA-1 digest
*

@ -89,6 +89,8 @@ public interface CommandAsyncExecutor {
<R> List<CompletableFuture<R>> writeAllAsync(RedisCommand<?> command, Object... params);
<R> List<CompletableFuture<R>> writeAllAsync(Codec codec, RedisCommand<?> command, Object... params);
<R> List<CompletableFuture<R>> readAllAsync(Codec codec, RedisCommand<?> command, Object... params);
<R> List<CompletableFuture<R>> readAllAsync(RedisCommand<?> command, Object... params);

@ -292,17 +292,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override
public <T> RFuture<Void> writeAllVoidAsync(RedisCommand<T> command, Object... params) {
List<CompletableFuture<Void>> futures = writeAllAsync(command, StringCodec.INSTANCE, params);
List<CompletableFuture<Void>> futures = writeAllAsync(StringCodec.INSTANCE, command, params);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
@Override
public <R> List<CompletableFuture<R>> writeAllAsync(RedisCommand<?> command, Object... params) {
return writeAllAsync(command, codec, params);
return writeAllAsync(codec, command, params);
}
private <R> List<CompletableFuture<R>> writeAllAsync(RedisCommand<?> command, Codec codec, Object... params) {
@Override
public <R> List<CompletableFuture<R>> writeAllAsync(Codec codec, RedisCommand<?> command, Object... params) {
List<CompletableFuture<R>> futures = connectionManager.getEntrySet().stream().map(e -> {
RFuture<R> f = async(false, new NodeSource(e),
codec, command, params, true, false);
@ -506,7 +507,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
public void setEvalShaROSupported(boolean value) {
this.EVAL_SHA_RO_SUPPORTED.set(value);
EVAL_SHA_RO_SUPPORTED.set(value);
}
private static final Pattern COMMANDS_PATTERN = Pattern.compile("redis\\.call\\(['\"]{1}([\\w.]+)['\"]{1}");

@ -102,7 +102,17 @@ public class RedissonScriptTest extends RedisDockerTest {
RFuture<List<Object>> res = script.evalAsync(RScript.Mode.READ_ONLY, "return {'1','2','3.3333','foo',nil,'bar'}", RScript.ReturnType.MULTI, Collections.emptyList());
assertThat(res.toCompletableFuture().join()).containsExactly("1", "2", "3.3333", "foo");
}
@Test
public void testEvalResultMapping() {
testInCluster(redissonClient -> {
RScript script = redissonClient.getScript(StringCodec.INSTANCE);
Long res = script.eval(RScript.Mode.READ_ONLY, "return 1;", RScript.ReturnType.INTEGER,
integers -> integers.stream().mapToLong(r -> r).sum());
assertThat(res).isEqualTo(3);
});
}
@Test
public void testScriptEncoding() {
RScript script = redisson.getScript();

Loading…
Cancel
Save