Feature - RFunction object added. #4117

pull/4226/head
Nikita Koksharov 3 years ago
parent 884a6f30b1
commit 666faba3ba

@ -381,6 +381,16 @@ public class Redisson implements RedissonClient {
return new RedissonSet<V>(codec, commandExecutor, name, this);
}
@Override
public RFunction getFunction() {
return new RedissonFuction(commandExecutor);
}
@Override
public RFunction getFunction(Codec codec) {
return new RedissonFuction(commandExecutor, codec);
}
@Override
public RScript getScript() {
return new RedissonScript(commandExecutor);

@ -0,0 +1,238 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import org.redisson.api.FunctionLibrary;
import org.redisson.api.FunctionStats;
import org.redisson.api.RFunction;
import org.redisson.api.RFuture;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import java.util.*;
import java.util.concurrent.CompletableFuture;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonFuction implements RFunction {
private final Codec codec;
private final CommandAsyncExecutor commandExecutor;
public RedissonFuction(CommandAsyncExecutor commandExecutor) {
this.commandExecutor = commandExecutor;
this.codec = commandExecutor.getConnectionManager().getCodec();
}
public RedissonFuction(CommandAsyncExecutor commandExecutor, Codec codec) {
this.commandExecutor = commandExecutor;
this.codec = codec;
}
@Override
public void delete(String libraryName) {
commandExecutor.get(deleteAsync(libraryName));
}
@Override
public RFuture<Void> deleteAsync(String libraryName) {
List<CompletableFuture<Void>> futures = commandExecutor.executeMasters(RedisCommands.FUNCTION_DELETE, libraryName);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
@Override
public byte[] dump() {
return commandExecutor.get(dumpAsync());
}
@Override
public RFuture<byte[]> dumpAsync() {
return commandExecutor.readAsync((String) null, ByteArrayCodec.INSTANCE, RedisCommands.FUNCTION_DUMP);
}
@Override
public void flush() {
commandExecutor.get(flushAsync());
}
@Override
public RFuture<Void> flushAsync() {
List<CompletableFuture<Void>> futures = commandExecutor.executeMasters(RedisCommands.FUNCTION_FLUSH);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
@Override
public void kill() {
commandExecutor.get(killAsync());
}
@Override
public RFuture<Void> killAsync() {
List<CompletableFuture<Void>> futures = commandExecutor.executeAll(RedisCommands.FUNCTION_KILL);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
@Override
public List<FunctionLibrary> list() {
return commandExecutor.get(listAsync());
}
@Override
public RFuture<List<FunctionLibrary>> listAsync() {
return commandExecutor.readAsync((String) null, ByteArrayCodec.INSTANCE, RedisCommands.FUNCTION_LIST);
}
@Override
public List<FunctionLibrary> list(String namePattern) {
return commandExecutor.get(listAsync(namePattern));
}
@Override
public RFuture<List<FunctionLibrary>> listAsync(String namePattern) {
return commandExecutor.readAsync((String) null, ByteArrayCodec.INSTANCE, RedisCommands.FUNCTION_LIST, "LIBRARYNAME", namePattern);
}
@Override
public void load(String libraryName, String code) {
commandExecutor.get(loadAsync(libraryName, code));
}
@Override
public RFuture<Void> loadAsync(String libraryName, String code) {
List<CompletableFuture<Void>> futures = commandExecutor.executeMasters(RedisCommands.FUNCTION_LOAD, "Lua", libraryName, code);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
@Override
public void loadAndReplace(String libraryName, String code) {
commandExecutor.get(loadAndReplaceAsync(libraryName, code));
}
@Override
public RFuture<Void> loadAndReplaceAsync(String libraryName, String code) {
List<CompletableFuture<Void>> futures = commandExecutor.executeMasters(RedisCommands.FUNCTION_LOAD,
"Lua", libraryName, "REPLACE", code);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
@Override
public void restore(byte[] payload) {
commandExecutor.get(restoreAsync(payload));
}
@Override
public RFuture<Void> restoreAsync(byte[] payload) {
List<CompletableFuture<Void>> futures = commandExecutor.executeMasters(RedisCommands.FUNCTION_RESTORE, (Object) payload);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
@Override
public void restoreAndReplace(byte[] payload) {
commandExecutor.get(restoreAndReplaceAsync(payload));
}
@Override
public RFuture<Void> restoreAndReplaceAsync(byte[] payload) {
List<CompletableFuture<Void>> futures = commandExecutor.executeMasters(RedisCommands.FUNCTION_RESTORE, payload, "REPLACE");
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
@Override
public void restoreAfterFlush(byte[] payload) {
commandExecutor.get(restoreAfterFlushAsync(payload));
}
@Override
public RFuture<Void> restoreAfterFlushAsync(byte[] payload) {
List<CompletableFuture<Void>> futures = commandExecutor.executeMasters(RedisCommands.FUNCTION_RESTORE, payload, "FLUSH");
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return new CompletableFutureWrapper<>(f);
}
private List<Object> encode(Collection<?> values, Codec codec) {
List<Object> result = new ArrayList<Object>(values.size());
for (Object object : values) {
result.add(commandExecutor.encode(codec, object));
}
return result;
}
@Override
public <R> R call(String key, Mode mode, String name, ReturnType returnType, List<Object> keys, Object... values) {
return commandExecutor.get(callAsync(key, mode, name, returnType, keys, values));
}
@Override
public <R> R call(Mode mode, String name, ReturnType returnType, List<Object> keys, Object... values) {
return commandExecutor.get(callAsync(mode, name, returnType, keys, values));
}
@Override
public <R> R call(Mode mode, String name, ReturnType returnType) {
return commandExecutor.get(callAsync(mode, name, returnType));
}
@Override
public FunctionStats stats() {
return commandExecutor.get(statsAsync());
}
@Override
public RFuture<FunctionStats> statsAsync() {
return commandExecutor.readAsync((String) null, StringCodec.INSTANCE, RedisCommands.FUNCTION_STATS);
}
@Override
public <R> RFuture<R> callAsync(String key, Mode mode, String name, ReturnType returnType, List<Object> keys, Object... values) {
List<Object> args = new ArrayList<>();
args.add(name);
args.add(keys.size());
if (keys.size() > 0) {
args.add(keys);
}
args.addAll(encode(Arrays.asList(values), codec));
if (mode == Mode.READ) {
return commandExecutor.readAsync(key, codec, returnType.getCommand(), args.toArray());
}
return commandExecutor.writeAsync(key, codec, returnType.getCommand(), args.toArray());
}
@Override
public <R> RFuture<R> callAsync(Mode mode, String name, ReturnType returnType, List<Object> keys, Object... values) {
return callAsync(null, mode, name, returnType, keys, values);
}
@Override
public <R> RFuture<R> callAsync(Mode mode, String name, ReturnType returnType) {
return callAsync(mode, name, returnType, Collections.emptyList());
}
}

@ -497,6 +497,16 @@ public class RedissonReactive implements RedissonReactiveClient {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(commandExecutor, name), RBitSetReactive.class);
}
@Override
public RFunctionReactive getFunction() {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor), RFunctionReactive.class);
}
@Override
public RFunctionReactive getFunction(Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor, codec), RFunctionReactive.class);
}
@Override
public RScriptReactive getScript() {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonScript(commandExecutor), RScriptReactive.class);

@ -478,6 +478,16 @@ public class RedissonRx implements RedissonRxClient {
return RxProxyBuilder.create(commandExecutor, new RedissonBitSet(commandExecutor, name), RBitSetRx.class);
}
@Override
public RFunctionRx getFunction() {
return RxProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor), RFunctionRx.class);
}
@Override
public RFunctionRx getFunction(Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonFuction(commandExecutor, codec), RFunctionRx.class);
}
@Override
public RScriptRx getScript() {
return RxProxyBuilder.create(commandExecutor, new RedissonScript(commandExecutor), RScriptRx.class);

@ -0,0 +1,89 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.List;
/**
* Encapsulates information about Redis functions library.
*
* @author Nikita Koksharov
*
*/
public class FunctionLibrary {
public enum Flag {NO_WRITES, ALLOW_OOM, ALLOW_STALE, NO_CLUSTER}
public static class Function {
private final String name;
private final String description;
private final List<Flag> flags;
public Function(String name, String description, List<Flag> flags) {
this.name = name;
this.description = description;
this.flags = flags;
}
public List<Flag> getFlags() {
return flags;
}
public String getDescription() {
return description;
}
public String getName() {
return name;
}
}
private final String name;
private final String engine;
private final String description;
private final String code;
private final List<Function> functions;
public FunctionLibrary(String name, String engine, String description, String code, List<Function> functions) {
this.name = name;
this.engine = engine;
this.description = description;
this.code = code;
this.functions = functions;
}
public String getName() {
return name;
}
public String getEngine() {
return engine;
}
public String getDescription() {
return description;
}
public String getCode() {
return code;
}
public List<Function> getFunctions() {
return functions;
}
}

@ -0,0 +1,125 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
* Encapsulates information about currently running
* Redis function and available execution engines.
*
* @author Nikita Koksharov
*
*/
public class FunctionStats {
public static class Engine {
private final Long libraries;
private final Long functions;
public Engine(Long libraries, Long functions) {
this.libraries = libraries;
this.functions = functions;
}
/**
* Returns libraries amount
*
* @return libraries amount
*/
public Long getLibraries() {
return libraries;
}
/**
* Returns functions amount
*
* @return functions amount
*/
public Long getFunctions() {
return functions;
}
}
public static class RunningFunction {
private final String name;
private final List<Object> command;
private final Duration duration;
public RunningFunction(String name, List<Object> command, Duration duration) {
this.name = name;
this.command = command;
this.duration = duration;
}
/**
* Returns name of running function
*
* @return name
*/
public String getName() {
return name;
}
/**
* Returns arguments of running function
*
* @return arguments
*/
public List<Object> getCommand() {
return command;
}
/**
* Returns runtime duration of running function
*
* @return runtime duration
*/
public Duration getDuration() {
return duration;
}
}
private final RunningFunction runningFunction;
private final Map<String, Engine> engines;
public FunctionStats(RunningFunction runningFunction, Map<String, Engine> engines) {
this.runningFunction = runningFunction;
this.engines = engines;
}
/**
* Returns currently running fuction otherwise {@code null}
*
* @return running function
*/
public RunningFunction getRunningFunction() {
return runningFunction;
}
/**
* Returns engine objects mapped by function engine name
*
* @return engine objects
*/
public Map<String, Engine> getEngines() {
return engines;
}
}

@ -0,0 +1,227 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import java.util.List;
/**
* Interface for Redis Function feature
*
* @author Nikita Koksharov
*
*/
public interface RFunction extends RFunctionAsync {
enum Mode {
/**
* Execute script as read operation
*/
READ,
/**
* Execute function as read operation
*/
WRITE
}
enum ReturnType {
/**
* Result is a value of Boolean type
*/
BOOLEAN(RedisCommands.FCALL_BOOLEAN_SAFE),
/**
* Result is a value of Long type
*/
LONG(RedisCommands.FCALL_LONG),
/**
* Result is a value of List type
*/
LIST(RedisCommands.FCALL_LIST),
/**
* Result is a value of plain String type
*/
STRING(RedisCommands.FCALL_STRING),
/**
* Result is a value of Object type
*/
VALUE(RedisCommands.FCALL_OBJECT),
/**
* Result is a value of Map Value type. Codec.getMapValueDecoder() and Codec.getMapValueEncoder()
* methods are used for data deserialization or serialization.
*/
MAPVALUE(RedisCommands.FCALL_MAP_VALUE),
/**
* Result is a value of List type, which consists of objects of Map Value type.
* Codec.getMapValueDecoder() and Codec.getMapValueEncoder()
* methods are used for data deserialization or serialization.
*/
MAPVALUELIST(RedisCommands.FCALL_MAP_VALUE_LIST);
private final RedisCommand<?> command;
ReturnType(RedisCommand<?> command) {
this.command = command;
}
public RedisCommand<?> getCommand() {
return command;
}
};
/**
* Deletes library. Error is thrown if library doesn't exist.
*
* @param libraryName library name
*/
void delete(String libraryName);
/**
* Returns serialized state of all libraries.
*
* @return serialized state
*/
byte[] dump();
/**
* Deletes all libraries.
*
*/
void flush();
/**
* Kills currently executed functions.
* Applied only to functions which don't modify data.
*
*/
void kill();
/**
* Returns information about libraries and functions per each.
*
* @return list of libraries
*/
List<FunctionLibrary> list();
/**
* Returns information about libraries and functions per each by name pattern.
* <p>
* Supported glob-style patterns:
* h?llo matches hello, hallo and hxllo
* h*llo matches hllo and heeeello
* h[ae]llo matches hello and hallo, but not hillo
*
* @param namePattern name pattern
* @return list of libraries
*/
List<FunctionLibrary> list(String namePattern);
/**
* Loads a library. Error is thrown if library already exists.
*
* @param libraryName library name
* @param code function code
*/
void load(String libraryName, String code);
/**
* Loads a library and overwrites existing library.
*
* @param libraryName library name
* @param code function code
*/
void loadAndReplace(String libraryName, String code);
/**
* Restores libraries using their state returned by {@link #dump()} method.
* Restored libraries are appended to the existing libraries and throws error in case of collision.
*
* @param payload serialized state
*/
void restore(byte[] payload);
/**
* Restores libraries using their state returned by {@link #dump()} method.
* Restored libraries are appended to the existing libraries.
*
* @param payload serialized state
*/
void restoreAndReplace(byte[] payload);
/**
* Restores libraries using their state returned by {@link #dump()} method.
* Deletes all existing libraries before restoring.
*
* @param payload serialized state
*/
void restoreAfterFlush(byte[] payload);
/**
* Returns information about currently running
* Redis function and available execution engines.
*
* @return function information
*/
FunctionStats stats();
/**
* Executes function
*
* @param <R> - type of result
* @param key - used to locate Redis node in Cluster
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @return result object
*/
<R> R call(String key, Mode mode, String name, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes function
*
* @param <R> - type of result
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @return result object
*/
<R> R call(Mode mode, String name, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes function
*
* @param <R> - type of result
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @return result object
*/
<R> R call(Mode mode, String name, ReturnType returnType);
}

@ -0,0 +1,164 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.api.RFunction.Mode;
import org.redisson.api.RFunction.ReturnType;
import java.util.List;
/**
* Interface for Redis Function feature
*
* @author Nikita Koksharov
*
*/
public interface RFunctionAsync {
/**
* Deletes library. Error is thrown if library doesn't exist.
*
* @param libraryName library name
*/
RFuture<Void> deleteAsync(String libraryName);
/**
* Returns serialized state of all libraries.
*
* @return serialized state
*/
RFuture<byte[]> dumpAsync();
/**
* Deletes all libraries.
*
*/
RFuture<Void> flushAsync();
/**
* Kills currently executed functions.
* Applied only to functions which don't modify data.
*
*/
RFuture<Void> killAsync();
/**
* Returns information about libraries and functions per each.
*
* @return list of libraries
*/
RFuture<List<FunctionLibrary>> listAsync();
/**
* Returns information about libraries and functions per each by name pattern.
* <p>
* Supported glob-style patterns:
* h?llo matches hello, hallo and hxllo
* h*llo matches hllo and heeeello
* h[ae]llo matches hello and hallo, but not hillo
*
* @param namePattern name pattern
* @return list of libraries
*/
RFuture<List<FunctionLibrary>> listAsync(String namePattern);
/**
* Loads a library. Error is thrown if library already exists.
*
* @param libraryName library name
* @param code function code
*/
RFuture<Void> loadAsync(String libraryName, String code);
/**
* Loads a library and overwrites existing library.
*
* @param libraryName library name
* @param code function code
*/
RFuture<Void> loadAndReplaceAsync(String libraryName, String code);
/**
* Restores libraries using their state returned by {@link #dumpAsync()} method.
* Restored libraries are appended to the existing libraries and throws error in case of collision.
*
* @param payload serialized state
*/
RFuture<Void> restoreAsync(byte[] payload);
/**
* Restores libraries using their state returned by {@link #dumpAsync()} method.
* Restored libraries are appended to the existing libraries.
*
* @param payload serialized state
*/
RFuture<Void> restoreAndReplaceAsync(byte[] payload);
/**
* Restores libraries using their state returned by {@link #dumpAsync()} method.
* Deletes all existing libraries before restoring.
*
* @param payload serialized state
*/
RFuture<Void> restoreAfterFlushAsync(byte[] payload);
/**
* Returns information about currently running
* Redis function and available execution engines.
*
* @return function information
*/
RFuture<FunctionStats> statsAsync();
/**
* Executes function
*
* @param <R> - type of result
* @param key - used to locate Redis node in Cluster which stores cached Lua script
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @return result object
*/
<R> RFuture<R> callAsync(String key, Mode mode, String name, ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes function
*
* @param <R> - type of result
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @return result object
*/
<R> RFuture<R> callAsync(Mode mode, String name, RFunction.ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes function
*
* @param <R> - type of result
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @return result object
*/
<R> RFuture<R> callAsync(Mode mode, String name, ReturnType returnType);
}

@ -0,0 +1,163 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import reactor.core.publisher.Mono;
import java.util.List;
/**
* Interface for Redis Function feature
*
* @author Nikita Koksharov
*
*/
public interface RFunctionReactive {
/**
* Deletes library. Error is thrown if library doesn't exist.
*
* @param libraryName library name
*/
Mono<Void> delete(String libraryName);
/**
* Returns serialized state of all libraries.
*
* @return serialized state
*/
Mono<byte[]> dump();
/**
* Deletes all libraries.
*
*/
Mono<Void> flush();
/**
* Kills currently executed functions.
* Applied only to functions which don't modify data.
*
*/
Mono<Void> kill();
/**
* Returns information about libraries and functions per each.
*
* @return list of libraries
*/
Mono<List<FunctionLibrary>> list();
/**
* Returns information about libraries and functions per each by name pattern.
* <p>
* Supported glob-style patterns:
* h?llo matches hello, hallo and hxllo
* h*llo matches hllo and heeeello
* h[ae]llo matches hello and hallo, but not hillo
*
* @param namePattern name pattern
* @return list of libraries
*/
Mono<List<FunctionLibrary>> list(String namePattern);
/**
* Loads a library. Error is thrown if library already exists.
*
* @param libraryName library name
* @param code function code
*/
Mono<Void> load(String libraryName, String code);
/**
* Loads a library and overwrites existing library.
*
* @param libraryName library name
* @param code function code
*/
Mono<Void> loadAndReplace(String libraryName, String code);
/**
* Restores libraries using their state returned by {@link #dump()} method.
* Restored libraries are appended to the existing libraries and throws error in case of collision.
*
* @param payload serialized state
*/
Mono<Void> restore(byte[] payload);
/**
* Restores libraries using their state returned by {@link #dump()} method.
* Restored libraries are appended to the existing libraries.
*
* @param payload serialized state
*/
Mono<Void> restoreAndReplace(byte[] payload);
/**
* Restores libraries using their state returned by {@link #dump()} method.
* Deletes all existing libraries before restoring.
*
* @param payload serialized state
*/
Mono<Void> restoreAfterFlush(byte[] payload);
/**
* Returns information about currently running
* Redis function and available execution engines.
*
* @return function information
*/
Mono<FunctionStats> stats();
/**
* Executes function
*
* @param <R> - type of result
* @param key - used to locate Redis node in Cluster
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @return result object
*/
<R> Mono<R> call(String key, RFunction.Mode mode, String name, RFunction.ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes function
*
* @param <R> - type of result
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @return result object
*/
<R> Mono<R> call(RFunction.Mode mode, String name, RFunction.ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes function
*
* @param <R> - type of result
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @return result object
*/
<R> Mono<R> call(RFunction.Mode mode, String name, RFunction.ReturnType returnType);
}

@ -0,0 +1,165 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.util.List;
/**
* Interface for Redis Function feature
*
* @author Nikita Koksharov
*
*/
public interface RFunctionRx {
/**
* Deletes library. Error is thrown if library doesn't exist.
*
* @param libraryName library name
*/
Completable delete(String libraryName);
/**
* Returns serialized state of all libraries.
*
* @return serialized state
*/
Single<byte[]> dump();
/**
* Deletes all libraries.
*
*/
Completable flush();
/**
* Kills currently executed functions.
* Applied only to functions which don't modify data.
*
*/
Completable kill();
/**
* Returns information about libraries and functions per each.
*
* @return list of libraries
*/
Single<List<FunctionLibrary>> list();
/**
* Returns information about libraries and functions per each by name pattern.
* <p>
* Supported glob-style patterns:
* h?llo matches hello, hallo and hxllo
* h*llo matches hllo and heeeello
* h[ae]llo matches hello and hallo, but not hillo
*
* @param namePattern name pattern
* @return list of libraries
*/
Single<List<FunctionLibrary>> list(String namePattern);
/**
* Loads a library. Error is thrown if library already exists.
*
* @param libraryName library name
* @param code function code
*/
Completable load(String libraryName, String code);
/**
* Loads a library and overwrites existing library.
*
* @param libraryName library name
* @param code function code
*/
Completable loadAndReplace(String libraryName, String code);
/**
* Restores libraries using their state returned by {@link #dump()} method.
* Restored libraries are appended to the existing libraries and throws error in case of collision.
*
* @param payload serialized state
*/
Completable restore(byte[] payload);
/**
* Restores libraries using their state returned by {@link #dump()} method.
* Restored libraries are appended to the existing libraries.
*
* @param payload serialized state
*/
Completable restoreAndReplace(byte[] payload);
/**
* Restores libraries using their state returned by {@link #dump()} method.
* Deletes all existing libraries before restoring.
*
* @param payload serialized state
*/
Completable restoreAfterFlush(byte[] payload);
/**
* Returns information about currently running
* Redis function and available execution engines.
*
* @return function information
*/
Single<FunctionStats> stats();
/**
* Executes function
*
* @param <R> - type of result
* @param key - used to locate Redis node in Cluster
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @return result object
*/
<R> Maybe<R> call(String key, RFunction.Mode mode, String name, RFunction.ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes function
*
* @param <R> - type of result
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @param keys - keys available through KEYS param in script
* @param values - values available through VALUES param in script
* @return result object
*/
<R> Maybe<R> call(RFunction.Mode mode, String name, RFunction.ReturnType returnType, List<Object> keys, Object... values);
/**
* Executes function
*
* @param <R> - type of result
* @param mode - execution mode
* @param name - function name
* @param returnType - return type
* @return result object
*/
<R> Maybe<R> call(RFunction.Mode mode, String name, RFunction.ReturnType returnType);
}

@ -1004,6 +1004,21 @@ public interface RedissonClient {
*/
RIdGenerator getIdGenerator(String name);
/**
* Returns interface for Redis Function feature
*
* @return function object
*/
RFunction getFunction();
/**
* Returns interface for Redis Function feature using provided codec
*
* @param codec - codec for params and result
* @return function interface
*/
RFunction getFunction(Codec codec);
/**
* Returns script operations object
*

@ -830,6 +830,21 @@ public interface RedissonReactiveClient {
*/
RBitSetReactive getBitSet(String name);
/**
* Returns interface for Redis Function feature
*
* @return function object
*/
RFunctionReactive getFunction();
/**
* Returns interface for Redis Function feature using provided codec
*
* @param codec - codec for params and result
* @return function interface
*/
RFunctionReactive getFunction(Codec codec);
/**
* Returns script operations object
*

@ -820,6 +820,21 @@ public interface RedissonRxClient {
*/
RBitSetRx getBitSet(String name);
/**
* Returns interface for Redis Function feature
*
* @return function object
*/
RFunctionRx getFunction();
/**
* Returns interface for Redis Function feature using provided codec
*
* @param codec - codec for params and result
* @return function interface
*/
RFunctionRx getFunction(Codec codec);
/**
* Returns script operations object
*

@ -15,10 +15,7 @@
*/
package org.redisson.client.protocol;
import org.redisson.api.FastAutoClaimResult;
import org.redisson.api.RType;
import org.redisson.api.StreamInfo;
import org.redisson.api.StreamMessageId;
import org.redisson.api.*;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.StringCodec;
@ -29,8 +26,10 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.misc.RedisURI;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
*
@ -290,6 +289,84 @@ public interface RedisCommands {
RedisCommand<Boolean> RPUSH_BOOLEAN = new RedisCommand<Boolean>("RPUSH", new TrueReplayConvertor());
RedisCommand<Void> RPUSH_VOID = new RedisCommand<Void>("RPUSH", new VoidReplayConvertor());
RedisStrictCommand<Void> FUNCTION_DELETE = new RedisStrictCommand<>("FUNCTION", "DELETE", new VoidReplayConvertor());
RedisStrictCommand<Void> FUNCTION_FLUSH = new RedisStrictCommand<>("FUNCTION", "FLUSH", new VoidReplayConvertor());
RedisStrictCommand<Void> FUNCTION_KILL = new RedisStrictCommand<>("FUNCTION", "KILL", new VoidReplayConvertor());
RedisStrictCommand<Void> FUNCTION_RESTORE = new RedisStrictCommand<>("FUNCTION", "RESTORE", new VoidReplayConvertor());
RedisStrictCommand<Void> FUNCTION_LOAD = new RedisStrictCommand<>("FUNCTION", "LOAD", new VoidReplayConvertor());
RedisStrictCommand<Object> FUNCTION_DUMP = new RedisStrictCommand<>("FUNCTION", "DUMP");
RedisStrictCommand<Object> FUNCTION_STATS = new RedisStrictCommand<>("FUNCTION", "STATS",
new ListMultiDecoder2(
new CodecDecoder() {
@Override
public Object decode(List<Object> parts, State state) {
FunctionStats.RunningFunction runningFunction = (FunctionStats.RunningFunction) parts.get(1);
Map<String, FunctionStats.Engine> engines = (Map<String, FunctionStats.Engine>) parts.get(3);
return new FunctionStats(runningFunction, engines);
}
},
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) {
@Override
public Object decode(List parts, State state) {
if (parts.size() == 2) {
Map<String, FunctionStats.Engine> result = new HashMap<>();
List<Object> objects = (List<Object>) parts.get(1);
Long libraries = (Long) objects.get(1);
Long functions = (Long) objects.get(3);
String engine = (String) parts.get(0);
result.put(engine, new FunctionStats.Engine(libraries, functions));
return result;
}
String name = (String) parts.get(1);
List<Object> command = (List<Object>) parts.get(3);
Long duration = (Long) parts.get(5);
return new FunctionStats.RunningFunction(name, command, Duration.ofMillis(duration));
}
},
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder())));
RedisStrictCommand<Object> FUNCTION_LIST = new RedisStrictCommand<>("FUNCTION", "LIST",
new ListMultiDecoder2(
new CodecDecoder(),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) {
@Override
public Object decode(List parts, State state) {
String name = (String) parts.get(1);
String engine = (String) parts.get(3);
String description = (String) parts.get(5);
String code = null;
if (parts.size() > 8) {
code = (String) parts.get(9);
}
List<FunctionLibrary.Function> functions = (List<FunctionLibrary.Function>) parts.get(7);
return new FunctionLibrary(name, engine, description, code, functions);
}
},
new CodecDecoder(),
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) {
@Override
public Object decode(List parts, State state) {
String functionName = (String) parts.get(1);
String functionDescription = (String) parts.get(3);
List<FunctionLibrary.Flag> functionFlags = ((List<String>) parts.get(5)).stream()
.map(s -> FunctionLibrary.Flag.valueOf(s.toUpperCase().replace("-", "_")))
.collect(Collectors.toList());
return new FunctionLibrary.Function(functionName, functionDescription, functionFlags);
}
},
new CodecDecoder()));
RedisStrictCommand<Boolean> FCALL_BOOLEAN_SAFE = new RedisStrictCommand<Boolean>("FCALL", new BooleanNullSafeReplayConvertor());
RedisStrictCommand<Long> FCALL_LONG = new RedisStrictCommand<Long>("FCALL");
RedisCommand<List<Object>> FCALL_LIST = new RedisCommand<List<Object>>("FCALL", new ObjectListReplayDecoder<Object>());
RedisStrictCommand<String> FCALL_STRING = new RedisStrictCommand("FCALL",
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()));
RedisCommand<Object> FCALL_OBJECT = new RedisCommand<Object>("FCALL");
RedisCommand<Object> FCALL_MAP_VALUE = new RedisCommand<Object>("FCALL", new MapValueDecoder());
RedisCommand<List<Object>> FCALL_MAP_VALUE_LIST = new RedisCommand<List<Object>>("FCALL",
new MapValueDecoder(new ObjectListReplayDecoder<>()));
RedisStrictCommand<String> SCRIPT_LOAD = new RedisStrictCommand<String>("SCRIPT", "LOAD", new ObjectDecoder(new StringDataDecoder()));
RedisStrictCommand<Boolean> SCRIPT_KILL = new RedisStrictCommand<Boolean>("SCRIPT", "KILL", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> SCRIPT_FLUSH = new RedisStrictCommand<Boolean>("SCRIPT", "FLUSH", new BooleanReplayConvertor());
@ -300,7 +377,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> EVAL_BOOLEAN_SAFE = new RedisStrictCommand<Boolean>("EVAL", new BooleanNullSafeReplayConvertor());
RedisStrictCommand<Boolean> EVAL_NULL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanNullReplayConvertor());
RedisStrictCommand<String> EVAL_STRING = new RedisStrictCommand("EVAL",
new ObjectDecoder(new StringReplayDecoder()));
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()));
RedisStrictCommand<String> EVAL_PERMIT_DATA = new RedisStrictCommand("EVAL",
new ObjectDecoder(new PermitDecoder()));
RedisStrictCommand<Integer> EVAL_INTEGER = new RedisStrictCommand<Integer>("EVAL", new IntegerReplayConvertor());

@ -73,6 +73,8 @@ public interface CommandAsyncExecutor {
<R> List<CompletableFuture<R>> executeAll(RedisCommand<?> command, Object... params);
<R> List<CompletableFuture<R>> executeMasters(RedisCommand<?> command, Object... params);
<R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object... params);
<T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params);

@ -260,6 +260,16 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return writeAllAsync(command, null, params);
}
@Override
public <R> List<CompletableFuture<R>> executeMasters(RedisCommand<?> command, Object... params) {
List<CompletableFuture<R>> futures = connectionManager.getEntrySet().stream().map(e -> {
RFuture<R> f = async(false, new NodeSource(e),
connectionManager.getCodec(), command, params, true, false);
return f.toCompletableFuture();
}).collect(Collectors.toList());
return futures;
}
@Override
public <R> List<CompletableFuture<R>> executeAll(RedisCommand<?> command, Object... params) {
Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
@ -271,7 +281,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
e.getAllEntries().stream().filter(c -> c.getNodeType() == NodeType.SLAVE).forEach(c -> {
RFuture<R> slavePromise = async(true, new NodeSource(e, c.getClient()),
connectionManager.getCodec(), RedisCommands.SCRIPT_LOAD, params, true, false);
connectionManager.getCodec(), command, params, true, false);
futures.add(slavePromise.toCompletableFuture());
});
});

