diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index 4c5547e23..62eceee03 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -41,6 +41,7 @@ import org.redisson.core.RMapReactive; import org.redisson.core.RPatternTopicReactive; import org.redisson.core.RQueueReactive; import org.redisson.core.RScoredSortedSetReactive; +import org.redisson.core.RScriptReactive; import org.redisson.core.RSetReactive; import org.redisson.core.RTopicReactive; @@ -226,6 +227,11 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonBitSetReactive(commandExecutor, name); } + @Override + public RScriptReactive getScript() { + return new RedissonScriptReactive(commandExecutor); + } + public Config getConfig() { return config; } diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index 76a6e8d92..9674e5b06 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -32,6 +32,7 @@ import org.redisson.core.RMapReactive; import org.redisson.core.RPatternTopicReactive; import org.redisson.core.RQueueReactive; import org.redisson.core.RScoredSortedSetReactive; +import org.redisson.core.RScriptReactive; import org.redisson.core.RSetReactive; import org.redisson.core.RTopicReactive; @@ -177,13 +178,13 @@ public interface RedissonReactiveClient { RBitSetReactive getBitSet(String name); -// /** -// * Returns script operations object -// * -// * @return -// */ -// RScript getScript(); -// + /** + * Returns script operations object + * + * @return + */ + RScriptReactive getScript(); + // /** // * Return batch object which executes group of // * command in pipeline. diff --git a/src/main/java/org/redisson/RedissonScriptReactive.java b/src/main/java/org/redisson/RedissonScriptReactive.java new file mode 100644 index 000000000..c1003fbcd --- /dev/null +++ b/src/main/java/org/redisson/RedissonScriptReactive.java @@ -0,0 +1,148 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.core.RScript; +import org.redisson.core.RScriptReactive; + +public class RedissonScriptReactive implements RScriptReactive { + + private final CommandReactiveExecutor commandExecutor; + + protected RedissonScriptReactive(CommandReactiveExecutor commandExecutor) { + this.commandExecutor = commandExecutor; + } + + @Override + public Publisher scriptLoad(String luaScript) { + return commandExecutor.writeAllObservable(RedisCommands.SCRIPT_LOAD, new SlotCallback() { + volatile String result; + @Override + public void onSlotResult(String result) { + this.result = result; + } + + @Override + public String onFinish() { + return result; + } + }, luaScript); + } + + public Publisher scriptLoad(String key, String luaScript) { + return commandExecutor.writeObservable(key, RedisCommands.SCRIPT_LOAD, luaScript); + } + + @Override + public Publisher eval(RScript.Mode mode, String luaScript, RScript.ReturnType returnType, List keys, Object... values) { + return eval(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, keys, values); + } + + @Override + public Publisher eval(RScript.Mode mode, Codec codec, String luaScript, RScript.ReturnType returnType, List keys, Object... values) { + return eval(null, mode, codec, luaScript, returnType, keys, values); + } + + public Publisher eval(String key, RScript.Mode mode, Codec codec, String luaScript, RScript.ReturnType returnType, List keys, Object... values) { + if (mode == RScript.Mode.READ_ONLY) { + return commandExecutor.evalReadObservable(key, codec, returnType.getCommand(), luaScript, keys, values); + } + return commandExecutor.evalWriteObservable(key, codec, returnType.getCommand(), luaScript, keys, values); + } + + @Override + public Publisher evalSha(RScript.Mode mode, String shaDigest, RScript.ReturnType returnType, List keys, Object... values) { + return evalSha(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, keys, values); + } + + @Override + public Publisher evalSha(RScript.Mode mode, Codec codec, String shaDigest, RScript.ReturnType returnType, List keys, Object... values) { + return evalSha(null, mode, codec, shaDigest, returnType, keys, values); + } + + public Publisher evalSha(String key, RScript.Mode mode, Codec codec, String shaDigest, RScript.ReturnType returnType, List keys, Object... values) { + RedisCommand command = new RedisCommand(returnType.getCommand(), "EVALSHA"); + if (mode == RScript.Mode.READ_ONLY) { + return commandExecutor.evalReadObservable(key, codec, command, shaDigest, keys, values); + } + return commandExecutor.evalWriteObservable(key, codec, command, shaDigest, keys, values); + } + + @Override + public Publisher scriptKill() { + return commandExecutor.writeAllObservable(RedisCommands.SCRIPT_KILL); + } + + @Override + public Publisher> scriptExists(final String ... shaDigests) { + return commandExecutor.writeAllObservable(RedisCommands.SCRIPT_EXISTS, new SlotCallback, List>() { + volatile List result = new ArrayList(shaDigests.length); + @Override + public synchronized void onSlotResult(List result) { + for (int i = 0; i < result.size(); i++) { + if (this.result.size() == i) { + this.result.add(false); + } + this.result.set(i, this.result.get(i) | result.get(i)); + } + } + + @Override + public List onFinish() { + return new ArrayList(result); + } + }, (Object[])shaDigests); + } + + public Publisher> scriptExists(String key, String ... shaDigests) { + return commandExecutor.writeObservable(key, RedisCommands.SCRIPT_EXISTS, (Object[])shaDigests); + } + + @Override + public Publisher scriptFlush() { + return commandExecutor.writeAllObservable(RedisCommands.SCRIPT_FLUSH); + } + + @Override + public Publisher evalSha(RScript.Mode mode, String shaDigest, RScript.ReturnType returnType) { + return evalSha(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, Collections.emptyList()); + } + + @Override + public Publisher evalSha(RScript.Mode mode, Codec codec, String shaDigest, RScript.ReturnType returnType) { + return evalSha(null, mode, codec, shaDigest, returnType, Collections.emptyList()); + } + + @Override + public Publisher eval(RScript.Mode mode, String luaScript, RScript.ReturnType returnType) { + return eval(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, Collections.emptyList()); + } + + @Override + public Publisher eval(RScript.Mode mode, Codec codec, String luaScript, RScript.ReturnType returnType) { + return eval(null, mode, codec, luaScript, returnType, Collections.emptyList()); + } + +} diff --git a/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/src/main/java/org/redisson/command/CommandReactiveExecutor.java index f2a5b43df..5534f63e5 100644 --- a/src/main/java/org/redisson/command/CommandReactiveExecutor.java +++ b/src/main/java/org/redisson/command/CommandReactiveExecutor.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.util.List; import org.reactivestreams.Publisher; +import org.redisson.SlotCallback; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; @@ -32,6 +33,10 @@ public interface CommandReactiveExecutor { ConnectionManager getConnectionManager(); + Publisher writeAllObservable(RedisCommand command, Object ... params); + + Publisher writeAllObservable(RedisCommand command, SlotCallback callback, Object ... params); + Publisher readObservable(InetSocketAddress client, String key, Codec codec, RedisCommand command, Object ... params); Publisher evalWriteObservable(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); diff --git a/src/main/java/org/redisson/command/CommandReactiveService.java b/src/main/java/org/redisson/command/CommandReactiveService.java index 91017b4be..03d6c9f95 100644 --- a/src/main/java/org/redisson/command/CommandReactiveService.java +++ b/src/main/java/org/redisson/command/CommandReactiveService.java @@ -20,6 +20,7 @@ import java.util.List; import org.reactivestreams.Publisher; import org.redisson.NettyFuturePublisher; +import org.redisson.SlotCallback; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; @@ -79,4 +80,17 @@ public class CommandReactiveService extends CommandAsyncService implements Comma return new NettyFuturePublisher(f); } + @Override + public Publisher writeAllObservable(RedisCommand command, Object ... params) { + Future f = writeAllAsync(command, params); + return new NettyFuturePublisher(f); + } + + @Override + public Publisher writeAllObservable(RedisCommand command, SlotCallback callback, Object ... params) { + Future f = writeAllAsync(command, callback, params); + return new NettyFuturePublisher(f); + } + + } diff --git a/src/main/java/org/redisson/core/RScriptReactive.java b/src/main/java/org/redisson/core/RScriptReactive.java new file mode 100644 index 000000000..5318732a7 --- /dev/null +++ b/src/main/java/org/redisson/core/RScriptReactive.java @@ -0,0 +1,51 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.List; + +import org.reactivestreams.Publisher; +import org.redisson.client.codec.Codec; +import org.redisson.core.RScript.Mode; +import org.redisson.core.RScript.ReturnType; + +public interface RScriptReactive { + + Publisher scriptFlush(); + + Publisher evalSha(Mode mode, String shaDigest, ReturnType returnType, List keys, Object... values); + + Publisher evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List keys, Object... values); + + Publisher evalSha(Mode mode, String shaDigest, ReturnType returnType); + + Publisher evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType); + + Publisher eval(Mode mode, String luaScript, ReturnType returnType, List keys, Object... values); + + Publisher eval(Mode mode, Codec codec, String luaScript, ReturnType returnType, List keys, Object... values); + + Publisher eval(Mode mode, String luaScript, ReturnType returnType); + + Publisher eval(Mode mode, Codec codec, String luaScript, ReturnType returnType); + + Publisher scriptLoad(String luaScript); + + Publisher> scriptExists(String ... shaDigests); + + Publisher scriptKill(); + +} diff --git a/src/test/java/org/redisson/RedissonScriptReactiveTest.java b/src/test/java/org/redisson/RedissonScriptReactiveTest.java new file mode 100644 index 000000000..ff35599b0 --- /dev/null +++ b/src/test/java/org/redisson/RedissonScriptReactiveTest.java @@ -0,0 +1,77 @@ +package org.redisson; + +import java.util.Collections; +import java.util.List; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.redisson.client.RedisException; +import org.redisson.core.RScript; +import org.redisson.core.RScriptReactive; + +public class RedissonScriptReactiveTest extends BaseReactiveTest { + + @Test + public void testEval() { + RScriptReactive script = redisson.getScript(); + List res = sync(script.>eval(RScript.Mode.READ_ONLY, "return {1,2,3.3333,'\"foo\"',nil,'bar'}", RScript.ReturnType.MULTI, Collections.emptyList())); + MatcherAssert.assertThat(res, Matchers.contains(1L, 2L, 3L, "foo")); + } + + @Test + public void testScriptExists() { + RScriptReactive s = redisson.getScript(); + String r = sync(s.scriptLoad("return redis.call('get', 'foo')")); + Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r); + + List r1 = sync(s.scriptExists(r)); + Assert.assertEquals(1, r1.size()); + Assert.assertTrue(r1.get(0)); + + s.scriptFlush(); + + List r2 = sync(s.scriptExists(r)); + Assert.assertEquals(1, r2.size()); + Assert.assertFalse(r2.get(0)); + } + + @Test + public void testScriptFlush() { + redisson.getBucket("foo").set("bar"); + String r = sync(redisson.getScript().scriptLoad("return redis.call('get', 'foo')")); + Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r); + String r1 = sync(redisson.getScript().evalSha(RScript.Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList())); + Assert.assertEquals("bar", r1); + sync(redisson.getScript().scriptFlush()); + + try { + sync(redisson.getScript().evalSha(RScript.Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList())); + } catch (Exception e) { + Assert.assertEquals(RedisException.class, e.getClass()); + } + } + + @Test + public void testScriptLoad() { + redisson.getBucket("foo").set("bar"); + String r = sync(redisson.getScript().scriptLoad("return redis.call('get', 'foo')")); + Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r); + String r1 = sync(redisson.getScript().evalSha(RScript.Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList())); + Assert.assertEquals("bar", r1); + } + + @Test + public void testEvalSha() { + RScriptReactive s = redisson.getScript(); + String res = sync(s.scriptLoad("return redis.call('get', 'foo')")); + Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", res); + + sync(redisson.getBucket("foo").set("bar")); + String r1 = sync(s.evalSha(RScript.Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList())); + Assert.assertEquals("bar", r1); + } + + +}