From 369e30640cc05d9a33a992bbab0fc77db6c7f012 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 20 Jun 2017 16:37:31 +0300 Subject: [PATCH] MapWriter (write-trough) support for RMap added. #927 --- .../org/redisson/RedissonLocalCachedMap.java | 158 +++++-- .../main/java/org/redisson/RedissonMap.java | 388 ++++++++++++++++-- .../java/org/redisson/RedissonMapCache.java | 251 ++++++----- .../redisson/api/LocalCachedMapOptions.java | 5 + .../src/main/java/org/redisson/api/RMap.java | 122 +++++- .../main/java/org/redisson/api/RMapAsync.java | 108 ++++- .../java/org/redisson/api/map/MapWriter.java | 5 +- .../client/protocol/RedisCommands.java | 6 +- .../reactive/RedissonMapCacheReactive.java | 3 - .../test/java/org/redisson/BaseMapTest.java | 213 +++++++++- .../redisson/RedissonLocalCachedMapTest.java | 6 + .../org/redisson/RedissonMapCacheTest.java | 67 +++ .../java/org/redisson/RedissonMapTest.java | 5 + 13 files changed, 1129 insertions(+), 208 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 6c5a95332..15430c2f6 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -195,8 +195,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private static final RedisCommand> ALL_KEYS = new RedisCommand>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY); private static final RedisCommand>> ALL_ENTRIES = new RedisCommand>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP); private static final RedisCommand> ALL_MAP = new RedisCommand>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP); - private static final RedisCommand EVAL_PUT = new RedisCommand("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE); private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10); private byte[] instanceId; @@ -416,21 +414,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override - public RFuture putAsync(K key, V value) { - if (key == null) { - throw new NullPointerException(); - } - if (value == null) { - throw new NullPointerException(); - } - + public RFuture putOperationAsync(K key, V value) { byte[] mapKey = encodeMapKey(key); CacheKey cacheKey = toCacheKey(mapKey); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); CacheValue cacheValue = new CacheValue(key, value); cache.put(cacheKey, cacheValue); - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT, + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE, "local v = redis.call('hget', KEYS[1], ARGV[1]); " + "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then " + "if ARGV[4] == '1' then " @@ -447,14 +438,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture fastPutAsync(K key, V value) { - if (key == null) { - throw new NullPointerException(); - } - if (value == null) { - throw new NullPointerException(); - } - + protected RFuture fastPutOperationAsync(K key, V value) { byte[] encodedKey = encodeMapKey(key); byte[] encodedValue = encodeMapValue(value); CacheKey cacheKey = toCacheKey(encodedKey); @@ -489,17 +473,13 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture removeAsync(K key) { - if (key == null) { - throw new NullPointerException(); - } - + public RFuture removeOperationAsync(K key) { byte[] keyEncoded = encodeMapKey(key); CacheKey cacheKey = toCacheKey(keyEncoded); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); cache.remove(cacheKey); - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE, + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE, "local v = redis.call('hget', KEYS[1], ARGV[1]); " + "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then " + "if ARGV[3] == '1' then " @@ -516,10 +496,87 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture fastRemoveAsync(K ... keys) { - if (keys == null) { - throw new NullPointerException(); + protected RFuture> fastRemoveOperationBatchAsync(@SuppressWarnings("unchecked") K... keys) { + if (invalidateEntryOnChange == 1) { + List params = new ArrayList(keys.length*2); + for (K k : keys) { + byte[] keyEncoded = encodeMapKey(k); + params.add(keyEncoded); + + CacheKey cacheKey = toCacheKey(keyEncoded); + cache.remove(cacheKey); + byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); + params.add(msgEncoded); + } + + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST, + "local result = {}; " + + "for j = 1, #ARGV, 2 do " + + "local val = redis.call('hdel', KEYS[1], ARGV[j]);" + + "if val == 1 then " + + "redis.call('publish', KEYS[2], ARGV[j+1]); " + + "end;" + + "table.insert(result, val);" + + "end;" + + "return result;", + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), + params.toArray()); + } + + if (invalidateEntryOnChange == 2) { + List params = new ArrayList(keys.length*3); + params.add(System.currentTimeMillis()); + for (K k : keys) { + byte[] keyEncoded = encodeMapKey(k); + params.add(keyEncoded); + + CacheKey cacheKey = toCacheKey(keyEncoded); + cache.remove(cacheKey); + byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); + params.add(msgEncoded); + + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); + params.add(entryId); + } + + return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST, + "local result = {}; " + + "for j = 2, #ARGV, 3 do " + + "local val = redis.call('hdel', KEYS[1], ARGV[j]);" + + "if val == 1 then " + + "redis.call('zadd', KEYS[3], ARGV[1], ARGV[j+2]);" + + "redis.call('publish', KEYS[2], ARGV[j+1]); " + + "end;" + + "table.insert(result, val);" + + "end;" + + "return result;", + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + params.toArray()); + } + + List params = new ArrayList(keys.length); + for (K k : keys) { + byte[] keyEncoded = encodeMapKey(k); + params.add(keyEncoded); + + CacheKey cacheKey = toCacheKey(keyEncoded); + cache.remove(cacheKey); } + + RFuture> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST, + "local result = {}; " + + "for i = 1, #ARGV, 1 do " + + "local val = redis.call('hdel', KEYS[1], ARGV[i]); " + + "table.insert(result, val); " + + "end;" + + "return result;", + Arrays.asList(getName()), + params.toArray()); + return future; + } + + @Override + protected RFuture fastRemoveOperationAsync(K ... keys) { if (invalidateEntryOnChange == 1) { List params = new ArrayList(keys.length*2); @@ -892,11 +949,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture putAllAsync(final Map map) { - if (map.isEmpty()) { - return newSucceededFuture(null); - } - + public RFuture putAllOperationAsync(final Map map) { List params = new ArrayList(map.size()*3); params.add(invalidateEntryOnChange); params.add(map.size()*2); @@ -952,7 +1005,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture addAndGetAsync(final K key, Number value) { + public RFuture addAndGetOperationAsync(final K key, Number value) { final byte[] keyState = encodeMapKey(key); CacheKey cacheKey = toCacheKey(keyState); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); @@ -1151,14 +1204,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture replaceAsync(final K key, final V value) { + protected RFuture replaceOperationAsync(K key, V value) { final byte[] keyState = encodeMapKey(key); byte[] valueState = encodeMapValue(value); final CacheKey cacheKey = toCacheKey(keyState); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); - RFuture future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " + "local v = redis.call('hget', KEYS[1], ARGV[1]); " + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " @@ -1177,7 +1230,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end", Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); + } + + @Override + public RFuture replaceAsync(final K key, final V value) { + final byte[] keyState = encodeMapKey(key); + final CacheKey cacheKey = toCacheKey(keyState); + RFuture future = super.replaceAsync(key, value); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -1186,7 +1246,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } if (future.getNow() != null) { - CacheKey cacheKey = toCacheKey(key); cache.put(cacheKey, new CacheValue(key, value)); } } @@ -1194,9 +1253,9 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return future; } - + @Override - public RFuture replaceAsync(final K key, V oldValue, final V newValue) { + protected RFuture replaceOperationAsync(K key, V oldValue, V newValue) { final byte[] keyState = encodeMapKey(key); byte[] oldValueState = encodeMapValue(oldValue); byte[] newValueState = encodeMapValue(newValue); @@ -1204,7 +1263,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); - RFuture future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); " + "if ARGV[4] == '1' then " @@ -1220,7 +1279,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end", Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), keyState, oldValueState, newValueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); + } + + @Override + public RFuture replaceAsync(final K key, V oldValue, final V newValue) { + final byte[] keyState = encodeMapKey(key); + final CacheKey cacheKey = toCacheKey(keyState); + RFuture future = super.replaceAsync(key, oldValue, newValue); future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -1238,14 +1304,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture removeAsync(Object key, Object value) { + protected RFuture removeOperationAsync(Object key, Object value) { final byte[] keyState = encodeMapKey(key); byte[] valueState = encodeMapValue(value); final CacheKey cacheKey = toCacheKey(keyState); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); - RFuture future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, + return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + "if ARGV[3] == '1' then " + "redis.call('publish', KEYS[2], ARGV[4]); " @@ -1260,6 +1326,13 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end", Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); + } + + @Override + public RFuture removeAsync(Object key, Object value) { + final byte[] keyState = encodeMapKey(key); + final CacheKey cacheKey = toCacheKey(keyState); + RFuture future = super.removeAsync(key, value); future.addListener(new FutureListener() { @Override @@ -1276,6 +1349,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return future; } + @Override public RFuture putIfAbsentAsync(final K key, final V value) { RFuture future = super.putIfAbsentAsync(key, value); diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index e489cb2cb..bbbc57198 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.RFuture; @@ -40,6 +41,7 @@ import org.redisson.api.map.MapLoader; import org.redisson.api.map.MapWriter; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; @@ -70,15 +72,9 @@ import io.netty.util.concurrent.FutureListener; */ public class RedissonMap extends RedissonExpirable implements RMap { - static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE); - static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE); - static final RedisCommand EVAL_REPLACE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE)); - static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP); - static final RedisCommand EVAL_PUT = EVAL_REPLACE; - final RedissonClient redisson; - private MapLoader mapLoader; - private MapWriter mapWriter; + MapLoader mapLoader; + MapWriter mapWriter; protected RedissonMap(CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapLoader mapLoader, MapWriter mapWriter) { super(commandExecutor, name); @@ -236,10 +232,6 @@ public class RedissonMap extends RedissonExpirable implements RMap { return future; } - private RFuture externalPutAllAsync(Map entries) { - return putAllAsync(entries); - } - @Override public V get(Object key) { return get(getAsync((K)key)); @@ -261,19 +253,48 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public RFuture putAllAsync(Map map) { + public RFuture putAllAsync(final Map map) { if (map.isEmpty()) { return newSucceededFuture(null); } + RFuture future = putAllOperationAsync(map); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(Future future, ExecutorService executorService) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.writeAll((Map) map); + trySuccess(null); + } + }); + } + }; + return result; + } + + protected RFuture putAllOperationAsync(Map map) { List params = new ArrayList(map.size()*2 + 1); params.add(getName()); for (java.util.Map.Entry t : map.entrySet()) { - params.add(t.getKey()); - params.add(t.getValue()); + if (t.getKey() == null) { + throw new NullPointerException("map key can't be null"); + } + if (t.getValue() == null) { + throw new NullPointerException("map value can't be null"); + } + + params.add(encodeMapKey(t.getKey())); + params.add(encodeMapValue(t.getValue())); } - return commandExecutor.writeAsync(getName(), codec, RedisCommands.HMSET, params.toArray()); + RFuture future = commandExecutor.writeAsync(getName(), codec, RedisCommands.HMSET, params.toArray()); + return future; } @Override @@ -343,17 +364,43 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public RFuture putIfAbsentAsync(K key, V value) { + public RFuture putIfAbsentAsync(final K key, final V value) { checkKey(key); checkValue(key); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT, + RFuture future = putIfAbsentOperationAsync(key, value); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + if (future.getNow() == null) { + commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, value); + trySuccess(future.getNow()); + } + }); + } else { + trySuccess(future.getNow()); + } + } + }; + + return result; + } + + protected RFuture putIfAbsentOperationAsync(K key, V value) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "if redis.call('hsetnx', KEYS[1], ARGV[1], ARGV[2]) == 1 then " + "return nil " + "else " + "return redis.call('hget', KEYS[1], ARGV[1]) " + "end", - Collections.singletonList(getName(key)), key, value); + Collections.singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value)); } @Override @@ -362,11 +409,37 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public RFuture fastPutIfAbsentAsync(K key, V value) { + public RFuture fastPutIfAbsentAsync(final K key, final V value) { checkKey(key); checkValue(value); - return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSETNX, getName(key), key, value); + RFuture future = fastPutIfAbsentOperationAsync(key, value); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + if (future.getNow()) { + commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, value); + trySuccess(future.getNow()); + } + }); + } else { + trySuccess(future.getNow()); + } + } + }; + + return result; + } + + protected RFuture fastPutIfAbsentOperationAsync(K key, V value) { + return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSETNX, getName(key), encodeMapKey(key), encodeMapValue(value)); } @Override @@ -375,17 +448,44 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public RFuture removeAsync(Object key, Object value) { + public RFuture removeAsync(final Object key, Object value) { checkKey(key); checkValue(value); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE_VALUE, + RFuture future = removeOperationAsync(key, value); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + if (future.getNow()) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.delete((K) key); + trySuccess(future.getNow()); + } + }); + } else { + trySuccess(future.getNow()); + } + } + }; + + return result; + } + + protected RFuture removeOperationAsync(Object key, Object value) { + RFuture future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + "return redis.call('hdel', KEYS[1], ARGV[1]) " + "else " + "return 0 " + "end", - Collections.singletonList(getName(key)), key, value); + Collections.singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value)); + return future; } protected void checkValue(Object value) { @@ -400,7 +500,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public RFuture replaceAsync(K key, V oldValue, V newValue) { + public RFuture replaceAsync(final K key, V oldValue, final V newValue) { checkKey(key); if (oldValue == null) { throw new NullPointerException("map oldValue can't be null"); @@ -409,15 +509,40 @@ public class RedissonMap extends RedissonExpirable implements RMap { throw new NullPointerException("map newValue can't be null"); } + RFuture future = replaceOperationAsync(key, oldValue, newValue); + if (mapWriter == null) { + return future; + } - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE_VALUE, + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + if (future.getNow()) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, newValue); + trySuccess(future.getNow()); + } + }); + } else { + trySuccess(future.getNow()); + } + } + }; + + return result; + } + + protected RFuture replaceOperationAsync(K key, V oldValue, V newValue) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); " + "return 1; " + "else " + "return 0; " + "end", - Collections.singletonList(getName(key)), key, oldValue, newValue); + Collections.singletonList(getName(key)), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue)); } @Override @@ -426,11 +551,37 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public RFuture replaceAsync(K key, V value) { + public RFuture replaceAsync(final K key, final V value) { checkKey(key); checkValue(value); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE, + RFuture future = replaceOperationAsync(key, value); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + if (future.getNow() != null) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, value); + trySuccess(future.getNow()); + } + }); + } else { + trySuccess(future.getNow()); + } + } + }; + + return result; + } + + protected RFuture replaceOperationAsync(final K key, final V value) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " + "local v = redis.call('hget', KEYS[1], ARGV[1]); " + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " @@ -438,10 +589,10 @@ public class RedissonMap extends RedissonExpirable implements RMap { + "else " + "return nil; " + "end", - Collections.singletonList(getName(key)), key, value); + Collections.singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value)); } - protected RFuture getValueAsync(K key) { + protected RFuture getOperationAsync(K key) { return commandExecutor.readAsync(getName(key), codec, RedisCommands.HGET, getName(key), key); } @@ -449,7 +600,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { public RFuture getAsync(final K key) { checkKey(key); - RFuture future = getValueAsync(key); + RFuture future = getOperationAsync(key); if (mapLoader == null) { return future; } @@ -617,34 +768,100 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public RFuture putAsync(K key, V value) { + public RFuture putAsync(final K key, final V value) { checkKey(key); checkValue(value); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT, + RFuture future = putOperationAsync(key, value); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, value); + trySuccess(future.getNow()); + } + }); + } + }; + + return result; + } + + protected RFuture putOperationAsync(K key, V value) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "local v = redis.call('hget', KEYS[1], ARGV[1]); " + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + "return v", - Collections.singletonList(getName(key)), key, value); + Collections.singletonList(getName(key)), encodeMapKey(key), encodeMapValue(value)); } @Override - public RFuture removeAsync(K key) { + public RFuture removeAsync(final K key) { checkKey(key); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE, + RFuture future = removeOperationAsync(key); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.delete(key); + trySuccess(future.getNow()); + } + }); + } + }; + + return result; + } + + protected RFuture removeOperationAsync(K key) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "local v = redis.call('hget', KEYS[1], ARGV[1]); " + "redis.call('hdel', KEYS[1], ARGV[1]); " + "return v", - Collections.singletonList(getName(key)), key); + Collections.singletonList(getName(key)), encodeMapKey(key)); } @Override - public RFuture fastPutAsync(K key, V value) { + public RFuture fastPutAsync(final K key, final V value) { checkKey(key); checkValue(value); + RFuture future = fastPutOperationAsync(key, value); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, value); + trySuccess(future.getNow()); + } + }); + } + }; + + return result; + } + + protected RFuture fastPutOperationAsync(K key, V value) { return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSET, getName(key), key, value); } @@ -654,14 +871,72 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public RFuture fastRemoveAsync(K ... keys) { - if (keys == null || keys.length == 0) { + public RFuture fastRemoveAsync(final K ... keys) { + if (keys == null) { + throw new NullPointerException(); + } + + if (keys.length == 0) { return newSucceededFuture(0L); } + if (mapWriter == null) { + return fastRemoveOperationAsync(keys); + } + + RFuture> future = fastRemoveOperationBatchAsync(keys); + + final RPromise result = new RedissonPromise(); + future.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + final List deletedKeys = new ArrayList(); + for (int i = 0; i < future.getNow().size(); i++) { + if (future.getNow().get(i) == 1) { + deletedKeys.add(keys[i]); + } + } + commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { + @Override + public void run() { + mapWriter.deleteAll(deletedKeys); + result.trySuccess((long)deletedKeys.size()); + } + }); + } + }); + return result; + } + + protected RFuture> fastRemoveOperationBatchAsync(final K... keys) { + List args = new ArrayList(keys.length); + for (K key : keys) { + args.add(encodeMapKey(key)); + } + + RFuture> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST, + "local result = {}; " + + "for i = 1, #ARGV, 1 do " + + "local val = redis.call('hdel', KEYS[1], ARGV[i]); " + + "table.insert(result, val); " + + "end;" + + "return result;", + Arrays.asList(getName()), + args.toArray()); + return future; + } + + protected RFuture fastRemoveOperationAsync(K... keys) { List args = new ArrayList(keys.length + 1); args.add(getName()); - args.addAll(Arrays.asList(keys)); + for (K key : keys) { + args.add(encodeMapKey(key)); + } return commandExecutor.writeAsync(getName(), codec, RedisCommands.HDEL, args.toArray()); } @@ -682,14 +957,37 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public RFuture addAndGetAsync(K key, Number value) { + public RFuture addAndGetAsync(final K key, Number value) { checkKey(key); checkValue(value); + RFuture future = addAndGetOperationAsync(key, value); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, future.getNow()); + trySuccess(future.getNow()); + } + }); + } + }; + + return result; + } + + protected RFuture addAndGetOperationAsync(K key, Number value) { byte[] keyState = encodeMapKey(key); - return commandExecutor.writeAsync(getName(key), StringCodec.INSTANCE, + RFuture future = commandExecutor.writeAsync(getName(key), StringCodec.INSTANCE, new RedisCommand("HINCRBYFLOAT", new NumberConvertor(value.getClass())), getName(key), keyState, new BigDecimal(value.toString()).toPlainString()); + return future; } @Override @@ -827,7 +1125,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { return; } - getValueAsync(key).addListener(new FutureListener() { + getOperationAsync(key).addListener(new FutureListener() { @Override public void operationComplete(Future valueFuture) throws Exception { if (!valueFuture.isSuccess()) { diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 9c70bee75..f48da6ee3 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -15,7 +15,6 @@ */ package org.redisson; -import java.io.IOException; import java.math.BigDecimal; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -24,6 +23,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; @@ -47,7 +47,6 @@ import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.NumberConvertor; -import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.LongMultiDecoder; import org.redisson.client.protocol.decoder.MapCacheScanResult; @@ -60,6 +59,7 @@ import org.redisson.codec.MapCacheEventCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.eviction.EvictionScheduler; +import org.redisson.misc.RPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -85,16 +85,6 @@ import io.netty.util.concurrent.FutureListener; */ public class RedissonMapCache extends RedissonMap implements RMapCache { - static final RedisCommand EVAL_REPLACE = new RedisCommand("EVAL", 7, ValueType.MAP, ValueType.MAP_VALUE); - static final RedisCommand EVAL_HMSET = new RedisCommand("EVAL", new VoidReplayConvertor(), 5, ValueType.MAP); - private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_PUT_TTL = new RedisCommand("EVAL", 12, ValueType.MAP, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_PUT_TTL_IF_ABSENT = new RedisCommand("EVAL", 11, ValueType.MAP, ValueType.MAP_VALUE); - private static final RedisCommand EVAL_GET_TTL = new RedisCommand("EVAL", 7, ValueType.MAP_KEY, ValueType.MAP_VALUE); - static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 7, ValueType.MAP_KEY); - static final RedisCommand EVAL_PUT = new RedisCommand("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE); - static final RedisCommand EVAL_PUT_IF_ABSENT = new RedisCommand("EVAL", 5, ValueType.MAP, ValueType.MAP_VALUE); - public RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson, MapLoader mapLoader, MapWriter mapWriter) { super(commandExecutor, name, redisson, mapLoader, mapWriter); @@ -239,7 +229,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public RFuture putIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { + public RFuture putIfAbsentAsync(final K key, final V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { checkKey(key); checkValue(value); @@ -273,7 +263,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta; } - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_TTL_IF_ABSENT, + RFuture future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "local insertable = false; " + "local value = redis.call('hget', KEYS[1], ARGV[5]); " + "if value == false then " @@ -325,14 +315,34 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "return val;" + "end; ", Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key)), - System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); + System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value)); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + if (future.getNow() == null) { + commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, value); + trySuccess(future.getNow()); + } + }); + } else { + trySuccess(future.getNow()); + } + } + }; + + return result; + } @Override - public RFuture removeAsync(Object key, Object value) { - checkKey(key); - checkValue(value); - + protected RFuture removeOperationAsync(Object key, Object value) { return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('hget', KEYS[1], ARGV[1]); " + "if value == false then " @@ -354,10 +364,10 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - protected RFuture getValueAsync(K key) { + protected RFuture getOperationAsync(K key) { checkKey(key); - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_GET_TTL, + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[2]); " + "if value == false then " + "return nil; " @@ -383,7 +393,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "return nil; " + "end; " + "return val; ", - Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), System.currentTimeMillis(), key); + Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key)), + System.currentTimeMillis(), encodeMapKey(key)); } @Override @@ -392,11 +403,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public RFuture putAsync(K key, V value) { - checkKey(key); - checkValue(value); - - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT, + protected RFuture putOperationAsync(K key, V value) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "local v = redis.call('hget', KEYS[1], ARGV[1]); " + "local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); " + "redis.call('hset', KEYS[1], ARGV[1], value); " @@ -410,15 +418,12 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "redis.call('publish', KEYS[3], msg); " + "return val; ", Arrays.asList(getName(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)), - key, value); + encodeMapKey(key), encodeMapValue(value)); } @Override - public RFuture putIfAbsentAsync(K key, V value) { - checkKey(key); - checkValue(value); - - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_IF_ABSENT, + public RFuture putIfAbsentOperationAsync(K key, V value) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "local value = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); " + "if redis.call('hsetnx', KEYS[1], ARGV[1], value) == 1 then " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2]); " @@ -433,7 +438,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "return val; " + "end", Arrays.asList(getName(key), getCreatedChannelNameByKey(key)), - key, value); + encodeMapKey(key), encodeMapValue(value)); } @Override @@ -442,19 +447,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public RFuture addAndGetAsync(K key, Number value) { - checkKey(key); - checkValue(value); - + public RFuture addAndGetOperationAsync(K key, Number value) { byte[] keyState = encodeMapKey(key); - byte[] valueState; - try { - valueState = StringCodec.INSTANCE - .getMapValueEncoder() - .encode(new BigDecimal(value.toString()).toPlainString()); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } return commandExecutor.evalWriteAsync(getName(key), StringCodec.INSTANCE, new RedisCommand("EVAL", new NumberConvertor(value.getClass())), "local value = redis.call('hget', KEYS[1], ARGV[2]); " @@ -494,7 +488,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "redis.call('hset', KEYS[1], ARGV[2], newValuePack); " + "return tostring(newValue); ", Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)), - System.currentTimeMillis(), keyState, valueState); + System.currentTimeMillis(), keyState, new BigDecimal(value.toString()).toPlainString()); } @Override @@ -513,7 +507,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public RFuture fastPutAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { + public RFuture fastPutAsync(final K key, final V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { checkKey(key); checkValue(value); @@ -547,7 +541,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta; } - return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, + RFuture future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local insertable = false; " + "local value = redis.call('hget', KEYS[1], ARGV[5]); " + "local t, val;" @@ -594,6 +588,24 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "end;", Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)), System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value)); + + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, value); + trySuccess(future.getNow()); + } + }); + } + }; + return result; } @Override @@ -607,7 +619,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public RFuture putAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { + public RFuture putAsync(final K key, final V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { checkKey(key); checkValue(value); @@ -641,7 +653,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta; } - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT_TTL, + RFuture future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "local insertable = false; " + "local v = redis.call('hget', KEYS[1], ARGV[5]); " + "if v == false then " @@ -691,7 +703,26 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "return val", Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key), getUpdatedChannelNameByKey(key)), - System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, key, value); + System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value)); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + executorService.execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, value); + trySuccess(future.getNow()); + } + }); + } + }; + + return result; + } String getTimeoutSetNameByKey(Object key) { @@ -768,10 +799,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override - public RFuture removeAsync(K key) { - checkKey(key); - - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE, + public RFuture removeOperationAsync(K key) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "local v = redis.call('hget', KEYS[1], ARGV[1]); " + "redis.call('zrem', KEYS[2], ARGV[1]); " + "redis.call('zrem', KEYS[3], ARGV[1]); " @@ -784,16 +813,47 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "end; " + "return v", Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getRemovedChannelNameByKey(key)), - key); + encodeMapKey(key)); } @Override - public RFuture fastRemoveAsync(K ... keys) { - if (keys == null || keys.length == 0) { - return newSucceededFuture(0L); + protected RFuture> fastRemoveOperationBatchAsync(K... keys) { + List args = new ArrayList(keys.length); + for (K key : keys) { + args.add(encodeMapKey(key)); } - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_FAST_REMOVE, + RFuture> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST, + "redis.call('zrem', KEYS[3], unpack(ARGV)); " + + "redis.call('zrem', KEYS[2], unpack(ARGV)); " + + "for i, key in ipairs(ARGV) do " + + "local v = redis.call('hget', KEYS[1], key); " + + "if v ~= false then " + + "local t, val = struct.unpack('dLc0', v); " + + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(val), val); " + + "redis.call('publish', KEYS[4], msg); " + + "end;" + + "end;" + + + "local result = {}; " + + "for i = 1, #ARGV, 1 do " + + "local val = redis.call('hdel', KEYS[1], ARGV[i]); " + + "table.insert(result, val); " + + "end;" + + "return result;", + Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName(), getRemovedChannelName()), + args.toArray()); + return future; + } + + @Override + protected RFuture fastRemoveOperationAsync(K ... keys) { + List params = new ArrayList(keys.length); + for (K key : keys) { + params.add(encodeMapKey(key)); + } + + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, "redis.call('zrem', KEYS[3], unpack(ARGV)); " + "redis.call('zrem', KEYS[2], unpack(ARGV)); " + "for i, key in ipairs(ARGV) do " @@ -806,7 +866,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac "end;" + "return redis.call('hdel', KEYS[1], unpack(ARGV)); ", Arrays.asList(getName(), getTimeoutSetName(), getIdleSetName(), getRemovedChannelName()), - keys); + params.toArray()); } @Override @@ -896,10 +956,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override - public RFuture fastPutAsync(K key, V value) { - checkKey(key); - checkValue(value); - + protected RFuture fastPutOperationAsync(K key, V value) { return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local insertable = false; " + "local v = redis.call('hget', KEYS[1], ARGV[2]); " @@ -941,10 +998,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public RFuture fastPutIfAbsentAsync(K key, V value) { - checkKey(key); - checkValue(value); - + protected RFuture fastPutIfAbsentOperationAsync(K key, V value) { return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('hget', KEYS[1], ARGV[2]); " + "if value == false then " @@ -998,7 +1052,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public RFuture fastPutIfAbsentAsync(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { + public RFuture fastPutIfAbsentAsync(final K key, final V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { checkKey(key); checkValue(value); @@ -1032,7 +1086,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac maxIdleTimeout = System.currentTimeMillis() + maxIdleDelta; } - return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, + RFuture future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local insertable = false; " + "local value = redis.call('hget', KEYS[1], ARGV[5]); " + "if value == false then " @@ -1085,18 +1139,32 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "end; ", Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getIdleSetNameByKey(key), getCreatedChannelNameByKey(key)), System.currentTimeMillis(), ttlTimeout, maxIdleTimeout, maxIdleDelta, encodeMapKey(key), encodeMapValue(value)); + if (mapWriter == null) { + return future; + } + + RPromise result = new MapWriterPromise(future, commandExecutor) { + @Override + public void execute(final Future future, ExecutorService executorService) { + if (future.getNow()) { + commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { + @Override + public void run() { + mapWriter.write(key, value); + trySuccess(future.getNow()); + } + }); + } else { + trySuccess(future.getNow()); + } + } + }; + + return result; } @Override - public RFuture replaceAsync(K key, V oldValue, V newValue) { - checkKey(key); - if (oldValue == null) { - throw new NullPointerException("map old value can't be null"); - } - if (newValue == null) { - throw new NullPointerException("map new value can't be null"); - } - + protected RFuture replaceOperationAsync(K key, V oldValue, V newValue) { return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_BOOLEAN, "local v = redis.call('hget', KEYS[1], ARGV[2]); " + "if v == false then " @@ -1134,11 +1202,8 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public RFuture replaceAsync(K key, V value) { - checkKey(key); - checkValue(value); - - return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE, + public RFuture replaceOperationAsync(K key, V value) { + return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "local v = redis.call('hget', KEYS[1], ARGV[2]); " + "if v ~= false then " + "local t, val = struct.unpack('dLc0', v); " @@ -1156,15 +1221,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac + "return nil; " + "end", Arrays.asList(getName(key), getTimeoutSetNameByKey(key), getUpdatedChannelNameByKey(key)), - System.currentTimeMillis(), key, value); + System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } @Override - public RFuture putAllAsync(final Map map) { - if (map.isEmpty()) { - return newSucceededFuture(null); - } - + public RFuture putAllOperationAsync(Map map) { List params = new ArrayList(map.size()*2); for (java.util.Map.Entry t : map.entrySet()) { if (t.getKey() == null) { @@ -1174,11 +1235,11 @@ public class RedissonMapCache extends RedissonMap implements RMapCac throw new NullPointerException("map value can't be null"); } - params.add(t.getKey()); - params.add(t.getValue()); + params.add(encodeMapKey(t.getKey())); + params.add(encodeMapValue(t.getValue())); } - return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HMSET, + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, "for i, value in ipairs(ARGV) do " + "if i % 2 == 0 then " + "local val = struct.pack('dLc0', 0, string.len(value), value); " diff --git a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java index 291ca086c..aa9e05a89 100644 --- a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java +++ b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java @@ -25,6 +25,8 @@ import org.redisson.api.map.MapWriter; * * @author Nikita Koksharov * + * @param key type + * @param value type */ public class LocalCachedMapOptions { @@ -120,6 +122,9 @@ public class LocalCachedMapOptions { * .invalidateEntryOnChange(true); * * + * @param key type + * @param value type + * * @return LocalCachedMapOptions instance * */ diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index bc52f1fcf..0ddf69931 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.redisson.api.map.MapLoader; +import org.redisson.api.map.MapWriter; import org.redisson.api.mapreduce.RMapReduce; /** @@ -56,7 +57,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsync * If map doesn't contain value for specified key and {@link MapLoader} is defined * then value will be loaded in read-through mode. * @@ -67,6 +68,33 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncvalue with the specified key + * in async manner. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return previous associated value + */ + @Override + V put(K key, V value); + + /** + * Associates the specified value with the specified key + * only if there is no any association with specifiedkey. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return null if key is a new one in the hash and value was set. + * Previous value if key already exists in the hash and change hasn't been made. + */ + @Override + V putIfAbsent(K key, V value); + /** * Returns RMapReduce object associated with this map * @@ -112,13 +140,75 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsynckey from map and returns associated value in async manner. + *