@ -0,0 +1,99 @@
package org.redisson;
import org.junit.jupiter.api.Test;
import org.redisson.api.FunctionLibrary;
import org.redisson.api.FunctionStats;
import org.redisson.api.RFunction;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonFunctionTest extends BaseTest {
@Test
public void testEmpty() {
RFunction f = redisson.getFunction();
f.flush();
assertThat(f.dump()).isNotEmpty();
assertThat(f.list()).isEmpty();
assertThat(f.list("test")).isEmpty();
}
@Test
public void testStats() {
RFunction f = redisson.getFunction();
f.flush();
f.load("lib", "redis.register_function('myfun', function(keys, args) for i = 1, 8829381983, 1 do end return args[1] end)" +
"redis.register_function('myfun2', function(keys, args) return 'test' end)" +
"redis.register_function('myfun3', function(keys, args) return 123 end)");
f.callAsync(RFunction.Mode.READ, "myfun", RFunction.ReturnType.VALUE, Collections.emptyList(), "test");
FunctionStats stats = f.stats();
FunctionStats.RunningFunction func = stats.getRunningFunction();
assertThat(func.getName()).isEqualTo("myfun");
FunctionStats.Engine engine = stats.getEngines().get("LUA");
assertThat(engine.getLibraries()).isEqualTo(1);
assertThat(engine.getFunctions()).isEqualTo(3);
f.kill();
FunctionStats stats2 = f.stats();
assertThat(stats2.getRunningFunction()).isNull();
}
@Test
public void testCall() {
RFunction f = redisson.getFunction();
f.flush();
f.load("lib", "redis.register_function('myfun', function(keys, args) return args[1] end)" +
"redis.register_function('myfun2', function(keys, args) return 'test' end)" +
"redis.register_function('myfun3', function(keys, args) return 123 end)");
String s = f.call(RFunction.Mode.READ, "myfun", RFunction.ReturnType.VALUE, Collections.emptyList(), "test");
assertThat(s).isEqualTo("test");
RFunction f2 = redisson.getFunction(StringCodec.INSTANCE);
String s2 = f2.call(RFunction.Mode.READ, "myfun2", RFunction.ReturnType.STRING, Collections.emptyList());
assertThat(s2).isEqualTo("test");
RFunction f3 = redisson.getFunction(LongCodec.INSTANCE);
Long s3 = f3.call(RFunction.Mode.READ, "myfun3", RFunction.ReturnType.LONG, Collections.emptyList());
assertThat(s3).isEqualTo(123L);
}
@Test
public void testList() {
RFunction f = redisson.getFunction();
f.flush();
f.load("lib", "redis.register_function('myfun', function(keys, args) return args[1] end)" +
"redis.register_function{function_name='myfun2', callback=function(keys, args) return args[1] end, flags={ 'no-writes' }}");
List<FunctionLibrary> data = f.list();
FunctionLibrary fl = data.get(0);
assertThat(fl.getName()).isEqualTo("lib");
FunctionLibrary.Function f2 = fl.getFunctions().stream().filter(e -> e.getName().equals("myfun2")).findFirst().get();
assertThat(f2.getFlags()).containsExactly(FunctionLibrary.Flag.NO_WRITES);
}
@Test
public void testListPattern() {
RFunction f = redisson.getFunction();
f.flush();
f.load("alib", "redis.register_function('myfun', function(keys, args) return args[1] end)");
f.load("lib2", "redis.register_function('myfun2', function(keys, args) return args[1] end)");
List<FunctionLibrary> data = f.list("ali*");
FunctionLibrary fl = data.get(0);
assertThat(data).hasSize(1);
assertThat(fl.getName()).isEqualTo("alib");
List<FunctionLibrary> data1 = f.list("ali2*");
assertThat(data1).isEmpty();
}
}
Loading…
Cancel
Save