MapWriter (write-trough) support for RMap added. #927

pull/968/head
Nikita 8 years ago
parent 2da21fb4d8
commit 369e30640c

@ -195,8 +195,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private static final RedisCommand<Set<Object>> ALL_KEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>(), ValueType.MAP_KEY);
private static final RedisCommand<Set<Entry<Object, Object>>> ALL_ENTRIES = new RedisCommand<Set<Entry<Object, Object>>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Map<Object, Object>> ALL_MAP = new RedisCommand<Map<Object, Object>>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10);
private byte[] instanceId;
@ -416,21 +414,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public RFuture<V> putAsync(K key, V value) {
if (key == null) {
throw new NullPointerException();
}
if (value == null) {
throw new NullPointerException();
}
public RFuture<V> putOperationAsync(K key, V value) {
byte[] mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ "if ARGV[4] == '1' then "
@ -447,14 +438,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public RFuture<Boolean> fastPutAsync(K key, V value) {
if (key == null) {
throw new NullPointerException();
}
if (value == null) {
throw new NullPointerException();
}
protected RFuture<Boolean> fastPutOperationAsync(K key, V value) {
byte[] encodedKey = encodeMapKey(key);
byte[] encodedValue = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(encodedKey);
@ -489,17 +473,13 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public RFuture<V> removeAsync(K key) {
if (key == null) {
throw new NullPointerException();
}
public RFuture<V> removeOperationAsync(K key) {
byte[] keyEncoded = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
cache.remove(cacheKey);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then "
+ "if ARGV[3] == '1' then "
@ -516,10 +496,87 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public RFuture<Long> fastRemoveAsync(K ... keys) {
if (keys == null) {
throw new NullPointerException();
protected RFuture<List<Long>> fastRemoveOperationBatchAsync(@SuppressWarnings("unchecked") K... keys) {
if (invalidateEntryOnChange == 1) {
List<Object> params = new ArrayList<Object>(keys.length*2);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for j = 1, #ARGV, 2 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[j]);"
+ "if val == 1 then "
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "end;"
+ "table.insert(result, val);"
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
params.toArray());
}
if (invalidateEntryOnChange == 2) {
List<Object> params = new ArrayList<Object>(keys.length*3);
params.add(System.currentTimeMillis());
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
params.add(entryId);
}
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for j = 2, #ARGV, 3 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[j]);"
+ "if val == 1 then "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[j+2]);"
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "end;"
+ "table.insert(result, val);"
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
params.toArray());
}
List<Object> params = new ArrayList<Object>(keys.length);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
params.add(keyEncoded);
CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
}
RFuture<List<Long>> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for i = 1, #ARGV, 1 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[i]); "
+ "table.insert(result, val); "
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName()),
params.toArray());
return future;
}
@Override
protected RFuture<Long> fastRemoveOperationAsync(K ... keys) {
if (invalidateEntryOnChange == 1) {
List<Object> params = new ArrayList<Object>(keys.length*2);
@ -892,11 +949,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return newSucceededFuture(null);
}
public RFuture<Void> putAllOperationAsync(final Map<? extends K, ? extends V> map) {
List<Object> params = new ArrayList<Object>(map.size()*3);
params.add(invalidateEntryOnChange);
params.add(map.size()*2);
@ -952,7 +1005,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public RFuture<V> addAndGetAsync(final K key, Number value) {
public RFuture<V> addAndGetOperationAsync(final K key, Number value) {
final byte[] keyState = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyState);
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
@ -1151,14 +1204,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public RFuture<V> replaceAsync(final K key, final V value) {
protected RFuture<V> replaceOperationAsync(K key, V value) {
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
RFuture<V> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
@ -1177,7 +1230,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}
@Override
public RFuture<V> replaceAsync(final K key, final V value) {
final byte[] keyState = encodeMapKey(key);
final CacheKey cacheKey = toCacheKey(keyState);
RFuture<V> future = super.replaceAsync(key, value);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
@ -1186,7 +1246,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
if (future.getNow() != null) {
CacheKey cacheKey = toCacheKey(key);
cache.put(cacheKey, new CacheValue(key, value));
}
}
@ -1194,9 +1253,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future;
}
@Override
public RFuture<Boolean> replaceAsync(final K key, V oldValue, final V newValue) {
protected RFuture<Boolean> replaceOperationAsync(K key, V oldValue, V newValue) {
final byte[] keyState = encodeMapKey(key);
byte[] oldValueState = encodeMapValue(oldValue);
byte[] newValueState = encodeMapValue(newValue);
@ -1204,7 +1263,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "if ARGV[4] == '1' then "
@ -1220,7 +1279,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, oldValueState, newValueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}
@Override
public RFuture<Boolean> replaceAsync(final K key, V oldValue, final V newValue) {
final byte[] keyState = encodeMapKey(key);
final CacheKey cacheKey = toCacheKey(keyState);
RFuture<Boolean> future = super.replaceAsync(key, oldValue, newValue);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
@ -1238,14 +1304,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public RFuture<Boolean> removeAsync(Object key, Object value) {
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
@ -1260,6 +1326,13 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}
@Override
public RFuture<Boolean> removeAsync(Object key, Object value) {
final byte[] keyState = encodeMapKey(key);
final CacheKey cacheKey = toCacheKey(keyState);
RFuture<Boolean> future = super.removeAsync(key, value);
future.addListener(new FutureListener<Boolean>() {
@Override
@ -1276,6 +1349,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future;
}
@Override
public RFuture<V> putIfAbsentAsync(final K key, final V value) {
RFuture<V> future = super.putIfAbsentAsync(key, value);

@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
@ -40,6 +41,7 @@ import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
@ -70,15 +72,9 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
static final RedisCommand<Boolean> EVAL_REMOVE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP);
static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
final RedissonClient redisson;
private MapLoader<K, V> mapLoader;
private MapWriter<K, V> mapWriter;
MapLoader<K, V> mapLoader;
MapWriter<K, V> mapWriter;
protected RedissonMap(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
super(commandExecutor, name);
@ -236,10 +232,6 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return future;
}
private RFuture<Void> externalPutAllAsync(Map<K, V> entries) {
return putAllAsync(entries);
}
@Override
public V get(Object key) {
return get(getAsync((K)key));
@ -261,19 +253,48 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) {
public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return newSucceededFuture(null);
}
RFuture<Void> future = putAllOperationAsync(map);
if (mapWriter == null) {
return future;
}
RPromise<Void> result = new MapWriterPromise<Void>(future, commandExecutor) {
@Override
public void execute(Future<Void> future, ExecutorService executorService) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.writeAll((Map<K, V>) map);
trySuccess(null);
}
});
}
};
return result;
}
protected RFuture<Void> putAllOperationAsync(Map<? extends K, ? extends V> map) {
List<Object> params = new ArrayList<Object>(map.size()*2 + 1);
params.add(getName());
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
params.add(t.getKey());
params.add(t.getValue());
if (t.getKey() == null) {
throw new NullPointerException("map key can't be null");
}
if (t.getValue() == null) {
throw new NullPointerException("map value can't be null");
}
params.add(encodeMapKey(t.getKey()));
params.add(encodeMapValue(t.getValue()));
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.HMSET, params.toArray());
RFuture<Void> future = commandExecutor.writeAsync(getName(), codec, RedisCommands.HMSET, params.toArray());
return future;
}
@Override
@ -343,17 +364,43 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public RFuture<V> putIfAbsentAsync(K key, V value) {
public RFuture<V> putIfAbsentAsync(final K key, final V value) {
checkKey(key);
checkValue(key);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT,
RFuture<V> future = putIfAbsentOperationAsync(key, value);
if (mapWriter == null) {
return future;
}
RPromise<V> result = new MapWriterPromise<V>(future, commandExecutor) {
@Override
public void execute(final Future<V> future, ExecutorService executorService) {
if (future.getNow() == null) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, value);
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
};
return result;
}
protected RFuture<V> putIfAbsentOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"if redis.call('hsetnx', KEYS[1], ARGV[1], ARGV[2]) == 1 then "
+ "return nil "
+ "else "
+ "return redis.call('hget', KEYS[1], ARGV[1]) "
+ "end",
Collections.<Object>singletonList(getName(key)), key, value);
Collections.<Object>singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value));
}
@Override
@ -362,11 +409,37 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value) {
public RFuture<Boolean> fastPutIfAbsentAsync(final K key, final V value) {
checkKey(key);
checkValue(value);
return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSETNX, getName(key), key, value);
RFuture<Boolean> future = fastPutIfAbsentOperationAsync(key, value);
if (mapWriter == null) {
return future;
}
RPromise<Boolean> result = new MapWriterPromise<Boolean>(future, commandExecutor) {
@Override
public void execute(final Future<Boolean> future, ExecutorService executorService) {
if (future.getNow()) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, value);
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
};
return result;
}
protected RFuture<Boolean> fastPutIfAbsentOperationAsync(K key, V value) {
return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSETNX, getName(key), encodeMapKey(key), encodeMapValue(value));
}
@Override
@ -375,17 +448,44 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public RFuture<Boolean> removeAsync(Object key, Object value) {
public RFuture<Boolean> removeAsync(final Object key, Object value) {
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE_VALUE,
RFuture<Boolean> future = removeOperationAsync(key, value);
if (mapWriter == null) {
return future;
}
RPromise<Boolean> result = new MapWriterPromise<Boolean>(future, commandExecutor) {
@Override
public void execute(final Future<Boolean> future, ExecutorService executorService) {
if (future.getNow()) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.delete((K) key);
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
};
return result;
}
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "return redis.call('hdel', KEYS[1], ARGV[1]) "
+ "else "
+ "return 0 "
+ "end",
Collections.<Object>singletonList(getName(key)), key, value);
Collections.<Object>singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value));
return future;
}
protected void checkValue(Object value) {
@ -400,7 +500,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public RFuture<Boolean> replaceAsync(K key, V oldValue, V newValue) {
public RFuture<Boolean> replaceAsync(final K key, V oldValue, final V newValue) {
checkKey(key);
if (oldValue == null) {
throw new NullPointerException("map oldValue can't be null");
@ -409,15 +509,40 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
throw new NullPointerException("map newValue can't be null");
}
RFuture<Boolean> future = replaceOperationAsync(key, oldValue, newValue);
if (mapWriter == null) {
return future;
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE_VALUE,
RPromise<Boolean> result = new MapWriterPromise<Boolean>(future, commandExecutor) {
@Override
public void execute(final Future<Boolean> future, ExecutorService executorService) {
if (future.getNow()) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, newValue);
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
};
return result;
}
protected RFuture<Boolean> replaceOperationAsync(K key, V oldValue, V newValue) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Collections.<Object>singletonList(getName(key)), key, oldValue, newValue);
Collections.<Object>singletonList(getName(key)), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue));
}
@Override
@ -426,11 +551,37 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public RFuture<V> replaceAsync(K key, V value) {
public RFuture<V> replaceAsync(final K key, final V value) {
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE,
RFuture<V> future = replaceOperationAsync(key, value);
if (mapWriter == null) {
return future;
}
RPromise<V> result = new MapWriterPromise<V>(future, commandExecutor) {
@Override
public void execute(final Future<V> future, ExecutorService executorService) {
if (future.getNow() != null) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, value);
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
};
return result;
}
protected RFuture<V> replaceOperationAsync(final K key, final V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
@ -438,10 +589,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
+ "else "
+ "return nil; "
+ "end",
Collections.<Object>singletonList(getName(key)), key, value);
Collections.<Object>singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value));
}
protected RFuture<V> getValueAsync(K key) {
protected RFuture<V> getOperationAsync(K key) {
return commandExecutor.readAsync(getName(key), codec, RedisCommands.HGET, getName(key), key);
}
@ -449,7 +600,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
public RFuture<V> getAsync(final K key) {
checkKey(key);
RFuture<V> future = getValueAsync(key);
RFuture<V> future = getOperationAsync(key);
if (mapLoader == null) {
return future;
}
@ -617,34 +768,100 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public RFuture<V> putAsync(K key, V value) {
public RFuture<V> putAsync(final K key, final V value) {
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT,
RFuture<V> future = putOperationAsync(key, value);
if (mapWriter == null) {
return future;
}
RPromise<V> result = new MapWriterPromise<V>(future, commandExecutor) {
@Override
public void execute(final Future<V> future, ExecutorService executorService) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, value);
trySuccess(future.getNow());
}
});
}
};
return result;
}
protected RFuture<V> putOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return v",
Collections.<Object>singletonList(getName(key)), key, value);
Collections.<Object>singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value));
}
@Override
public RFuture<V> removeAsync(K key) {
public RFuture<V> removeAsync(final K key) {
checkKey(key);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE,
RFuture<V> future = removeOperationAsync(key);
if (mapWriter == null) {
return future;
}
RPromise<V> result = new MapWriterPromise<V>(future, commandExecutor) {
@Override
public void execute(final Future<V> future, ExecutorService executorService) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.delete(key);
trySuccess(future.getNow());
}
});
}
};
return result;
}
protected RFuture<V> removeOperationAsync(K key) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "return v",
Collections.<Object>singletonList(getName(key)), key);
Collections.<Object>singletonList(getName(key)), encodeMapKey(key));
}
@Override
public RFuture<Boolean> fastPutAsync(K key, V value) {
public RFuture<Boolean> fastPutAsync(final K key, final V value) {
checkKey(key);
checkValue(value);
RFuture<Boolean> future = fastPutOperationAsync(key, value);
if (mapWriter == null) {
return future;
}
RPromise<Boolean> result = new MapWriterPromise<Boolean>(future, commandExecutor) {
@Override
public void execute(final Future<Boolean> future, ExecutorService executorService) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, value);
trySuccess(future.getNow());
}
});
}
};
return result;
}
protected RFuture<Boolean> fastPutOperationAsync(K key, V value) {
return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSET, getName(key), key, value);
}
@ -654,14 +871,72 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public RFuture<Long> fastRemoveAsync(K ... keys) {
if (keys == null || keys.length == 0) {
public RFuture<Long> fastRemoveAsync(final K ... keys) {
if (keys == null) {
throw new NullPointerException();
}
if (keys.length == 0) {
return newSucceededFuture(0L);
}
if (mapWriter == null) {
return fastRemoveOperationAsync(keys);
}
RFuture<List<Long>> future = fastRemoveOperationBatchAsync(keys);
final RPromise<Long> result = new RedissonPromise<Long>();
future.addListener(new FutureListener<List<Long>>() {
@Override
public void operationComplete(Future<List<Long>> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
final List<K> deletedKeys = new ArrayList<K>();
for (int i = 0; i < future.getNow().size(); i++) {
if (future.getNow().get(i) == 1) {
deletedKeys.add(keys[i]);
}
}
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
mapWriter.deleteAll(deletedKeys);
result.trySuccess((long)deletedKeys.size());
}
});
}
});
return result;
}
protected RFuture<List<Long>> fastRemoveOperationBatchAsync(final K... keys) {
List<Object> args = new ArrayList<Object>(keys.length);
for (K key : keys) {
args.add(encodeMapKey(key));
}
RFuture<List<Long>> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for i = 1, #ARGV, 1 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[i]); "
+ "table.insert(result, val); "
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName()),
args.toArray());
return future;
}
protected RFuture<Long> fastRemoveOperationAsync(K... keys) {
List<Object> args = new ArrayList<Object>(keys.length + 1);
args.add(getName());
args.addAll(Arrays.asList(keys));
for (K key : keys) {
args.add(encodeMapKey(key));
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.HDEL, args.toArray());
}
@ -682,14 +957,37 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public RFuture<V> addAndGetAsync(K key, Number value) {
public RFuture<V> addAndGetAsync(final K key, Number value) {
checkKey(key);
checkValue(value);
RFuture<V> future = addAndGetOperationAsync(key, value);
if (mapWriter == null) {
return future;
}
RPromise<V> result = new MapWriterPromise<V>(future, commandExecutor) {
@Override
public void execute(final Future<V> future, ExecutorService executorService) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, future.getNow());
trySuccess(future.getNow());
}
});
}
};
return result;
}
protected RFuture<V> addAndGetOperationAsync(K key, Number value) {
byte[] keyState = encodeMapKey(key);
return commandExecutor.writeAsync(getName(key), StringCodec.INSTANCE,
RFuture<V> future = commandExecutor.writeAsync(getName(key), StringCodec.INSTANCE,
new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
getName(key), keyState, new BigDecimal(value.toString()).toPlainString());
return future;
}
@Override
@ -827,7 +1125,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return;
}
getValueAsync(key).addListener(new FutureListener<V>() {
getOperationAsync(key).addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> valueFuture) throws Exception {
if (!valueFuture.isSuccess()) {

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@ -24,6 +23,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
@ -47,7 +47,6 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.MapCacheScanResult;
@ -60,6 +59,7 @@ import org.redisson.codec.MapCacheEventCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -85,16 +85,6 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCache<K, V> {
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 7, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Void> EVAL_HMSET = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 12, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_PUT_TTL_IF_ABSENT = new RedisCommand<Object>("EVAL", 11, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_GET_TTL = new RedisCommand<Object>("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE);
static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 7, ValueType.MAP_KEY);
static final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Object> EVAL_PUT_IF_ABSENT = new RedisCommand<Object>("EVAL", 5, ValueType.MAP, ValueType.MAP_VALUE);
public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor,
String name, RedissonClient redisson, MapLoader<K, V> mapLoader, MapWriter<K, V> mapWriter) {
super(commandExecutor, name, redisson, mapLoader, mapWriter);
@ -239,7 +229,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public RFuture<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
public RFuture<V> putIfAbsentAsync(final K key, final V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
checkKey(key);
checkValue(value);
@ -273,7 +263,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta;
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_TTL_IF_ABSENT,
RFuture<V> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local insertable = false; "
+ "local value = redis.call('hget', KEYS[1], ARGV[5]); "
+ "if value == false then "
@ -325,14 +315,34 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "return val;"
+ "end; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
if (mapWriter == null) {
return future;
}
RPromise<V> result = new MapWriterPromise<V>(future, commandExecutor) {
@Override
public void execute(final Future<V> future, ExecutorService executorService) {
if (future.getNow() == null) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, value);
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
};
return result;
}
@Override
public RFuture<Boolean> removeAsync(Object key, Object value) {
checkKey(key);
checkValue(value);
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if value == false then "
@ -354,10 +364,10 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
protected RFuture<V> getValueAsync(K key) {
protected RFuture<V> getOperationAsync(K key) {
checkKey(key);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_GET_TTL,
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if value == false then "
+ "return nil; "
@ -383,7 +393,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "return nil; "
+ "end; "
+ "return val; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), key);
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)),
System.currentTimeMillis(), encodeMapKey(key));
}
@Override
@ -392,11 +403,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public RFuture<V> putAsync(K key, V value) {
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT,
protected RFuture<V> putOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "redis.call('hset', KEYS[1], ARGV[1], value); "
@ -410,15 +418,12 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('publish', KEYS[3], msg); "
+ "return val; ",
Arrays.<Object>asList(getName(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
key, value);
encodeMapKey(key), encodeMapValue(value));
}
@Override
public RFuture<V> putIfAbsentAsync(K key, V value) {
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_IF_ABSENT,
public RFuture<V> putIfAbsentOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "if redis.call('hsetnx', KEYS[1], ARGV[1], value) == 1 then "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2]); "
@ -433,7 +438,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "return val; "
+ "end",
Arrays.<Object>asList(getName(key), getCreatedChannelNameByKey(key)),
key, value);
encodeMapKey(key), encodeMapValue(value));
}
@Override
@ -442,19 +447,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public RFuture<V> addAndGetAsync(K key, Number value) {
checkKey(key);
checkValue(value);
public RFuture<V> addAndGetOperationAsync(K key, Number value) {
byte[] keyState = encodeMapKey(key);
byte[] valueState;
try {
valueState = StringCodec.INSTANCE
.getMapValueEncoder()
.encode(new BigDecimal(value.toString()).toPlainString());
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
return commandExecutor.evalWriteAsync(getName(key), StringCodec.INSTANCE,
new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass())),
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
@ -494,7 +488,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "redis.call('hset', KEYS[1], ARGV[2], newValuePack); "
+ "return tostring(newValue); ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), keyState, valueState);
System.currentTimeMillis(), keyState, new BigDecimal(value.toString()).toPlainString());
}
@Override
@ -513,7 +507,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public RFuture<Boolean> fastPutAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
public RFuture<Boolean> fastPutAsync(final K key, final V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
checkKey(key);
checkValue(value);
@ -547,7 +541,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta;
}
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local insertable = false; "
+ "local value = redis.call('hget', KEYS[1], ARGV[5]); "
+ "local t, val;"
@ -594,6 +588,24 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end;",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
if (mapWriter == null) {
return future;
}
RPromise<Boolean> result = new MapWriterPromise<Boolean>(future, commandExecutor) {
@Override
public void execute(final Future<Boolean> future, ExecutorService executorService) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, value);
trySuccess(future.getNow());
}
});
}
};
return result;
}
@Override
@ -607,7 +619,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public RFuture<V> putAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
public RFuture<V> putAsync(final K key, final V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
checkKey(key);
checkValue(value);
@ -641,7 +653,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta;
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_TTL,
RFuture<V> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local insertable = false; "
+ "local v = redis.call('hget', KEYS[1], ARGV[5]); "
+ "if v == false then "
@ -691,7 +703,26 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "return val",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value);
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
if (mapWriter == null) {
return future;
}
RPromise<V> result = new MapWriterPromise<V>(future, commandExecutor) {
@Override
public void execute(final Future<V> future, ExecutorService executorService) {
executorService.execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, value);
trySuccess(future.getNow());
}
});
}
};
return result;
}
String getTimeoutSetNameByKey(Object key) {
@ -768,10 +799,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public RFuture<V> removeAsync(K key) {
checkKey(key);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE,
public RFuture<V> removeOperationAsync(K key) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('zrem', KEYS[2], ARGV[1]); "
+ "redis.call('zrem', KEYS[3], ARGV[1]); "
@ -784,16 +813,47 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; "
+ "return v",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelNameByKey(key)),
key);
encodeMapKey(key));
}
@Override
public RFuture<Long> fastRemoveAsync(K ... keys) {
if (keys == null || keys.length == 0) {
return newSucceededFuture(0L);
protected RFuture<List<Long>> fastRemoveOperationBatchAsync(K... keys) {
List<Object> args = new ArrayList<Object>(keys.length);
for (K key : keys) {
args.add(encodeMapKey(key));
}
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_REMOVE,
RFuture<List<Long>> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"redis.call('zrem', KEYS[3], unpack(ARGV)); " +
"redis.call('zrem', KEYS[2], unpack(ARGV)); " +
"for i, key in ipairs(ARGV) do "
+ "local v = redis.call('hget', KEYS[1], key); "
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end;" +
"end;" +
"local result = {}; " +
"for i = 1, #ARGV, 1 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[i]); "
+ "table.insert(result, val); "
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName(), getRemovedChannelName()),
args.toArray());
return future;
}
@Override
protected RFuture<Long> fastRemoveOperationAsync(K ... keys) {
List<Object> params = new ArrayList<Object>(keys.length);
for (K key : keys) {
params.add(encodeMapKey(key));
}
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"redis.call('zrem', KEYS[3], unpack(ARGV)); " +
"redis.call('zrem', KEYS[2], unpack(ARGV)); " +
"for i, key in ipairs(ARGV) do "
@ -806,7 +866,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
"end;" +
"return redis.call('hdel', KEYS[1], unpack(ARGV)); ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName(), getRemovedChannelName()),
keys);
params.toArray());
}
@Override
@ -896,10 +956,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public RFuture<Boolean> fastPutAsync(K key, V value) {
checkKey(key);
checkValue(value);
protected RFuture<Boolean> fastPutOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local insertable = false; "
+ "local v = redis.call('hget', KEYS[1], ARGV[2]); "
@ -941,10 +998,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value) {
checkKey(key);
checkValue(value);
protected RFuture<Boolean> fastPutIfAbsentOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if value == false then "
@ -998,7 +1052,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
public RFuture<Boolean> fastPutIfAbsentAsync(final K key, final V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
checkKey(key);
checkValue(value);
@ -1032,7 +1086,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta;
}
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local insertable = false; "
+ "local value = redis.call('hget', KEYS[1], ARGV[5]); "
+ "if value == false then "
@ -1085,18 +1139,32 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "end; ",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key)),
System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value));
if (mapWriter == null) {
return future;
}
RPromise<Boolean> result = new MapWriterPromise<Boolean>(future, commandExecutor) {
@Override
public void execute(final Future<Boolean> future, ExecutorService executorService) {
if (future.getNow()) {
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
mapWriter.write(key, value);
trySuccess(future.getNow());
}
});
} else {
trySuccess(future.getNow());
}
}
};
return result;
}
@Override
public RFuture<Boolean> replaceAsync(K key, V oldValue, V newValue) {
checkKey(key);
if (oldValue == null) {
throw new NullPointerException("map old value can't be null");
}
if (newValue == null) {
throw new NullPointerException("map new value can't be null");
}
protected RFuture<Boolean> replaceOperationAsync(K key, V oldValue, V newValue) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN,
"local v = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if v == false then "
@ -1134,11 +1202,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public RFuture<V> replaceAsync(K key, V value) {
checkKey(key);
checkValue(value);
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE,
public RFuture<V> replaceOperationAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if v ~= false then "
+ "local t, val = struct.unpack('dLc0', v); "
@ -1156,15 +1221,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
+ "return nil; "
+ "end",
Arrays.<Object>asList(getName(key), getTimeoutSetNameByKey(key), getUpdatedChannelNameByKey(key)),
System.currentTimeMillis(), key, value);
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@Override
public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return newSucceededFuture(null);
}
public RFuture<Void> putAllOperationAsync(Map<? extends K, ? extends V> map) {
List<Object> params = new ArrayList<Object>(map.size()*2);
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
if (t.getKey() == null) {
@ -1174,11 +1235,11 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
throw new NullPointerException("map value can't be null");
}
params.add(t.getKey());
params.add(t.getValue());
params.add(encodeMapKey(t.getKey()));
params.add(encodeMapValue(t.getValue()));
}
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HMSET,
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"for i, value in ipairs(ARGV) do "
+ "if i % 2 == 0 then "
+ "local val = struct.pack('dLc0', 0, string.len(value), value); "