+ * If {@link MapWriter} is defined then keyis deleted in write-through mode. + * + * @param key - map key + * @return deleted value, null if there wasn't any association + */ + @Override + V remove(Object key); + + /** + * Replaces previous value with a new value associated with the key. + * If there wasn't any association before then method returns null. + *

+ * If {@link MapWriter} is defined then new valueis written in write-through mode. + * + * @param key - map key + * @param value - map value + * @return previous associated value + * or null if there wasn't any association and change hasn't been made + */ + @Override + V replace(K key, V value); + + /** + * Replaces previous oldValue with a newValue associated with the key. + * If previous value doesn't exist or equal to oldValue then method returns false. + *

+ * If {@link MapWriter} is defined then newValueis written in write-through mode. + * + * @param key - map key + * @param oldValue - map old value + * @param newValue - map new value + * @return true if value has been replaced otherwise false. + */ + @Override + boolean replace(K key, V oldValue, V newValue); + + /** + * Removes key from map only if it associated with value. + *

+ * If {@link MapWriter} is defined then keyis deleted in write-through mode. + * + * @param key - map key + * @param value - map value + * @return true if map entry has been replaced otherwise false. + */ + @Override + boolean remove(Object key, Object value); + + /** + * Associates the specified value with the specified key + * in batch. + *

+ * If {@link MapWriter} is defined then new map entries will be stored in write-through mode. + * + * @param map mappings to be stored in this map + */ + @Override + void putAll(java.util.Map map); + /** * Gets a map slice contained the mappings with defined keys * by one operation. - * + *

* If map doesn't contain value/values for specified key/keys and {@link MapLoader} is defined * then value/values will be loaded in read-through mode. - * + *

* The returned map is NOT backed by the original map. * * @param keys - map keys @@ -128,9 +218,11 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsynckeys from map by one operation - * - * Works faster than RMap.remove but not returning + *

+ * Works faster than {@link RMap#remove(Object)} but not returning * the value associated with key + *

+ * If {@link MapWriter} is defined then keysare deleted in write-through mode. * * @param keys - map keys * @return the number of keys that were removed from the hash, not including specified but non existing keys @@ -139,9 +231,11 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncvalue with the specified key. - * - * Works faster than RMap.put but not returning + *

+ * Works faster than {@link RMap#put(Object, Object)} but not returning * the previous value associated with key + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. * * @param key - map key * @param value - map value @@ -150,6 +244,20 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncvalue with the specified key + * only if there is no any association with specifiedkey. + *

+ * Works faster than {@link RMap#putIfAbsent(Object, Object)} but not returning + * the previous value associated with key + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return true if key is a new one in the hash and value was set. + * false if key already exists in the hash and change hasn't been made. + */ boolean fastPutIfAbsent(K key, V value); /** diff --git a/redisson/src/main/java/org/redisson/api/RMapAsync.java b/redisson/src/main/java/org/redisson/api/RMapAsync.java index e086c2a4f..f77dd09ae 100644 --- a/redisson/src/main/java/org/redisson/api/RMapAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapAsync.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Map.Entry; import org.redisson.api.map.MapLoader; +import org.redisson.api.map.MapWriter; import java.util.Set; @@ -61,10 +62,10 @@ public interface RMapAsync extends RExpirableAsync { /** * Gets a map slice contained the mappings with defined keys * by one operation. - * + *

* If map doesn't contain value/values for specified key/keys and {@link MapLoader} is defined * then value/values will be loaded in read-through mode. - * + *

* The returned map is NOT backed by the original map. * * @param keys - map keys @@ -72,6 +73,15 @@ public interface RMapAsync extends RExpirableAsync { */ RFuture> getAllAsync(Set keys); + /** + * Associates the specified value with the specified key + * in batch. + *

+ * If {@link MapWriter} is defined then new map entries are stored in write-through mode. + * + * @param map mappings to be stored in this map + * @return void + */ RFuture putAllAsync(Map map); RFuture addAndGetAsync(K key, Number value); @@ -83,10 +93,12 @@ public interface RMapAsync extends RExpirableAsync { RFuture sizeAsync(); /** - * Removes keys from map by one operation in async manner - * - * Works faster than RMap.removeAsync but doesn't return - * the value associated with key + * Removes keys from map by one operation in async manner. + *

+ * Works faster than {@link RMap#removeAsync(Object, Object)} but doesn't return + * the value associated with key. + *

+ * If {@link MapWriter} is defined then keysare deleted in write-through mode. * * @param keys - map keys * @return the number of keys that were removed from the hash, not including specified but non existing keys @@ -96,17 +108,33 @@ public interface RMapAsync extends RExpirableAsync { /** * Associates the specified value with the specified key * in async manner. - * - * Works faster than RMap.putAsync but not returning + *

+ * Works faster than {@link RMap#putAsync(Object, Object)} but not returning * the previous value associated with key + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. * * @param key - map key * @param value - map value - * @return true if key is a new key in the hash and value was set. + * @return true if key is a new one in the hash and value was set. * false if key already exists in the hash and the value was updated. */ RFuture fastPutAsync(K key, V value); + /** + * Associates the specified value with the specified key + * only if there is no any association with specifiedkey. + *

+ * Works faster than {@link RMap#putIfAbsentAsync(Object, Object)} but not returning + * the previous value associated with key + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return true if key is a new one in the hash and value was set. + * false if key already exists in the hash and change hasn't been made. + */ RFuture fastPutIfAbsentAsync(K key, V value); /** @@ -140,7 +168,7 @@ public interface RMapAsync extends RExpirableAsync { /** * Returns the value to which the specified key is mapped, * or {@code null} if this map contains no mapping for the key. - * + *

* If map doesn't contain value for specified key and {@link MapLoader} is defined * then value will be loaded in read-through mode. * @@ -150,16 +178,76 @@ public interface RMapAsync extends RExpirableAsync { */ RFuture getAsync(K key); + /** + * Associates the specified value with the specified key + * in async manner. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return previous associated value + */ RFuture putAsync(K key, V value); + /** + * Removes key from map and returns associated value in async manner. + *

+ * If {@link MapWriter} is defined then keyis deleted in write-through mode. + * + * @param key - map key + * @return deleted value or null if there wasn't any association + */ RFuture removeAsync(K key); + /** + * Replaces previous value with a new value associated with the key. + * If there wasn't any association before then method returns null. + *

+ * If {@link MapWriter} is defined then new valueis written in write-through mode. + * + * @param key - map key + * @param value - map value + * @return previous associated value + * or null if there wasn't any association and change hasn't been made + */ RFuture replaceAsync(K key, V value); + /** + * Replaces previous oldValue with a newValue associated with the key. + * If previous value doesn't exist or equal to oldValue then method returns false. + *

+ * If {@link MapWriter} is defined then newValueis written in write-through mode. + * + * @param key - map key + * @param oldValue - map old value + * @param newValue - map new value + * @return true if value has been replaced otherwise false. + */ RFuture replaceAsync(K key, V oldValue, V newValue); + /** + * Removes key from map only if it associated with value. + *

+ * If {@link MapWriter} is defined then keyis deleted in write-through mode. + * + * @param key - map key + * @param value - map value + * @return true if map entry has been replaced otherwise false. + */ RFuture removeAsync(Object key, Object value); + /** + * Associates the specified value with the specified key + * only if there is no any association with specifiedkey. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return null if key is a new one in the hash and value was set. + * Previous value if key already exists in the hash and change hasn't been made. + */ RFuture putIfAbsentAsync(K key, V value); } diff --git a/redisson/src/main/java/org/redisson/api/map/MapWriter.java b/redisson/src/main/java/org/redisson/api/map/MapWriter.java index d76fe5397..905668cca 100644 --- a/redisson/src/main/java/org/redisson/api/map/MapWriter.java +++ b/redisson/src/main/java/org/redisson/api/map/MapWriter.java @@ -19,11 +19,12 @@ import java.util.Collection; import java.util.Map; /** + * Map writer used for write-through operations. * * @author Nikita Koksharov * - * @param - * @param + * @param key type + * @param value type */ public interface MapWriter { diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index b46ded991..05b2e5387 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -252,7 +252,7 @@ public interface RedisCommands { RedisStrictCommand MSET = new RedisStrictCommand("MSET", new VoidReplayConvertor()); RedisStrictCommand MSETNX = new RedisStrictCommand("MSETNX", new BooleanReplayConvertor()); - RedisCommand HSETNX = new RedisCommand("HSETNX", new BooleanReplayConvertor(), 2, ValueType.MAP); + RedisCommand HSETNX = new RedisCommand("HSETNX", new BooleanReplayConvertor()); RedisCommand HSET = new RedisCommand("HSET", new BooleanReplayConvertor(), 2, ValueType.MAP); RedisCommand> HSCAN = new RedisCommand>("HSCAN", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP); RedisCommand> HGETALL = new RedisCommand>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP); @@ -263,10 +263,10 @@ public interface RedisCommands { RedisCommand HSTRLEN = new RedisCommand("HSTRLEN", new IntegerReplayConvertor(), 2, ValueType.MAP_KEY); RedisStrictCommand HLEN_LONG = new RedisStrictCommand("HLEN"); RedisCommand> HKEYS = new RedisCommand>("HKEYS", new ObjectSetReplayDecoder(), ValueType.MAP_KEY); - RedisCommand HMSET = new RedisCommand("HMSET", new VoidReplayConvertor(), 2, ValueType.MAP); + RedisCommand HMSET = new RedisCommand("HMSET", new VoidReplayConvertor()); RedisCommand> HMGET = new RedisCommand>("HMGET", new ObjectListReplayDecoder(), 2, ValueType.MAP_KEY, ValueType.MAP_VALUE); RedisCommand HGET = new RedisCommand("HGET", 2, ValueType.MAP_KEY, ValueType.MAP_VALUE); - RedisCommand HDEL = new RedisStrictCommand("HDEL", 2, ValueType.MAP_KEY); + RedisCommand HDEL = new RedisStrictCommand("HDEL"); RedisStrictCommand DEL = new RedisStrictCommand("DEL"); RedisStrictCommand DBSIZE = new RedisStrictCommand("DBSIZE"); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index d25916e22..2e710fa7c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -63,9 +63,6 @@ import reactor.rx.Streams; */ public class RedissonMapCacheReactive extends RedissonExpirableReactive implements RMapCacheReactive, MapReactive { - private static final RedisCommand> EVAL_HSCAN = - new RedisCommand>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP); - private final RMapCache mapCache; public RedissonMapCacheReactive(UUID id, EvictionScheduler evictionScheduler, CommandReactiveExecutor commandExecutor, String name) { diff --git a/redisson/src/test/java/org/redisson/BaseMapTest.java b/redisson/src/test/java/org/redisson/BaseMapTest.java index d9420bb7b..6278384b9 100644 --- a/redisson/src/test/java/org/redisson/BaseMapTest.java +++ b/redisson/src/test/java/org/redisson/BaseMapTest.java @@ -3,6 +3,7 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -10,14 +11,196 @@ import java.util.Map; import org.junit.Test; import org.redisson.api.RMap; import org.redisson.api.map.MapLoader; +import org.redisson.api.map.MapWriter; public abstract class BaseMapTest extends BaseTest { + protected abstract RMap getWriterTestMap(String name, Map map); + protected abstract RMap getLoaderTestMap(String name, Map map); + + @Test + public void testWriterAddAndGet() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + assertThat(map.addAndGet("1", 11)).isEqualTo(11); + assertThat(map.addAndGet("1", 7)).isEqualTo(18); + + Map expected = new HashMap<>(); + expected.put("1", 18); + assertThat(store).isEqualTo(expected); + } + + + @Test + public void testWriterFastRemove() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + map.put("1", "11"); + map.put("2", "22"); + map.put("3", "33"); + + map.fastRemove("1", "2", "4"); + + Map expected = new HashMap<>(); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterFastPut() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + map.fastPut("1", "11"); + map.fastPut("2", "22"); + map.fastPut("3", "33"); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterRemove() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + map.put("1", "11"); + map.remove("1"); + map.put("3", "33"); + + Map expected = new HashMap<>(); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterReplaceKeyValue() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + map.put("1", "11"); + map.replace("1", "00"); + map.replace("2", "22"); + map.put("3", "33"); + + Map expected = new HashMap<>(); + expected.put("1", "00"); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterReplaceKeyOldNewValue() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + map.put("1", "11"); + map.replace("1", "11", "00"); + map.put("3", "33"); + + Map expected = new HashMap<>(); + expected.put("1", "00"); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterRemoveKeyValue() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + map.put("1", "11"); + map.put("2", "22"); + map.put("3", "33"); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + + map.remove("1", "11"); + + Map expected2 = new HashMap<>(); + expected2.put("2", "22"); + expected2.put("3", "33"); + assertThat(store).isEqualTo(expected2); + } + + @Test + public void testWriterFastPutIfAbsent() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + map.fastPutIfAbsent("1", "11"); + map.fastPutIfAbsent("1", "00"); + map.fastPutIfAbsent("2", "22"); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterPutIfAbsent() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + map.putIfAbsent("1", "11"); + map.putIfAbsent("1", "00"); + map.putIfAbsent("2", "22"); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterPutAll() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + Map newMap = new HashMap<>(); + newMap.put("1", "11"); + newMap.put("2", "22"); + newMap.put("3", "33"); + map.putAll(newMap); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + } + + + @Test + public void testWriterPut() { + Map store = new HashMap<>(); + RMap map = getWriterTestMap("test", store); + + map.put("1", "11"); + map.put("2", "22"); + map.put("3", "33"); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + } @Test public void testLoadAllReplaceValues() { - Map cache = new HashMap(); + Map cache = new HashMap<>(); for (int i = 0; i < 10; i++) { cache.put("" + i, "" + i + "" + i); } @@ -58,6 +241,34 @@ public abstract class BaseMapTest extends BaseTest { } } + protected MapWriter createMapWriter(Map map) { + return new MapWriter() { + + @Override + public void write(K key, V value) { + map.put(key, value); + } + + @Override + public void writeAll(Map values) { + map.putAll(values); + } + + @Override + public void delete(K key) { + map.remove(key); + } + + @Override + public void deleteAll(Collection keys) { + for (K key : keys) { + map.remove(key); + } + } + + }; + } + protected MapLoader createMapLoader(Map map) { return new MapLoader() { @Override diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index dd9841950..932e7fa40 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -48,6 +48,12 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { } + @Override + protected RMap getWriterTestMap(String name, Map map) { + LocalCachedMapOptions options = LocalCachedMapOptions.defaults().mapWriter(createMapWriter(map)); + return redisson.getLocalCachedMap("test", options); + } + @Override protected RMap getLoaderTestMap(String name, Map map) { LocalCachedMapOptions options = LocalCachedMapOptions.defaults().mapLoader(createMapLoader(map)); diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index 90328fd44..eb8492b29 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -137,11 +137,78 @@ public class RedissonMapCacheTest extends BaseMapTest { } + @Override + protected RMap getWriterTestMap(String name, Map map) { + return redisson.getMapCache("test", null, createMapWriter(map)); + } + @Override protected RMap getLoaderTestMap(String name, Map map) { return redisson.getMapCache("test", createMapLoader(map), null); } + @Test + public void testWriterPutIfAbsent() { + Map store = new HashMap<>(); + RMapCache map = (RMapCache) getWriterTestMap("test", store); + + map.putIfAbsent("1", "11", 10, TimeUnit.SECONDS); + map.putIfAbsent("1", "00", 10, TimeUnit.SECONDS); + map.putIfAbsent("2", "22", 10, TimeUnit.SECONDS); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterPutTTL() { + Map store = new HashMap<>(); + RMapCache map = (RMapCache) getWriterTestMap("test", store); + + map.put("1", "11", 10, TimeUnit.SECONDS); + map.put("2", "22", 10, TimeUnit.SECONDS); + map.put("3", "33", 10, TimeUnit.SECONDS); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterFastPutIfAbsentTTL() { + Map store = new HashMap<>(); + RMapCache map = (RMapCache) getWriterTestMap("test", store); + + map.fastPutIfAbsent("1", "11", 10, TimeUnit.SECONDS); + map.fastPutIfAbsent("1", "00", 10, TimeUnit.SECONDS); + map.fastPutIfAbsent("2", "22", 10, TimeUnit.SECONDS); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + assertThat(store).isEqualTo(expected); + } + + @Test + public void testWriterFastPutTTL() { + Map store = new HashMap<>(); + RMapCache map = (RMapCache) getWriterTestMap("test", store); + + map.fastPut("1", "11", 10, TimeUnit.SECONDS); + map.fastPut("2", "22", 10, TimeUnit.SECONDS); + map.fastPut("3", "33", 10, TimeUnit.SECONDS); + + Map expected = new HashMap<>(); + expected.put("1", "11"); + expected.put("2", "22"); + expected.put("3", "33"); + assertThat(store).isEqualTo(expected); + } + @Test public void testOrdering() { Map map = new LinkedHashMap(); diff --git a/redisson/src/test/java/org/redisson/RedissonMapTest.java b/redisson/src/test/java/org/redisson/RedissonMapTest.java index e2ab088ab..36756e4ce 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapTest.java @@ -134,6 +134,11 @@ public class RedissonMapTest extends BaseMapTest { return redisson.getMap("test", createMapLoader(map), null); } + @Override + protected RMap getWriterTestMap(String name, Map map) { + return redisson.getMap("test", null, createMapWriter(map)); + } + @Test public void testAddAndGet() throws InterruptedException { RMap map = redisson.getMap("getAll");