RScriptReactive added. #210

pull/337/head
Nikita 9 years ago
parent 9a4311e844
commit bf4897793e

@ -41,6 +41,7 @@ import org.redisson.core.RMapReactive;
import org.redisson.core.RPatternTopicReactive;
import org.redisson.core.RQueueReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RScriptReactive;
import org.redisson.core.RSetReactive;
import org.redisson.core.RTopicReactive;
@ -226,6 +227,11 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonBitSetReactive(commandExecutor, name);
}
@Override
public RScriptReactive getScript() {
return new RedissonScriptReactive(commandExecutor);
}
public Config getConfig() {
return config;
}

@ -32,6 +32,7 @@ import org.redisson.core.RMapReactive;
import org.redisson.core.RPatternTopicReactive;
import org.redisson.core.RQueueReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RScriptReactive;
import org.redisson.core.RSetReactive;
import org.redisson.core.RTopicReactive;
@ -177,13 +178,13 @@ public interface RedissonReactiveClient {
RBitSetReactive getBitSet(String name);
// /**
// * Returns script operations object
// *
// * @return
// */
// RScript getScript();
//
/**
* Returns script operations object
*
* @return
*/
RScriptReactive getScript();
// /**
// * Return batch object which executes group of
// * command in pipeline.

@ -0,0 +1,148 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.core.RScript;
import org.redisson.core.RScriptReactive;
public class RedissonScriptReactive implements RScriptReactive {
private final CommandReactiveExecutor commandExecutor;
protected RedissonScriptReactive(CommandReactiveExecutor commandExecutor) {
this.commandExecutor = commandExecutor;
}
@Override
public Publisher<String> scriptLoad(String luaScript) {
return commandExecutor.writeAllObservable(RedisCommands.SCRIPT_LOAD, new SlotCallback<String, String>() {
volatile String result;
@Override
public void onSlotResult(String result) {
this.result = result;
}
@Override
public String onFinish() {
return result;
}
}, luaScript);
}
public Publisher<String> scriptLoad(String key, String luaScript) {
return commandExecutor.writeObservable(key, RedisCommands.SCRIPT_LOAD, luaScript);
}
@Override
public <R> Publisher<R> eval(RScript.Mode mode, String luaScript, RScript.ReturnType returnType, List<Object> keys, Object... values) {
return eval(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, keys, values);
}
@Override
public <R> Publisher<R> eval(RScript.Mode mode, Codec codec, String luaScript, RScript.ReturnType returnType, List<Object> keys, Object... values) {
return eval(null, mode, codec, luaScript, returnType, keys, values);
}
public <R> Publisher<R> eval(String key, RScript.Mode mode, Codec codec, String luaScript, RScript.ReturnType returnType, List<Object> keys, Object... values) {
if (mode == RScript.Mode.READ_ONLY) {
return commandExecutor.evalReadObservable(key, codec, returnType.getCommand(), luaScript, keys, values);
}
return commandExecutor.evalWriteObservable(key, codec, returnType.getCommand(), luaScript, keys, values);
}
@Override
public <R> Publisher<R> evalSha(RScript.Mode mode, String shaDigest, RScript.ReturnType returnType, List<Object> keys, Object... values) {
return evalSha(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, keys, values);
}
@Override
public <R> Publisher<R> evalSha(RScript.Mode mode, Codec codec, String shaDigest, RScript.ReturnType returnType, List<Object> keys, Object... values) {
return evalSha(null, mode, codec, shaDigest, returnType, keys, values);
}
public <R> Publisher<R> evalSha(String key, RScript.Mode mode, Codec codec, String shaDigest, RScript.ReturnType returnType, List<Object> keys, Object... values) {
RedisCommand command = new RedisCommand(returnType.getCommand(), "EVALSHA");
if (mode == RScript.Mode.READ_ONLY) {
return commandExecutor.evalReadObservable(key, codec, command, shaDigest, keys, values);
}
return commandExecutor.evalWriteObservable(key, codec, command, shaDigest, keys, values);
}
@Override
public Publisher<Void> scriptKill() {
return commandExecutor.writeAllObservable(RedisCommands.SCRIPT_KILL);
}
@Override
public Publisher<List<Boolean>> scriptExists(final String ... shaDigests) {
return commandExecutor.writeAllObservable(RedisCommands.SCRIPT_EXISTS, new SlotCallback<List<Boolean>, List<Boolean>>() {
volatile List<Boolean> result = new ArrayList<Boolean>(shaDigests.length);
@Override
public synchronized void onSlotResult(List<Boolean> result) {
for (int i = 0; i < result.size(); i++) {
if (this.result.size() == i) {
this.result.add(false);
}
this.result.set(i, this.result.get(i) | result.get(i));
}
}
@Override
public List<Boolean> onFinish() {
return new ArrayList<Boolean>(result);
}
}, (Object[])shaDigests);
}
public Publisher<List<Boolean>> scriptExists(String key, String ... shaDigests) {
return commandExecutor.writeObservable(key, RedisCommands.SCRIPT_EXISTS, (Object[])shaDigests);
}
@Override
public Publisher<Void> scriptFlush() {
return commandExecutor.writeAllObservable(RedisCommands.SCRIPT_FLUSH);
}
@Override
public <R> Publisher<R> evalSha(RScript.Mode mode, String shaDigest, RScript.ReturnType returnType) {
return evalSha(null, mode, commandExecutor.getConnectionManager().getCodec(), shaDigest, returnType, Collections.emptyList());
}
@Override
public <R> Publisher<R> evalSha(RScript.Mode mode, Codec codec, String shaDigest, RScript.ReturnType returnType) {
return evalSha(null, mode, codec, shaDigest, returnType, Collections.emptyList());
}
@Override
public <R> Publisher<R> eval(RScript.Mode mode, String luaScript, RScript.ReturnType returnType) {
return eval(null, mode, commandExecutor.getConnectionManager().getCodec(), luaScript, returnType, Collections.emptyList());
}
@Override
public <R> Publisher<R> eval(RScript.Mode mode, Codec codec, String luaScript, RScript.ReturnType returnType) {
return eval(null, mode, codec, luaScript, returnType, Collections.emptyList());
}
}

@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.SlotCallback;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
@ -32,6 +33,10 @@ public interface CommandReactiveExecutor {
ConnectionManager getConnectionManager();
<T> Publisher<Void> writeAllObservable(RedisCommand<T> command, Object ... params);
<R, T> Publisher<R> writeAllObservable(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);
<T, R> Publisher<R> readObservable(InetSocketAddress client, String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> evalWriteObservable(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);

@ -20,6 +20,7 @@ import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.NettyFuturePublisher;
import org.redisson.SlotCallback;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
@ -79,4 +80,17 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
return new NettyFuturePublisher<R>(f);
}
@Override
public <T> Publisher<Void> writeAllObservable(RedisCommand<T> command, Object ... params) {
Future<Void> f = writeAllAsync(command, params);
return new NettyFuturePublisher<Void>(f);
}
@Override
public <R, T> Publisher<R> writeAllObservable(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
Future<R> f = writeAllAsync(command, callback, params);
return new NettyFuturePublisher<R>(f);
}
}

@ -0,0 +1,51 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
import org.redisson.core.RScript.Mode;
import org.redisson.core.RScript.ReturnType;
public interface RScriptReactive {
Publisher<Void> scriptFlush();
<R> Publisher<R> evalSha(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
<R> Publisher<R> evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
<R> Publisher<R> evalSha(Mode mode, String shaDigest, ReturnType returnType);
<R> Publisher<R> evalSha(Mode mode, Codec codec, String shaDigest, ReturnType returnType);
<R> Publisher<R> eval(Mode mode, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
<R> Publisher<R> eval(Mode mode, Codec codec, String luaScript, ReturnType returnType, List<Object> keys, Object... values);
<R> Publisher<R> eval(Mode mode, String luaScript, ReturnType returnType);
<R> Publisher<R> eval(Mode mode, Codec codec, String luaScript, ReturnType returnType);
Publisher<String> scriptLoad(String luaScript);
Publisher<List<Boolean>> scriptExists(String ... shaDigests);
Publisher<Void> scriptKill();
}

@ -0,0 +1,77 @@
package org.redisson;
import java.util.Collections;
import java.util.List;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisException;
import org.redisson.core.RScript;
import org.redisson.core.RScriptReactive;
public class RedissonScriptReactiveTest extends BaseReactiveTest {
@Test
public void testEval() {
RScriptReactive script = redisson.getScript();
List<Object> res = sync(script.<List<Object>>eval(RScript.Mode.READ_ONLY, "return {1,2,3.3333,'\"foo\"',nil,'bar'}", RScript.ReturnType.MULTI, Collections.emptyList()));
MatcherAssert.assertThat(res, Matchers.<Object>contains(1L, 2L, 3L, "foo"));
}
@Test
public void testScriptExists() {
RScriptReactive s = redisson.getScript();
String r = sync(s.scriptLoad("return redis.call('get', 'foo')"));
Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r);
List<Boolean> r1 = sync(s.scriptExists(r));
Assert.assertEquals(1, r1.size());
Assert.assertTrue(r1.get(0));
s.scriptFlush();
List<Boolean> r2 = sync(s.scriptExists(r));
Assert.assertEquals(1, r2.size());
Assert.assertFalse(r2.get(0));
}
@Test
public void testScriptFlush() {
redisson.getBucket("foo").set("bar");
String r = sync(redisson.getScript().scriptLoad("return redis.call('get', 'foo')"));
Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r);
String r1 = sync(redisson.getScript().<String>evalSha(RScript.Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList()));
Assert.assertEquals("bar", r1);
sync(redisson.getScript().scriptFlush());
try {
sync(redisson.getScript().evalSha(RScript.Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList()));
} catch (Exception e) {
Assert.assertEquals(RedisException.class, e.getClass());
}
}
@Test
public void testScriptLoad() {
redisson.getBucket("foo").set("bar");
String r = sync(redisson.getScript().scriptLoad("return redis.call('get', 'foo')"));
Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r);
String r1 = sync(redisson.getScript().<String>evalSha(RScript.Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList()));
Assert.assertEquals("bar", r1);
}
@Test
public void testEvalSha() {
RScriptReactive s = redisson.getScript();
String res = sync(s.scriptLoad("return redis.call('get', 'foo')"));
Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", res);
sync(redisson.getBucket("foo").set("bar"));
String r1 = sync(s.<String>evalSha(RScript.Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList()));
Assert.assertEquals("bar", r1);
}
}
Loading…
Cancel
Save