@ -25,6 +25,8 @@ import org.redisson.api.map.MapWriter;
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public class LocalCachedMapOptions<K, V> {
@ -120,6 +122,9 @@ public class LocalCachedMapOptions<K, V> {
* .invalidateEntryOnChange(true);
* </pre>
*
* @param <K> key type
* @param <V> value type
*
* @return LocalCachedMapOptions instance
*
*/

@ -21,6 +21,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.api.mapreduce.RMapReduce;
/**
@ -56,7 +57,7 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>
* If map doesn't contain value for specified key and {@link MapLoader} is defined
* then value will be loaded in read-through mode.
*
@ -67,6 +68,33 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
@Override
V get(Object key);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in async manner.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return previous associated value
*/
@Override
V put(K key, V value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>null</code> if key is a new one in the hash and value was set.
* Previous value if key already exists in the hash and change hasn't been made.
*/
@Override
V putIfAbsent(K key, V value);
/**
* Returns <code>RMapReduce</code> object associated with this map
*
@ -112,13 +140,75 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
*/
V addAndGet(K key, Number delta);
/**
* Removes <code>key</code> from map and returns associated value in async manner.
* <p>
* If {@link MapWriter} is defined then <code>key</code>is deleted in write-through mode.
*
* @param key - map key
* @return deleted value, <code>null</code> if there wasn't any association
*/
@Override
V remove(Object key);
/**
* Replaces previous value with a new <code>value</code> associated with the <code>key</code>.
* If there wasn't any association before then method returns <code>null</code>.
* <p>
* If {@link MapWriter} is defined then new <code>value</code>is written in write-through mode.
*
* @param key - map key
* @param value - map value
* @return previous associated value
* or <code>null</code> if there wasn't any association and change hasn't been made
*/
@Override
V replace(K key, V value);
/**
* Replaces previous <code>oldValue</code> with a <code>newValue</code> associated with the <code>key</code>.
* If previous value doesn't exist or equal to <code>oldValue</code> then method returns <code>false</code>.
* <p>
* If {@link MapWriter} is defined then <code>newValue</code>is written in write-through mode.
*
* @param key - map key
* @param oldValue - map old value
* @param newValue - map new value
* @return <code>true</code> if value has been replaced otherwise <code>false</code>.
*/
@Override
boolean replace(K key, V oldValue, V newValue);
/**
* Removes <code>key</code> from map only if it associated with <code>value</code>.
* <p>
* If {@link MapWriter} is defined then <code>key</code>is deleted in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if map entry has been replaced otherwise <code>false</code>.
*/
@Override
boolean remove(Object key, Object value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
* <p>
* If {@link MapWriter} is defined then new map entries will be stored in write-through mode.
*
* @param map mappings to be stored in this map
*/
@Override
void putAll(java.util.Map<? extends K, ? extends V> map);
/**
* Gets a map slice contained the mappings with defined <code>keys</code>
* by one operation.
*
* <p>
* If map doesn't contain value/values for specified key/keys and {@link MapLoader} is defined
* then value/values will be loaded in read-through mode.
*
* <p>
* The returned map is <b>NOT</b> backed by the original map.
*
* @param keys - map keys
@ -128,9 +218,11 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
/**
* Removes <code>keys</code> from map by one operation
*
* Works faster than <code>RMap.remove</code> but not returning
* <p>
* Works faster than <code>{@link RMap#remove(Object)}</code> but not returning
* the value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then <code>keys</code>are deleted in write-through mode.
*
* @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
@ -139,9 +231,11 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
/**
* Associates the specified <code>value</code> with the specified <code>key</code>.
*
* Works faster than <code>RMap.put</code> but not returning
* <p>
* Works faster than <code>{@link RMap#put(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
@ -150,6 +244,20 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
*/
boolean fastPut(K key, V value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.
* <p>
* Works faster than <code>{@link RMap#putIfAbsent(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if key is a new one in the hash and value was set.
* <code>false</code> if key already exists in the hash and change hasn't been made.
*/
boolean fastPutIfAbsent(K key, V value);
/**

@ -20,6 +20,7 @@ import java.util.Map;
import java.util.Map.Entry;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import java.util.Set;
@ -61,10 +62,10 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
/**
* Gets a map slice contained the mappings with defined <code>keys</code>
* by one operation.
*
* <p>
* If map doesn't contain value/values for specified key/keys and {@link MapLoader} is defined
* then value/values will be loaded in read-through mode.
*
* <p>
* The returned map is <b>NOT</b> backed by the original map.
*
* @param keys - map keys
@ -72,6 +73,15 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
*/
RFuture<Map<K, V>> getAllAsync(Set<K> keys);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch.
* <p>
* If {@link MapWriter} is defined then new map entries are stored in write-through mode.
*
* @param map mappings to be stored in this map
* @return void
*/
RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map);
RFuture<V> addAndGetAsync(K key, Number value);
@ -83,10 +93,12 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
RFuture<Integer> sizeAsync();
/**
* Removes <code>keys</code> from map by one operation in async manner
*
* Works faster than <code>RMap.removeAsync</code> but doesn't return
* the value associated with <code>key</code>
* Removes <code>keys</code> from map by one operation in async manner.
* <p>
* Works faster than <code>{@link RMap#removeAsync(Object, Object)}</code> but doesn't return
* the value associated with <code>key</code>.
* <p>
* If {@link MapWriter} is defined then <code>keys</code>are deleted in write-through mode.
*
* @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
@ -96,17 +108,33 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in async manner.
*
* Works faster than <code>RMap.putAsync</code> but not returning
* <p>
* Works faster than <code>{@link RMap#putAsync(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if key is a new key in the hash and value was set.
* @return <code>true</code> if key is a new one in the hash and value was set.
* <code>false</code> if key already exists in the hash and the value was updated.
*/
RFuture<Boolean> fastPutAsync(K key, V value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.
* <p>
* Works faster than <code>{@link RMap#putIfAbsentAsync(Object, Object)}</code> but not returning
* the previous value associated with <code>key</code>
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if key is a new one in the hash and value was set.
* <code>false</code> if key already exists in the hash and change hasn't been made.
*/
RFuture<Boolean> fastPutIfAbsentAsync(K key, V value);
/**
@ -140,7 +168,7 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>
* If map doesn't contain value for specified key and {@link MapLoader} is defined
* then value will be loaded in read-through mode.
*
@ -150,16 +178,76 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
*/
RFuture<V> getAsync(K key);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in async manner.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return previous associated value
*/
RFuture<V> putAsync(K key, V value);
/**
* Removes <code>key</code> from map and returns associated value in async manner.
* <p>
* If {@link MapWriter} is defined then <code>key</code>is deleted in write-through mode.
*
* @param key - map key
* @return deleted value or <code>null</code> if there wasn't any association
*/
RFuture<V> removeAsync(K key);
/**
* Replaces previous value with a new <code>value</code> associated with the <code>key</code>.
* If there wasn't any association before then method returns <code>null</code>.
* <p>
* If {@link MapWriter} is defined then new <code>value</code>is written in write-through mode.
*
* @param key - map key
* @param value - map value
* @return previous associated value
* or <code>null</code> if there wasn't any association and change hasn't been made
*/
RFuture<V> replaceAsync(K key, V value);
/**
* Replaces previous <code>oldValue</code> with a <code>newValue</code> associated with the <code>key</code>.
* If previous value doesn't exist or equal to <code>oldValue</code> then method returns <code>false</code>.
* <p>
* If {@link MapWriter} is defined then <code>newValue</code>is written in write-through mode.
*
* @param key - map key
* @param oldValue - map old value
* @param newValue - map new value
* @return <code>true</code> if value has been replaced otherwise <code>false</code>.
*/
RFuture<Boolean> replaceAsync(K key, V oldValue, V newValue);
/**
* Removes <code>key</code> from map only if it associated with <code>value</code>.
* <p>
* If {@link MapWriter} is defined then <code>key</code>is deleted in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if map entry has been replaced otherwise <code>false</code>.
*/
RFuture<Boolean> removeAsync(Object key, Object value);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* only if there is no any association with specified<code>key</code>.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>null</code> if key is a new one in the hash and value was set.
* Previous value if key already exists in the hash and change hasn't been made.
*/
RFuture<V> putIfAbsentAsync(K key, V value);
}

@ -19,11 +19,12 @@ import java.util.Collection;
import java.util.Map;
/**
* Map writer used for write-through operations.
*
* @author Nikita Koksharov
*
* @param <K>
* @param <V>
* @param <K> key type
* @param <V> value type
*/
public interface MapWriter<K, V> {

@ -252,7 +252,7 @@ public interface RedisCommands {
RedisStrictCommand<Void> MSET = new RedisStrictCommand<Void>("MSET", new VoidReplayConvertor());
RedisStrictCommand<Boolean> MSETNX = new RedisStrictCommand<Boolean>("MSETNX", new BooleanReplayConvertor());
RedisCommand<Boolean> HSETNX = new RedisCommand<Boolean>("HSETNX", new BooleanReplayConvertor(), 2, ValueType.MAP);
RedisCommand<Boolean> HSETNX = new RedisCommand<Boolean>("HSETNX", new BooleanReplayConvertor());
RedisCommand<Boolean> HSET = new RedisCommand<Boolean>("HSET", new BooleanReplayConvertor(), 2, ValueType.MAP);
RedisCommand<MapScanResult<Object, Object>> HSCAN = new RedisCommand<MapScanResult<Object, Object>>("HSCAN", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
RedisCommand<Map<Object, Object>> HGETALL = new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP);
@ -263,10 +263,10 @@ public interface RedisCommands {
RedisCommand<Integer> HSTRLEN = new RedisCommand<Integer>("HSTRLEN", new IntegerReplayConvertor(), 2, ValueType.MAP_KEY);
RedisStrictCommand<Long> HLEN_LONG = new RedisStrictCommand<Long>("HLEN");
RedisCommand<Set<Object>> HKEYS = new RedisCommand<Set<Object>>("HKEYS", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
RedisCommand<Void> HMSET = new RedisCommand<Void>("HMSET", new VoidReplayConvertor(), 2, ValueType.MAP);
RedisCommand<Void> HMSET = new RedisCommand<Void>("HMSET", new VoidReplayConvertor());
RedisCommand<List<Object>> HMGET = new RedisCommand<List<Object>>("HMGET", new ObjectListReplayDecoder<Object>(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<Object> HGET = new RedisCommand<Object>("HGET", 2, ValueType.MAP_KEY, ValueType.MAP_VALUE);
RedisCommand<Long> HDEL = new RedisStrictCommand<Long>("HDEL", 2, ValueType.MAP_KEY);
RedisCommand<Long> HDEL = new RedisStrictCommand<Long>("HDEL");
RedisStrictCommand<Long> DEL = new RedisStrictCommand<Long>("DEL");
RedisStrictCommand<Long> DBSIZE = new RedisStrictCommand<Long>("DBSIZE");

@ -63,9 +63,6 @@ import reactor.rx.Streams;
*/
public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive implements RMapCacheReactive<K, V>, MapReactive<K, V> {
private static final RedisCommand<MapScanResult<Object, Object>> EVAL_HSCAN =
new RedisCommand<MapScanResult<Object, Object>>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
private final RMapCache<K, V> mapCache;
public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) {

@ -3,6 +3,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -10,14 +11,196 @@ import java.util.Map;
import org.junit.Test;
import org.redisson.api.RMap;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
public abstract class BaseMapTest extends BaseTest {
protected abstract <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map);
protected abstract <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map);
@Test
public void testWriterAddAndGet() {
Map<String, Integer> store = new HashMap<>();
RMap<String, Integer> map = getWriterTestMap("test", store);
assertThat(map.addAndGet("1", 11)).isEqualTo(11);
assertThat(map.addAndGet("1", 7)).isEqualTo(18);
Map<String, Integer> expected = new HashMap<>();
expected.put("1", 18);
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterFastRemove() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
map.put("1", "11");
map.put("2", "22");
map.put("3", "33");
map.fastRemove("1", "2", "4");
Map<String, String> expected = new HashMap<>();
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterFastPut() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
map.fastPut("1", "11");
map.fastPut("2", "22");
map.fastPut("3", "33");
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterRemove() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
map.put("1", "11");
map.remove("1");
map.put("3", "33");
Map<String, String> expected = new HashMap<>();
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterReplaceKeyValue() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
map.put("1", "11");
map.replace("1", "00");
map.replace("2", "22");
map.put("3", "33");
Map<String, String> expected = new HashMap<>();
expected.put("1", "00");
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterReplaceKeyOldNewValue() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
map.put("1", "11");
map.replace("1", "11", "00");
map.put("3", "33");
Map<String, String> expected = new HashMap<>();
expected.put("1", "00");
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterRemoveKeyValue() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
map.put("1", "11");
map.put("2", "22");
map.put("3", "33");
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
map.remove("1", "11");
Map<String, String> expected2 = new HashMap<>();
expected2.put("2", "22");
expected2.put("3", "33");
assertThat(store).isEqualTo(expected2);
}
@Test
public void testWriterFastPutIfAbsent() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
map.fastPutIfAbsent("1", "11");
map.fastPutIfAbsent("1", "00");
map.fastPutIfAbsent("2", "22");
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterPutIfAbsent() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
map.putIfAbsent("1", "11");
map.putIfAbsent("1", "00");
map.putIfAbsent("2", "22");
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterPutAll() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
Map<String, String> newMap = new HashMap<>();
newMap.put("1", "11");
newMap.put("2", "22");
newMap.put("3", "33");
map.putAll(newMap);
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterPut() {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriterTestMap("test", store);
map.put("1", "11");
map.put("2", "22");
map.put("3", "33");
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
}
@Test
public void testLoadAllReplaceValues() {
Map<String, String> cache = new HashMap<String, String>();
Map<String, String> cache = new HashMap<>();
for (int i = 0; i < 10; i++) {
cache.put("" + i, "" + i + "" + i);
}
@ -58,6 +241,34 @@ public abstract class BaseMapTest extends BaseTest {
}
}
protected <K, V> MapWriter<K, V> createMapWriter(Map<K, V> map) {
return new MapWriter<K, V>() {
@Override
public void write(K key, V value) {
map.put(key, value);
}
@Override
public void writeAll(Map<K, V> values) {
map.putAll(values);
}
@Override
public void delete(K key) {
map.remove(key);
}
@Override
public void deleteAll(Collection<K> keys) {
for (K key : keys) {
map.remove(key);
}
}
};
}
protected <K, V> MapLoader<K, V> createMapLoader(Map<K, V> map) {
return new MapLoader<K, V>() {
@Override

@ -48,6 +48,12 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
}
@Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().mapWriter(createMapWriter(map));
return redisson.getLocalCachedMap("test", options);
}
@Override
protected <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map) {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().mapLoader(createMapLoader(map));

@ -137,11 +137,78 @@ public class RedissonMapCacheTest extends BaseMapTest {
}
@Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
return redisson.getMapCache("test", null, createMapWriter(map));
}
@Override
protected <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map) {
return redisson.getMapCache("test", createMapLoader(map), null);
}
@Test
public void testWriterPutIfAbsent() {
Map<String, String> store = new HashMap<>();
RMapCache<String, String> map = (RMapCache<String, String>) getWriterTestMap("test", store);
map.putIfAbsent("1", "11", 10, TimeUnit.SECONDS);
map.putIfAbsent("1", "00", 10, TimeUnit.SECONDS);
map.putIfAbsent("2", "22", 10, TimeUnit.SECONDS);
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterPutTTL() {
Map<String, String> store = new HashMap<>();
RMapCache<String, String> map = (RMapCache<String, String>) getWriterTestMap("test", store);
map.put("1", "11", 10, TimeUnit.SECONDS);
map.put("2", "22", 10, TimeUnit.SECONDS);
map.put("3", "33", 10, TimeUnit.SECONDS);
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterFastPutIfAbsentTTL() {
Map<String, String> store = new HashMap<>();
RMapCache<String, String> map = (RMapCache<String, String>) getWriterTestMap("test", store);
map.fastPutIfAbsent("1", "11", 10, TimeUnit.SECONDS);
map.fastPutIfAbsent("1", "00", 10, TimeUnit.SECONDS);
map.fastPutIfAbsent("2", "22", 10, TimeUnit.SECONDS);
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
assertThat(store).isEqualTo(expected);
}
@Test
public void testWriterFastPutTTL() {
Map<String, String> store = new HashMap<>();
RMapCache<String, String> map = (RMapCache<String, String>) getWriterTestMap("test", store);
map.fastPut("1", "11", 10, TimeUnit.SECONDS);
map.fastPut("2", "22", 10, TimeUnit.SECONDS);
map.fastPut("3", "33", 10, TimeUnit.SECONDS);
Map<String, String> expected = new HashMap<>();
expected.put("1", "11");
expected.put("2", "22");
expected.put("3", "33");
assertThat(store).isEqualTo(expected);
}
@Test
public void testOrdering() {
Map<String, String> map = new LinkedHashMap<String, String>();

@ -134,6 +134,11 @@ public class RedissonMapTest extends BaseMapTest {
return redisson.getMap("test", createMapLoader(map), null);
}
@Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
return redisson.getMap("test", null, createMapWriter(map));
}
@Test
public void testAddAndGet() throws InterruptedException {
RMap<Integer, Integer> map = redisson.getMap("getAll");

Loading…
Cancel
Save