From d55f902a9d47437dfdaf648136f667021b39d95e Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 15 Jan 2021 11:18:10 +0300 Subject: [PATCH] Feature - Add putIfExists() method to RMap object #3142 --- .../org/redisson/RedissonLocalCachedMap.java | 45 ++++++++++ .../main/java/org/redisson/RedissonMap.java | 32 ++++++- .../java/org/redisson/RedissonMapCache.java | 84 +++++++++++++++++++ .../src/main/java/org/redisson/api/RMap.java | 15 +++- .../main/java/org/redisson/api/RMapAsync.java | 13 +++ .../java/org/redisson/api/RMapReactive.java | 13 +++ .../main/java/org/redisson/api/RMapRx.java | 13 +++ .../transaction/BaseTransactionalMap.java | 59 ++++++++++--- .../transaction/RedissonTransactionalMap.java | 8 +- .../RedissonTransactionalMapCache.java | 6 ++ .../map/MapPutIfExistsOperation.java | 39 +++++++++ .../test/java/org/redisson/BaseMapTest.java | 17 +++- 12 files changed, 330 insertions(+), 14 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/transaction/operation/map/MapPutIfExistsOperation.java diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 911f6ed12..db18db7ab 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -146,6 +146,23 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return cache.putIfAbsent(cacheKey, new CacheValue(key, value)); } + private CacheValue cachePutIfExists(CacheKey cacheKey, Object key, Object value) { + if (listener.isDisabled(cacheKey)) { + return null; + } + + while (true) { + CacheValue v = cache.get(cacheKey); + if (v != null) { + if (cache.replace(cacheKey, v, new CacheValue(key, value))) { + return v; + } + } else { + return null; + } + } + } + private CacheValue cacheReplace(CacheKey cacheKey, Object key, Object value) { if (listener.isDisabled(cacheKey)) { return null; @@ -1138,6 +1155,34 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return future; } + @Override + public RFuture putIfExistsAsync(K key, V value) { + if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { + ByteBuf mapKey = encodeMapKey(key); + CacheKey cacheKey = localCacheView.toCacheKey(mapKey); + CacheValue prevValue = cachePutIfExists(cacheKey, key, value); + if (prevValue != null) { + broadcastLocalCacheStore((V) value, mapKey, cacheKey); + return RedissonPromise.newSucceededFuture((V) prevValue.getValue()); + } else { + mapKey.release(); + return RedissonPromise.newSucceededFuture(null); + } + } + + RFuture future = super.putIfExistsAsync(key, value); + future.onComplete((res, e) -> { + if (e != null) { + return; + } + + if (res != null) { + CacheKey cacheKey = localCacheView.toCacheKey(key); + cachePut(cacheKey, key, value); + } + }); + return future; + } @Override public RFuture putIfAbsentAsync(K key, V value) { diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 336ce5cca..9605e008a 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -864,7 +864,37 @@ public class RedissonMap extends RedissonExpirable implements RMap { return commandExecutor.readAsync(getName(), codec, RedisCommands.HGETALL, getName()); } - + @Override + public V putIfExists(K key, V value) { + return get(putIfExistsAsync(key, value)); + } + + @Override + public RFuture putIfExistsAsync(K key, V value) { + checkKey(key); + checkValue(value); + + RFuture future = putIfExistsOperationAsync(key, value); + if (hasNoWriter()) { + return future; + } + + MapWriterTask.Add task = new MapWriterTask.Add(key, value); + return mapWriterFuture(future, task, Objects::nonNull); + } + + protected RFuture putIfExistsOperationAsync(K key, V value) { + String name = getName(key); + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE, + "local value = redis.call('hget', KEYS[1], ARGV[1]); " + + "if value ~= false then " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "return value; " + + "end; " + + "return nil; ", + Collections.singletonList(name), encodeMapKey(key), encodeMapValue(value)); + } + @Override public V putIfAbsent(K key, V value) { return get(putIfAbsentAsync(key, value)); diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index d8b169a7e..f7641b8cb 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -612,6 +612,90 @@ public class RedissonMapCache extends RedissonMap implements RMapCac System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } + @Override + protected RFuture putIfExistsOperationAsync(K key, V value) { + String name = getName(key); + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE, + "local value = redis.call('hget', KEYS[1], ARGV[2]); " + + "if value == false then " + + "return nil;" + + "end; " + + "local maxSize = tonumber(redis.call('hget', KEYS[7], 'max-size'));" + + "local lastAccessTimeSetName = KEYS[5]; " + + "local currentTime = tonumber(ARGV[1]); " + + "local t, val;" + + "if value ~= false then " + + "t, val = struct.unpack('dLc0', value); " + + "local expireDate = 92233720368547758; " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); " + + "if expireDateScore ~= false then " + + "expireDate = tonumber(expireDateScore) " + + "end; " + + "if t ~= 0 then " + + "local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); " + + "if expireIdle ~= false then " + + "expireDate = math.min(expireDate, tonumber(expireIdle)) " + + "end; " + + "end; " + + "if expireDate > tonumber(ARGV[1]) then " + + "if maxSize ~= nil and maxSize ~= 0 then " + + "local mode = redis.call('hget', KEYS[7], 'mode'); " + + "if mode == false or mode == 'LRU' then " + + "redis.call('zadd', lastAccessTimeSetName, currentTime, ARGV[2]); " + + "else " + + "redis.call('zincrby', lastAccessTimeSetName, 1, ARGV[2]); " + + "end; " + + "end; " + + "else " + + "return nil; " + + "end; " + + "end; " + + + "local newValue = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); " + + "redis.call('hset', KEYS[1], ARGV[2], newValue); " + + // last access time + + "if maxSize ~= nil and maxSize ~= 0 then " + + "local mode = redis.call('hget', KEYS[7], 'mode'); " + + + "if mode == false or mode == 'LRU' then " + + "redis.call('zadd', lastAccessTimeSetName, currentTime, ARGV[2]); " + + "end; " + + + " local cacheSize = tonumber(redis.call('hlen', KEYS[1])); " + + " if cacheSize > maxSize then " + + " local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1); " + + " for index, lruItem in ipairs(lruItems) do " + + " if lruItem and lruItem ~= ARGV[2] then " + + " local lruItemValue = redis.call('hget', KEYS[1], lruItem); " + + " redis.call('hdel', KEYS[1], lruItem); " + + " redis.call('zrem', KEYS[2], lruItem); " + + " redis.call('zrem', KEYS[3], lruItem); " + + " redis.call('zrem', lastAccessTimeSetName, lruItem); " + + " if lruItemValue ~= false then " + + " local removedChannelName = KEYS[6]; " + + "local ttl, obj = struct.unpack('dLc0', lruItemValue);" + + " local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(obj), obj);" + + " redis.call('publish', removedChannelName, msg); " + + "end; " + + " end; " + + " end; " + + " end; " + + + "if mode == 'LFU' then " + + "redis.call('zincrby', lastAccessTimeSetName, 1, ARGV[2]); " + + "end; " + + + "end; " + + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3], string.len(val), val); " + + "redis.call('publish', KEYS[4], msg); " + + "return val;", + Arrays.asList(name, getTimeoutSetName(name), getIdleSetName(name), getUpdatedChannelName(name), + getLastAccessTimeSetName(name), getRemovedChannelName(name), getOptionsName(name)), + System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); + } + @Override protected RFuture putIfAbsentOperationAsync(K key, V value) { String name = getName(key); diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index f65ae41f5..7bf71f993 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -81,7 +81,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncvalue mapped by specified key + * Stores the specified value mapped by key * only if there is no value with specifiedkey stored before. *

