RMap script atomicity. #145 new methods replaceAsync, removeAsync, putIfAbsentAsync added

pull/100/merge
Nikita 10 years ago
parent 270d891b29
commit 3189c677f8

@ -300,28 +300,28 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
public <T> Future<T> eval(V script, ScriptOutputType type, K[] keys, V... values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(script.toString()).add(keys.length).addKeys(keys).addValues(values);
args.add(script.toString()).add(keys.length).addKeys(keys).addMapValues(values);
CommandOutput<K, V, T> output = newScriptOutput(codec, type);
return dispatch(EVAL, output, args);
}
public <T> Future<T> eval(V script, ScriptOutputType type, List<K> keys, V... values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(script.toString()).add(keys.size()).addKeys(keys).addValues(values);
args.add(script.toString()).add(keys.size()).addKeys(keys).addMapValues(values);
CommandOutput<K, V, T> output = newScriptOutput(codec, type);
return dispatch(EVAL, output, args);
}
public <T> Future<T> evalsha(String digest, ScriptOutputType type, List<K> keys, V... values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(digest).add(keys.size()).addKeys(keys).addValues(values);
args.add(digest).add(keys.size()).addKeys(keys).addMapValues(values);
CommandOutput<K, V, T> output = newScriptOutput(codec, type);
return dispatch(EVALSHA, output, args);
}
public <T> Future<T> evalsha(String digest, ScriptOutputType type, K[] keys, V... values) {
CommandArgs<K, V> args = new CommandArgs<K, V>(codec);
args.add(digest).add(keys.length).addKeys(keys).addValues(values);
args.add(digest).add(keys.length).addKeys(keys).addMapValues(values);
CommandOutput<K, V, T> output = newScriptOutput(codec, type);
return dispatch(EVALSHA, output, args);
}
@ -1262,7 +1262,7 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
}
return cmd.getNow();
}
public <T> T awaitInterruptibly(Future<T> cmd, long timeout, TimeUnit unit) throws InterruptedException {
if (!cmd.await(timeout, unit)) {
Promise<T> promise = (Promise<T>)cmd;
@ -1288,6 +1288,8 @@ public class RedisAsyncConnection<K, V> extends ChannelInboundHandlerAdapter {
case STATUS: return (CommandOutput<K, V, T>) new StatusOutput<K, V>(codec);
case MULTI: return (CommandOutput<K, V, T>) new NestedMultiOutput<K, V>(codec);
case VALUE: return (CommandOutput<K, V, T>) new ValueOutput<K, V>(codec);
case MAPVALUE: return (CommandOutput<K, V, T>) new MapValueOutput<K, V>(codec);
case MAPVALUELIST: return (CommandOutput<K, V, T>) new MapValueListOutput<K, V>(codec);
default: throw new RedisException("Unsupported script output type");
}
}

@ -16,6 +16,6 @@ package com.lambdaworks.redis;
* @author Will Glozer
*/
public enum ScriptOutputType {
BOOLEAN, INTEGER, MULTI, STATUS, VALUE
BOOLEAN, INTEGER, MULTI, STATUS, VALUE, MAPVALUE, MAPVALUELIST
}

@ -15,24 +15,27 @@
*/
package org.redisson;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.math.BigDecimal;
import java.util.*;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.redisson.async.AsyncOperation;
import org.redisson.async.OperationListener;
import org.redisson.async.ResultOperation;
import org.redisson.async.SyncOperation;
import org.redisson.connection.ConnectionManager;
import org.redisson.core.Predicate;
import org.redisson.core.RMap;
import org.redisson.core.RScript;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.output.MapScanResult;
import io.netty.util.concurrent.Future;
/**
* Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap}
* and {@link java.util.Map}
@ -176,120 +179,56 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public V putIfAbsent(final K key, final V value) {
// while (true) {
// Boolean res = connection.hsetnx(getName(), key, value);
// if (!res) {
// V result = (V) connection.hget(getName(), key);
// if (result != null) {
// return result;
// }
// } else {
// return null;
// }
// }
return connectionManager.write(getName(), new AsyncOperation<V, V>() {
@Override
public void execute(final Promise<V> promise, final RedisAsyncConnection<Object, V> async) {
final AsyncOperation<V, V> timeoutCallback = this;
async.hsetnx(getName(), key, value).addListener(new OperationListener<V, V, Boolean>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<Boolean> future) throws Exception {
if (future.get()) {
promise.setSuccess(null);
return;
}
async.hget(getName(), key).addListener(new OperationListener<V, V, V>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<V> future) throws Exception {
V prev = future.get();
if (prev != null) {
promise.setSuccess(prev);
} else {
timeoutCallback.execute(promise, async);
}
}
});
}
});
}
});
public V putIfAbsent(K key, V value) {
return connectionManager.get(putIfAbsentAsync(key, value));
}
private boolean isEquals(RedisConnection<Object, Object> connection, Object key, Object value) {
Object val = connection.hget(getName(), key);
return (value != null && value.equals(val)) || (value == null && val == null);
@Override
public Future<V> putIfAbsentAsync(K key, V value) {
return new RedissonScript(connectionManager)
.evalAsync("if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return nil else return redis.call('hget', KEYS[1], ARGV[1]) end",
RScript.ReturnType.VALUE,
Collections.<Object>singletonList(getName()), key, value);
}
@Override
public boolean remove(final Object key, final Object value) {
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> connection) {
while (true) {
connection.watch(getName());
if (connection.hexists(getName(), key)
&& isEquals(connection, key, value)) {
connection.multi();
connection.hdel(getName(), key);
if (connection.exec().size() == 1) {
return true;
}
} else {
connection.unwatch();
return false;
}
}
}
});
public boolean remove(Object key, Object value) {
Long result = connectionManager.get(removeAsync(key, value));
return result != null && result == 1;
}
@Override
public boolean replace(final K key, final V oldValue, final V newValue) {
return connectionManager.write(getName(), new SyncOperation<Object, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Object> connection) {
while (true) {
connection.watch(getName());
if (connection.hexists(getName(), key)
&& isEquals(connection, key, oldValue)) {
connection.multi();
connection.hset(getName(), key, newValue);
if (connection.exec().size() == 1) {
return true;
}
} else {
connection.unwatch();
return false;
}
}
}
});
public Future<Long> removeAsync(Object key, Object value) {
return new RedissonScript(connectionManager)
.evalAsync("if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then return redis.call('hdel', KEYS[1], ARGV[1]) else return nil end",
RScript.ReturnType.INTEGER,
Collections.<Object>singletonList(getName()), key, value);
}
@Override
public V replace(final K key, final V value) {
return connectionManager.write(getName(), new SyncOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> connection) {
while (true) {
connection.watch(getName());
if (connection.hexists(getName(), key)) {
V prev = connection.hget(getName(), key);
connection.multi();
connection.hset(getName(), key, value);
if (connection.exec().size() == 1) {
return prev;
}
} else {
connection.unwatch();
return null;
}
}
}
});
public boolean replace(K key, V oldValue, V newValue) {
return "OK".equals(connectionManager.get(replaceAsync(key, oldValue, newValue)));
}
@Override
public Future<V> replaceAsync(K key, V oldValue, V newValue) {
return new RedissonScript(connectionManager)
.evalAsync("if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); return 'OK'; else return nil; end",
RScript.ReturnType.STATUS,
Collections.<Object>singletonList(getName()), key, oldValue, newValue);
}
@Override
public V replace(K key, V value) {
return connectionManager.get(replaceAsync(key, value));
}
@Override
public Future<V> replaceAsync(K key, V value) {
return new RedissonScript(connectionManager)
.evalAsync("if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v; else return nil; end",
RScript.ReturnType.VALUE,
Collections.<Object>singletonList(getName()), key, value);
}
@Override
@ -303,125 +242,20 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public Future<V> putAsync(final K key, final V value) {
// while (true) {
// connection.watch(getName());
// V prev = (V) connection.hget(getName(), key);
// connection.multi();
// connection.hset(getName(), key, value);
// if (connection.exec().size() == 1) {
// return prev;
// }
// }
return connectionManager.writeAsync(getName(), new AsyncOperation<V, V>() {
@Override
public void execute(final Promise<V> promise, RedisAsyncConnection<Object, V> async) {
putAsync(key, value, promise, async, this);
}
});
public Future<V> putAsync(K key, V value) {
return new RedissonScript(connectionManager)
.evalAsync("local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); return v",
RScript.ReturnType.VALUE,
Collections.<Object>singletonList(getName()), key, value);
}
private void putAsync(final K key, final V value, final Promise<V> promise,
final RedisAsyncConnection<Object, V> async, final AsyncOperation<V, V> timeoutCallback) {
async.watch(getName()).addListener(new OperationListener<V, V, String>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<String> future) throws Exception {
async.hget(getName(), key).addListener(new OperationListener<V, V, V>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<V> future) throws Exception {
final V prev = future.get();
async.multi().addListener(new OperationListener<V, V, String>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<String> future) throws Exception {
async.hset(getName(), key, value).addListener(new OperationListener<V, V, Boolean>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<Boolean> future) throws Exception {
async.exec().addListener(new OperationListener<V, V, List<Object>>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<List<Object>> future) throws Exception {
if (future.get().size() == 1) {
promise.setSuccess(prev);
} else {
timeoutCallback.execute(promise, async);
}
}
});
}
});
}
});
}
});
}
});
}
@Override
public Future<V> removeAsync(final K key) {
// while (true) {
// connection.watch(getName());
// V prev = (V) connection.hget(getName(), key);
// connection.multi();
// connection.hdel(getName(), key);
// if (connection.exec().size() == 1) {
// return prev;
// }
// }
return connectionManager.writeAsync(getName(), new AsyncOperation<V, V>() {
@Override
public void execute(final Promise<V> promise, RedisAsyncConnection<Object, V> async) {
removeAsync(key, promise, async, this);
}
});
}
private void removeAsync(final K key, final Promise<V> promise,
final RedisAsyncConnection<Object, V> async, final AsyncOperation<V, V> timeoutCallback) {
async.watch(getName()).addListener(new OperationListener<V, V, String>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<String> future) throws Exception {
async.hget(getName(), key).addListener(new OperationListener<V, V, V>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<V> future) throws Exception {
final V prev = future.get();
async.multi().addListener(new OperationListener<V, V, String>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<String> future) throws Exception {
async.hdel(getName(), key).addListener(new OperationListener<V, V, Long>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<Long> future) throws Exception {
async.exec().addListener(new OperationListener<V, V, List<Object>>(promise, async, timeoutCallback) {
@Override
public void onOperationComplete(Future<List<Object>> future) throws Exception {
if (future.get().size() == 1) {
promise.setSuccess(prev);
} else {
timeoutCallback.execute(promise, async);
}
}
});
}
});
}
});
}
});
}
});
public Future<V> removeAsync(K key) {
return new RedissonScript(connectionManager)
.evalAsync("local v = redis.call('hget', KEYS[1], ARGV[1]); redis.call('hdel', KEYS[1], ARGV[1]); return v",
RScript.ReturnType.VALUE,
Collections.<Object>singletonList(getName()), key);
}
@Override

@ -144,4 +144,12 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable {
Future<V> removeAsync(K key);
Future<V> replaceAsync(K key, V value);
Future<V> replaceAsync(K key, V oldValue, V newValue);
Future<Long> removeAsync(Object key, Object value);
Future<V> putIfAbsentAsync(K key, V value);
}

Loading…
Cancel
Save