Scripts fixed

pull/243/head
Nikita 10 years ago
parent 4a132d4cbf
commit 1d42dc3123

@ -73,7 +73,7 @@ public interface CommandExecutor {
<T, R> Future<Queue<R>> readAllAsync(RedisCommand<T> command, Object ... params);
<T> Future<Boolean> writeAllAsync(RedisCommand<T> command, Object ... params);
<T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params);
<T, R> Future<R> writeAsync(String key, RedisCommand<T> command, Object ... params);

@ -99,34 +99,8 @@ public class CommandExecutorService implements CommandExecutor {
return mainPromise;
}
public <T> Future<Boolean> writeAllAsync(RedisCommand<T> command, Object ... params) {
return allAsync(false, command, params);
}
public <T> Future<Boolean> allAsync(boolean readOnlyMode, RedisCommand<T> command, Object ... params) {
final Promise<Boolean> mainPromise = connectionManager.newPromise();
Promise<Object> promise = new DefaultPromise<Object>() {
AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size());
@Override
public Promise<Object> setSuccess(Object result) {
if (counter.decrementAndGet() == 0
&& !mainPromise.isDone()) {
mainPromise.setSuccess(true);
}
return this;
}
@Override
public Promise<Object> setFailure(Throwable cause) {
mainPromise.setFailure(cause);
return this;
}
};
for (Integer slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, 0);
}
return mainPromise;
public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
return writeAllAsync(command, null, params);
}
public <R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params) {
@ -139,10 +113,15 @@ public class CommandExecutorService implements CommandExecutor {
AtomicInteger counter = new AtomicInteger(connectionManager.getEntries().keySet().size());
@Override
public Promise<T> setSuccess(T result) {
callback.onSlotResult(result);
if (counter.decrementAndGet() == 0
&& !mainPromise.isDone()) {
mainPromise.setSuccess(callback.onFinish());
if (callback != null) {
callback.onSlotResult(result);
}
if (counter.decrementAndGet() == 0) {
if (callback != null) {
mainPromise.setSuccess(callback.onFinish());
} else {
mainPromise.setSuccess(null);
}
}
return this;
}

@ -34,7 +34,7 @@ public class RedissonScript implements RScript {
@Override
public String scriptLoad(String luaScript) {
return scriptLoad(null, luaScript);
return commandExecutor.get(scriptLoadAsync(luaScript));
}
@Override
@ -43,7 +43,18 @@ public class RedissonScript implements RScript {
}
public Future<String> scriptLoadAsync(String luaScript) {
return scriptLoadAsync(null, luaScript);
return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_LOAD, new SlotCallback<String, String>() {
volatile String result;
@Override
public void onSlotResult(String result) {
this.result = result;
}
@Override
public String onFinish() {
return result;
}
}, luaScript);
}
@Override
@ -119,22 +130,22 @@ public class RedissonScript implements RScript {
}
@Override
public boolean scriptKill() {
return commandExecutor.get(scriptKillAsync());
public void scriptKill() {
commandExecutor.get(scriptKillAsync());
}
@Override
public boolean scriptKill(String key) {
return commandExecutor.get(scriptKillAsync(key));
public void scriptKill(String key) {
commandExecutor.get(scriptKillAsync(key));
}
@Override
public Future<Boolean> scriptKillAsync() {
public Future<Void> scriptKillAsync() {
return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_KILL);
}
@Override
public Future<Boolean> scriptKillAsync(String key) {
public Future<Void> scriptKillAsync(String key) {
return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_KILL);
}
@ -149,22 +160,22 @@ public class RedissonScript implements RScript {
}
@Override
public boolean scriptFlush() {
return commandExecutor.get(scriptFlushAsync());
public void scriptFlush() {
commandExecutor.get(scriptFlushAsync());
}
@Override
public boolean scriptFlush(String key) {
return commandExecutor.get(scriptFlushAsync(key));
public void scriptFlush(String key) {
commandExecutor.get(scriptFlushAsync(key));
}
@Override
public Future<Boolean> scriptFlushAsync() {
public Future<Void> scriptFlushAsync() {
return commandExecutor.writeAllAsync(RedisCommands.SCRIPT_FLUSH);
}
@Override
public Future<Boolean> scriptFlushAsync(String key) {
public Future<Void> scriptFlushAsync(String key) {
return commandExecutor.writeAsync(key, RedisCommands.SCRIPT_FLUSH);
}

@ -55,15 +55,15 @@ public interface RScript extends RScriptAsync {
String scriptLoad(String luaScript);
boolean scriptKill();
void scriptKill();
List<Boolean> scriptExists(String key, String ... shaDigests);
boolean scriptFlush();
void scriptFlush();
boolean scriptFlush(String key);
void scriptFlush(String key);
boolean scriptKill(String key);
void scriptKill(String key);
String scriptLoad(String key, String luaScript);

@ -24,7 +24,7 @@ import io.netty.util.concurrent.Future;
public interface RScriptAsync {
Future<Boolean> scriptFlushAsync();
Future<Void> scriptFlushAsync();
<R> Future<R> evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values);
@ -32,13 +32,13 @@ public interface RScriptAsync {
Future<String> scriptLoadAsync(String luaScript);
Future<Boolean> scriptKillAsync();
Future<Void> scriptKillAsync();
Future<List<Boolean>> scriptExistsAsync(String key, String ... shaDigests);
Future<Boolean> scriptFlushAsync(String key);
Future<Void> scriptFlushAsync(String key);
Future<Boolean> scriptKillAsync(String key);
Future<Void> scriptKillAsync(String key);
Future<String> scriptLoadAsync(String key, String luaScript);

@ -53,8 +53,8 @@ public class RedissonScriptTest extends BaseTest {
Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", r);
String r1 = redisson.getScript().evalSha(Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList());
Assert.assertEquals("bar", r1);
Boolean r2 = redisson.getScript().scriptFlush();
Assert.assertTrue(r2);
redisson.getScript().scriptFlush();
try {
redisson.getScript().evalSha(Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList());
} catch (Exception e) {
@ -82,15 +82,21 @@ public class RedissonScriptTest extends BaseTest {
@Test
public void testEvalSha() {
RScript s = redisson.getScript();
String res = s.scriptLoad(null, "return redis.call('get', 'foo')");
Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", res);
redisson.getBucket("foo").set("bar");
String r = redisson.getScript().eval(Mode.READ_ONLY, "return redis.call('get', 'foo')", RScript.ReturnType.VALUE);
Assert.assertEquals("bar", r);
String r1 = redisson.getScript().evalSha(Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList());
String r1 = s.evalSha(Mode.READ_ONLY, "282297a0228f48cd3fc6a55de6316f31422f5d17", RScript.ReturnType.VALUE, Collections.emptyList());
Assert.assertEquals("bar", r1);
}
@Test
public void testEvalshaAsync() {
RScript s = redisson.getScript();
String res = s.scriptLoad(null, "return redis.call('get', 'foo')");
Assert.assertEquals("282297a0228f48cd3fc6a55de6316f31422f5d17", res);
redisson.getBucket("foo").set("bar");
String r = redisson.getScript().eval(Mode.READ_ONLY, "return redis.call('get', 'foo')", RScript.ReturnType.VALUE);
Assert.assertEquals("bar", r);

Loading…
Cancel
Save