* If {@link MapWriter} is defined then new map entry is stored in write-through mode. @@ -93,6 +93,19 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncvalue mapped by key + * only if mapping already exists. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return null if key is doesn't exists in the hash and value hasn't been set. + * Previous value if key already exists in the hash and new value has been stored. + */ + V putIfExists(K key, V value); /** * Returns RMapReduce object associated with this map diff --git a/redisson/src/main/java/org/redisson/api/RMapAsync.java b/redisson/src/main/java/org/redisson/api/RMapAsync.java index ff6c99c0d..93f6b9ca4 100644 --- a/redisson/src/main/java/org/redisson/api/RMapAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapAsync.java @@ -363,4 +363,17 @@ public interface RMapAsync extends RExpirableAsync { */ RFuture putIfAbsentAsync(K key, V value); + /** + * Stores the specified value mapped by key + * only if mapping already exists. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return null if key is doesn't exists in the hash and value hasn't been set. + * Previous value if key already exists in the hash and new value has been stored. + */ + RFuture putIfExistsAsync(K key, V value); + } diff --git a/redisson/src/main/java/org/redisson/api/RMapReactive.java b/redisson/src/main/java/org/redisson/api/RMapReactive.java index 0d38dda43..7aafebb8a 100644 --- a/redisson/src/main/java/org/redisson/api/RMapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapReactive.java @@ -334,6 +334,19 @@ public interface RMapReactive extends RExpirableReactive { */ Mono putIfAbsent(K key, V value); + /** + * Stores the specified value mapped by key + * only if mapping already exists. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return null if key is doesn't exists in the hash and value hasn't been set. + * Previous value if key already exists in the hash and new value has been stored. + */ + Mono putIfExists(K key, V value); + /** * Returns iterator over map entries collection. * Map entries are loaded in batch. Batch size is 10. diff --git a/redisson/src/main/java/org/redisson/api/RMapRx.java b/redisson/src/main/java/org/redisson/api/RMapRx.java index 78d28c511..d045c109b 100644 --- a/redisson/src/main/java/org/redisson/api/RMapRx.java +++ b/redisson/src/main/java/org/redisson/api/RMapRx.java @@ -337,6 +337,19 @@ public interface RMapRx extends RExpirableRx { */ Maybe putIfAbsent(K key, V value); + /** + * Stores the specified value mapped by key + * only if mapping already exists. + *

+ * If {@link MapWriter} is defined then new map entry is stored in write-through mode. + * + * @param key - map key + * @param value - map value + * @return null if key is doesn't exists in the hash and value hasn't been set. + * Previous value if key already exists in the hash and new value has been stored. + */ + Maybe putIfExists(K key, V value); + /** * Returns iterator over map entries collection. * Map entries are loaded in batch. Batch size is 10. diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index 92bac7c55..bd67eff6f 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -47,15 +47,7 @@ import org.redisson.transaction.operation.DeleteOperation; import org.redisson.transaction.operation.TouchOperation; import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.UnlinkOperation; -import org.redisson.transaction.operation.map.MapAddAndGetOperation; -import org.redisson.transaction.operation.map.MapFastPutIfAbsentOperation; -import org.redisson.transaction.operation.map.MapFastPutOperation; -import org.redisson.transaction.operation.map.MapFastRemoveOperation; -import org.redisson.transaction.operation.map.MapOperation; -import org.redisson.transaction.operation.map.MapPutIfAbsentOperation; -import org.redisson.transaction.operation.map.MapPutOperation; -import org.redisson.transaction.operation.map.MapRemoveOperation; -import org.redisson.transaction.operation.map.MapReplaceOperation; +import org.redisson.transaction.operation.map.*; import io.netty.buffer.ByteBuf; @@ -286,7 +278,54 @@ public class BaseTransactionalMap { }); return result; } - + + protected RFuture putIfExistsOperationAsync(K key, V value) { + long threadId = Thread.currentThread().getId(); + return putIfExistsOperationAsync(key, value, new MapPutIfExistsOperation(map, key, value, transactionId, threadId)); + } + + protected RFuture putIfExistsOperationAsync(K key, V value, MapOperation mapOperation) { + RPromise result = new RedissonPromise(); + executeLocked(result, key, new Runnable() { + @Override + public void run() { + HashValue keyHash = toKeyHash(key); + MapEntry entry = state.get(keyHash); + if (entry != null) { + operations.add(mapOperation); + if (entry != MapEntry.NULL) { + state.put(keyHash, new MapEntry(key, value)); + if (deleted != null) { + deleted = false; + } + + result.trySuccess((V) entry.getValue()); + } else { + result.trySuccess(null); + } + return; + } + + map.getAsync(key).onComplete((res, e) -> { + if (e != null) { + result.tryFailure(e); + return; + } + + operations.add(mapOperation); + if (res != null) { + state.put(keyHash, new MapEntry(key, value)); + if (deleted != null) { + deleted = false; + } + } + result.trySuccess(res); + }); + } + }); + return result; + } + protected RFuture putIfAbsentOperationAsync(K key, V value) { long threadId = Thread.currentThread().getId(); return putIfAbsentOperationAsync(key, value, new MapPutIfAbsentOperation(map, key, value, transactionId, threadId)); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java index f94c7613c..d3784c980 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java @@ -132,7 +132,13 @@ public class RedissonTransactionalMap extends RedissonMap { checkState(); return transactionalMap.addAndGetOperationAsync(key, value); } - + + @Override + protected RFuture putIfExistsOperationAsync(K key, V value) { + checkState(); + return transactionalMap.putIfExistsOperationAsync(key, value); + } + @Override protected RFuture putIfAbsentOperationAsync(K key, V value) { checkState(); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java index 7d6d2e35b..da3a9d987 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java @@ -182,6 +182,12 @@ public class RedissonTransactionalMapCache extends RedissonMapCache return transactionalMap.addAndGetOperationAsync(key, value); } + @Override + protected RFuture putIfExistsOperationAsync(K key, V value) { + checkState(); + return transactionalMap.putIfExistsOperationAsync(key, value); + } + @Override protected RFuture putIfAbsentOperationAsync(K key, V value) { checkState(); diff --git a/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutIfExistsOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutIfExistsOperation.java new file mode 100644 index 000000000..7b8a76196 --- /dev/null +++ b/redisson/src/main/java/org/redisson/transaction/operation/map/MapPutIfExistsOperation.java @@ -0,0 +1,39 @@ +/** + * Copyright (c) 2013-2020 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.transaction.operation.map; + +import org.redisson.api.RMap; + +/** + * + * @author Nikita Koksharov + * + */ +public class MapPutIfExistsOperation extends MapOperation { + + public MapPutIfExistsOperation() { + } + + public MapPutIfExistsOperation(RMap map, Object key, Object value, String transactionId, long threadId) { + super(map, key, value, transactionId, threadId); + } + + @Override + public void commit(RMap map) { + map.putIfExistsAsync(key, value); + } + +} diff --git a/redisson/src/test/java/org/redisson/BaseMapTest.java b/redisson/src/test/java/org/redisson/BaseMapTest.java index 5f63121df..a9648b977 100644 --- a/redisson/src/test/java/org/redisson/BaseMapTest.java +++ b/redisson/src/test/java/org/redisson/BaseMapTest.java @@ -492,7 +492,22 @@ public abstract class BaseMapTest extends BaseTest { assertThat(map.get(key1)).isEqualTo(value1); destroy(map); } - + + @Test + public void testPutIfExists() throws Exception { + RMap map = getMap("simple"); + SimpleKey key = new SimpleKey("1"); + SimpleValue value = new SimpleValue("2"); + + assertThat(map.putIfExists(key, new SimpleValue("3"))).isNull(); + assertThat(map.get(key)).isNull(); + + map.put(key, value); + assertThat(map.putIfExists(key, new SimpleValue("3"))).isEqualTo(value); + assertThat(map.get(key)).isEqualTo(new SimpleValue("3")); + destroy(map); + } + @Test(timeout = 5000) public void testDeserializationErrorReturnsErrorImmediately() throws Exception { RMap map = getMap("deserializationFailure", new JsonJacksonCodec());