From 3189c677f84b5fb4390ea1ac9ebd96443359eb2a Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 26 Jun 2015 15:29:54 +0300 Subject: [PATCH] RMap script atomicity. #145 new methods replaceAsync, removeAsync, putIfAbsentAsync added --- .../redis/RedisAsyncConnection.java | 16 +- .../lambdaworks/redis/ScriptOutputType.java | 2 +- src/main/java/org/redisson/RedissonMap.java | 286 ++++-------------- src/main/java/org/redisson/core/RMap.java | 8 + 4 files changed, 78 insertions(+), 234 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java index 57c92c9e9..ed95587c0 100644 --- a/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java +++ b/src/main/java/com/lambdaworks/redis/RedisAsyncConnection.java @@ -300,28 +300,28 @@ public class RedisAsyncConnection extends ChannelInboundHandlerAdapter { public Future eval(V script, ScriptOutputType type, K[] keys, V... values) { CommandArgs args = new CommandArgs(codec); - args.add(script.toString()).add(keys.length).addKeys(keys).addValues(values); + args.add(script.toString()).add(keys.length).addKeys(keys).addMapValues(values); CommandOutput output = newScriptOutput(codec, type); return dispatch(EVAL, output, args); } - + public Future eval(V script, ScriptOutputType type, List keys, V... values) { CommandArgs args = new CommandArgs(codec); - args.add(script.toString()).add(keys.size()).addKeys(keys).addValues(values); + args.add(script.toString()).add(keys.size()).addKeys(keys).addMapValues(values); CommandOutput output = newScriptOutput(codec, type); return dispatch(EVAL, output, args); } public Future evalsha(String digest, ScriptOutputType type, List keys, V... values) { CommandArgs args = new CommandArgs(codec); - args.add(digest).add(keys.size()).addKeys(keys).addValues(values); + args.add(digest).add(keys.size()).addKeys(keys).addMapValues(values); CommandOutput output = newScriptOutput(codec, type); return dispatch(EVALSHA, output, args); } - + public Future evalsha(String digest, ScriptOutputType type, K[] keys, V... values) { CommandArgs args = new CommandArgs(codec); - args.add(digest).add(keys.length).addKeys(keys).addValues(values); + args.add(digest).add(keys.length).addKeys(keys).addMapValues(values); CommandOutput output = newScriptOutput(codec, type); return dispatch(EVALSHA, output, args); } @@ -1262,7 +1262,7 @@ public class RedisAsyncConnection extends ChannelInboundHandlerAdapter { } return cmd.getNow(); } - + public T awaitInterruptibly(Future cmd, long timeout, TimeUnit unit) throws InterruptedException { if (!cmd.await(timeout, unit)) { Promise promise = (Promise)cmd; @@ -1288,6 +1288,8 @@ public class RedisAsyncConnection extends ChannelInboundHandlerAdapter { case STATUS: return (CommandOutput) new StatusOutput(codec); case MULTI: return (CommandOutput) new NestedMultiOutput(codec); case VALUE: return (CommandOutput) new ValueOutput(codec); + case MAPVALUE: return (CommandOutput) new MapValueOutput(codec); + case MAPVALUELIST: return (CommandOutput) new MapValueListOutput(codec); default: throw new RedisException("Unsupported script output type"); } } diff --git a/src/main/java/com/lambdaworks/redis/ScriptOutputType.java b/src/main/java/com/lambdaworks/redis/ScriptOutputType.java index 8cb14cbc2..c2c5c3a9d 100644 --- a/src/main/java/com/lambdaworks/redis/ScriptOutputType.java +++ b/src/main/java/com/lambdaworks/redis/ScriptOutputType.java @@ -16,6 +16,6 @@ package com.lambdaworks.redis; * @author Will Glozer */ public enum ScriptOutputType { - BOOLEAN, INTEGER, MULTI, STATUS, VALUE + BOOLEAN, INTEGER, MULTI, STATUS, VALUE, MAPVALUE, MAPVALUELIST } diff --git a/src/main/java/org/redisson/RedissonMap.java b/src/main/java/org/redisson/RedissonMap.java index 91e8c2ee4..2b7367afb 100644 --- a/src/main/java/org/redisson/RedissonMap.java +++ b/src/main/java/org/redisson/RedissonMap.java @@ -15,24 +15,27 @@ */ package org.redisson; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; - import java.math.BigDecimal; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; -import org.redisson.async.AsyncOperation; -import org.redisson.async.OperationListener; import org.redisson.async.ResultOperation; -import org.redisson.async.SyncOperation; import org.redisson.connection.ConnectionManager; import org.redisson.core.Predicate; import org.redisson.core.RMap; +import org.redisson.core.RScript; import com.lambdaworks.redis.RedisAsyncConnection; -import com.lambdaworks.redis.RedisConnection; import com.lambdaworks.redis.output.MapScanResult; +import io.netty.util.concurrent.Future; + /** * Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap} * and {@link java.util.Map} @@ -176,120 +179,56 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public V putIfAbsent(final K key, final V value) { -// while (true) { -// Boolean res = connection.hsetnx(getName(), key, value); -// if (!res) { -// V result = (V) connection.hget(getName(), key); -// if (result != null) { -// return result; -// } -// } else { -// return null; -// } -// } - - return connectionManager.write(getName(), new AsyncOperation() { - @Override - public void execute(final Promise promise, final RedisAsyncConnection async) { - final AsyncOperation timeoutCallback = this; - async.hsetnx(getName(), key, value).addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - if (future.get()) { - promise.setSuccess(null); - return; - } - - async.hget(getName(), key).addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - V prev = future.get(); - if (prev != null) { - promise.setSuccess(prev); - } else { - timeoutCallback.execute(promise, async); - } - } - }); - } - }); - } - }); + public V putIfAbsent(K key, V value) { + return connectionManager.get(putIfAbsentAsync(key, value)); } - private boolean isEquals(RedisConnection connection, Object key, Object value) { - Object val = connection.hget(getName(), key); - return (value != null && value.equals(val)) || (value == null && val == null); + @Override + public Future putIfAbsentAsync(K key, V value) { + return new RedissonScript(connectionManager) + .evalAsync("if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return nil else return redis.call('hget', KEYS[1], ARGV[1]) end", + RScript.ReturnType.VALUE, + Collections.singletonList(getName()), key, value); } @Override - public boolean remove(final Object key, final Object value) { - return connectionManager.write(getName(), new SyncOperation() { - @Override - public Boolean execute(RedisConnection connection) { - while (true) { - connection.watch(getName()); - if (connection.hexists(getName(), key) - && isEquals(connection, key, value)) { - connection.multi(); - connection.hdel(getName(), key); - if (connection.exec().size() == 1) { - return true; - } - } else { - connection.unwatch(); - return false; - } - } - } - }); + public boolean remove(Object key, Object value) { + Long result = connectionManager.get(removeAsync(key, value)); + return result != null && result == 1; } @Override - public boolean replace(final K key, final V oldValue, final V newValue) { - return connectionManager.write(getName(), new SyncOperation() { - @Override - public Boolean execute(RedisConnection connection) { - while (true) { - connection.watch(getName()); - if (connection.hexists(getName(), key) - && isEquals(connection, key, oldValue)) { - connection.multi(); - connection.hset(getName(), key, newValue); - if (connection.exec().size() == 1) { - return true; - } - } else { - connection.unwatch(); - return false; - } - } - } - }); + public Future removeAsync(Object key, Object value) { + return new RedissonScript(connectionManager) + .evalAsync("if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then return redis.call('hdel', KEYS[1], ARGV[1]) else return nil end", + RScript.ReturnType.INTEGER, + Collections.singletonList(getName()), key, value); } @Override - public V replace(final K key, final V value) { - return connectionManager.write(getName(), new SyncOperation() { - @Override - public V execute(RedisConnection connection) { - while (true) { - connection.watch(getName()); - if (connection.hexists(getName(), key)) { - V prev = connection.hget(getName(), key); - connection.multi(); - connection.hset(getName(), key, value); - if (connection.exec().size() == 1) { - return prev; - } - } else { - connection.unwatch(); - return null; - } - } - } - }); + public boolean replace(K key, V oldValue, V newValue) { + return "OK".equals(connectionManager.get(replaceAsync(key, oldValue, newValue))); + } + + @Override + public Future replaceAsync(K key, V oldValue, V newValue) { + return new RedissonScript(connectionManager) + .evalAsync("if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); return 'OK'; else return nil; end", + RScript.ReturnType.STATUS, + Collections.singletonList(getName()), key, oldValue, newValue); + } + + @Override + public V replace(K key, V value) { + return connectionManager.get(replaceAsync(key, value)); + } + + @Override + public Future replaceAsync(K key, V value) { + return new RedissonScript(connectionManager) + .evalAsync("if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v; else return nil; end", + RScript.ReturnType.VALUE, + Collections.singletonList(getName()), key, value); } @Override @@ -303,125 +242,20 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public Future putAsync(final K key, final V value) { -// while (true) { -// connection.watch(getName()); -// V prev = (V) connection.hget(getName(), key); -// connection.multi(); -// connection.hset(getName(), key, value); -// if (connection.exec().size() == 1) { -// return prev; -// } -// } - - return connectionManager.writeAsync(getName(), new AsyncOperation() { - @Override - public void execute(final Promise promise, RedisAsyncConnection async) { - putAsync(key, value, promise, async, this); - } - }); + public Future putAsync(K key, V value) { + return new RedissonScript(connectionManager) + .evalAsync("local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v", + RScript.ReturnType.VALUE, + Collections.singletonList(getName()), key, value); } - private void putAsync(final K key, final V value, final Promise promise, - final RedisAsyncConnection async, final AsyncOperation timeoutCallback) { - async.watch(getName()).addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - - async.hget(getName(), key).addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - - final V prev = future.get(); - async.multi().addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - - async.hset(getName(), key, value).addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - - async.exec().addListener(new OperationListener>(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future> future) throws Exception { - - if (future.get().size() == 1) { - promise.setSuccess(prev); - } else { - timeoutCallback.execute(promise, async); - } - } - }); - } - }); - } - }); - } - }); - } - - }); - } @Override - public Future removeAsync(final K key) { -// while (true) { -// connection.watch(getName()); -// V prev = (V) connection.hget(getName(), key); -// connection.multi(); -// connection.hdel(getName(), key); -// if (connection.exec().size() == 1) { -// return prev; -// } -// } - - return connectionManager.writeAsync(getName(), new AsyncOperation() { - @Override - public void execute(final Promise promise, RedisAsyncConnection async) { - removeAsync(key, promise, async, this); - } - }); - } - - private void removeAsync(final K key, final Promise promise, - final RedisAsyncConnection async, final AsyncOperation timeoutCallback) { - async.watch(getName()).addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - - async.hget(getName(), key).addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - final V prev = future.get(); - - async.multi().addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - - async.hdel(getName(), key).addListener(new OperationListener(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future future) throws Exception { - - async.exec().addListener(new OperationListener>(promise, async, timeoutCallback) { - @Override - public void onOperationComplete(Future> future) throws Exception { - - if (future.get().size() == 1) { - promise.setSuccess(prev); - } else { - timeoutCallback.execute(promise, async); - } - } - }); - - } - }); - } - }); - } - }); - } - }); + public Future removeAsync(K key) { + return new RedissonScript(connectionManager) + .evalAsync("local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hdel', KEYS[1], ARGV[1]); return v", + RScript.ReturnType.VALUE, + Collections.singletonList(getName()), key); } @Override diff --git a/src/main/java/org/redisson/core/RMap.java b/src/main/java/org/redisson/core/RMap.java index b7ef2d802..f315cb190 100644 --- a/src/main/java/org/redisson/core/RMap.java +++ b/src/main/java/org/redisson/core/RMap.java @@ -144,4 +144,12 @@ public interface RMap extends ConcurrentMap, RExpirable { Future removeAsync(K key); + Future replaceAsync(K key, V value); + + Future replaceAsync(K key, V oldValue, V newValue); + + Future removeAsync(Object key, Object value); + + Future putIfAbsentAsync(K key, V value); + }