Feature - Add putIfExists() method to RMap object #3142

pull/3355/head
Nikita Koksharov 4 years ago
parent 58afd22cb6
commit d55f902a9d

@ -146,6 +146,23 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return cache.putIfAbsent(cacheKey, new CacheValue(key, value));
}
private CacheValue cachePutIfExists(CacheKey cacheKey, Object key, Object value) {
if (listener.isDisabled(cacheKey)) {
return null;
}
while (true) {
CacheValue v = cache.get(cacheKey);
if (v != null) {
if (cache.replace(cacheKey, v, new CacheValue(key, value))) {
return v;
}
} else {
return null;
}
}
}
private CacheValue cacheReplace(CacheKey cacheKey, Object key, Object value) {
if (listener.isDisabled(cacheKey)) {
return null;
@ -1138,6 +1155,34 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future;
}
@Override
public RFuture<V> putIfExistsAsync(K key, V value) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
CacheValue prevValue = cachePutIfExists(cacheKey, key, value);
if (prevValue != null) {
broadcastLocalCacheStore((V) value, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture((V) prevValue.getValue());
} else {
mapKey.release();
return RedissonPromise.newSucceededFuture(null);
}
}
RFuture<V> future = super.putIfExistsAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
return;
}
if (res != null) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
});
return future;
}
@Override
public RFuture<V> putIfAbsentAsync(K key, V value) {

@ -864,7 +864,37 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return commandExecutor.readAsync(getName(), codec, RedisCommands.HGETALL, getName());
}
@Override
public V putIfExists(K key, V value) {
return get(putIfExistsAsync(key, value));
}
@Override
public RFuture<V> putIfExistsAsync(K key, V value) {
checkKey(key);
checkValue(value);
RFuture<V> future = putIfExistsOperationAsync(key, value);
if (hasNoWriter()) {
return future;
}
MapWriterTask.Add task = new MapWriterTask.Add(key, value);
return mapWriterFuture(future, task, Objects::nonNull);
}
protected RFuture<V> putIfExistsOperationAsync(K key, V value) {
String name = getName(key);
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if value ~= false then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return value; "
+ "end; "
+ "return nil; ",
Collections.singletonList(name), encodeMapKey(key), encodeMapValue(value));
}
@Override
public V putIfAbsent(K key, V value) {
return get(putIfAbsentAsync(key, value));

@ -612,6 +612,90 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@Override
protected RFuture<V> putIfExistsOperationAsync(K key, V value) {
String name = getName(key);
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if value == false then "
+ "return nil;"
+ "end; "
+ "local maxSize = tonumber(redis.call('hget', KEYS[7], 'max-size'));"
+ "local lastAccessTimeSetName = KEYS[5]; "
+ "local currentTime = tonumber(ARGV[1]); "
+ "local t, val;"
+ "if value ~= false then "
+ "t, val = struct.unpack('dLc0', value); "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); "
+ "if expireIdle ~= false then "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then "
+ "if maxSize ~= nil and maxSize ~= 0 then " +
"local mode = redis.call('hget', KEYS[7], 'mode'); " +
"if mode == false or mode == 'LRU' then " +
"redis.call('zadd', lastAccessTimeSetName, currentTime, ARGV[2]); " +
"else " +
"redis.call('zincrby', lastAccessTimeSetName, 1, ARGV[2]); " +
"end; "
+ "end; "
+ "else "
+ "return nil; "
+ "end; "
+ "end; "
+ "local newValue = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[2], newValue); "
// last access time
+ "if maxSize ~= nil and maxSize ~= 0 then " +
"local mode = redis.call('hget', KEYS[7], 'mode'); " +
"if mode == false or mode == 'LRU' then " +
"redis.call('zadd', lastAccessTimeSetName, currentTime, ARGV[2]); " +
"end; " +
" local cacheSize = tonumber(redis.call('hlen', KEYS[1])); " +
" if cacheSize > maxSize then " +
" local lruItems = redis.call('zrange', lastAccessTimeSetName, 0, cacheSize - maxSize - 1); " +
" for index, lruItem in ipairs(lruItems) do " +
" if lruItem and lruItem ~= ARGV[2] then " +
" local lruItemValue = redis.call('hget', KEYS[1], lruItem); " +
" redis.call('hdel', KEYS[1], lruItem); " +
" redis.call('zrem', KEYS[2], lruItem); " +
" redis.call('zrem', KEYS[3], lruItem); " +
" redis.call('zrem', lastAccessTimeSetName, lruItem); " +
" if lruItemValue ~= false then " +
" local removedChannelName = KEYS[6]; " +
"local ttl, obj = struct.unpack('dLc0', lruItemValue);" +
" local msg = struct.pack('Lc0Lc0', string.len(lruItem), lruItem, string.len(obj), obj);" +
" redis.call('publish', removedChannelName, msg); " +
"end; " +
" end; " +
" end; " +
" end; " +
"if mode == 'LFU' then " +
"redis.call('zincrby', lastAccessTimeSetName, 1, ARGV[2]); " +
"end; " +
"end; "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3], string.len(val), val); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return val;",
Arrays.<Object>asList(name, getTimeoutSetName(name), getIdleSetName(name), getUpdatedChannelName(name),
getLastAccessTimeSetName(name), getRemovedChannelName(name), getOptionsName(name)),
System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@Override
protected RFuture<V> putIfAbsentOperationAsync(K key, V value) {
String name = getName(key);

@ -81,7 +81,7 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
V put(K key, V value);
/**
* Stores the specified <code>value</code> mapped by specified <code>key</code>
* Stores the specified <code>value</code> mapped by <code>key</code>
* only if there is no value with specified<code>key</code> stored before.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
@ -93,6 +93,19 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
*/
@Override
V putIfAbsent(K key, V value);
/**
* Stores the specified <code>value</code> mapped by <code>key</code>
* only if mapping already exists.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>null</code> if key is doesn't exists in the hash and value hasn't been set.
* Previous value if key already exists in the hash and new value has been stored.
*/
V putIfExists(K key, V value);
/**
* Returns <code>RMapReduce</code> object associated with this map

@ -363,4 +363,17 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
*/
RFuture<V> putIfAbsentAsync(K key, V value);
/**
* Stores the specified <code>value</code> mapped by <code>key</code>
* only if mapping already exists.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>null</code> if key is doesn't exists in the hash and value hasn't been set.
* Previous value if key already exists in the hash and new value has been stored.
*/
RFuture<V> putIfExistsAsync(K key, V value);
}

@ -334,6 +334,19 @@ public interface RMapReactive<K, V> extends RExpirableReactive {
*/
Mono<V> putIfAbsent(K key, V value);
/**
* Stores the specified <code>value</code> mapped by <code>key</code>
* only if mapping already exists.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>null</code> if key is doesn't exists in the hash and value hasn't been set.
* Previous value if key already exists in the hash and new value has been stored.
*/
Mono<V> putIfExists(K key, V value);
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is <code>10</code>.

@ -337,6 +337,19 @@ public interface RMapRx<K, V> extends RExpirableRx {
*/
Maybe<V> putIfAbsent(K key, V value);
/**
* Stores the specified <code>value</code> mapped by <code>key</code>
* only if mapping already exists.
* <p>
* If {@link MapWriter} is defined then new map entry is stored in write-through mode.
*
* @param key - map key
* @param value - map value
* @return <code>null</code> if key is doesn't exists in the hash and value hasn't been set.
* Previous value if key already exists in the hash and new value has been stored.
*/
Maybe<V> putIfExists(K key, V value);
/**
* Returns iterator over map entries collection.
* Map entries are loaded in batch. Batch size is <code>10</code>.

@ -47,15 +47,7 @@ import org.redisson.transaction.operation.DeleteOperation;
import org.redisson.transaction.operation.TouchOperation;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.UnlinkOperation;
import org.redisson.transaction.operation.map.MapAddAndGetOperation;
import org.redisson.transaction.operation.map.MapFastPutIfAbsentOperation;
import org.redisson.transaction.operation.map.MapFastPutOperation;
import org.redisson.transaction.operation.map.MapFastRemoveOperation;
import org.redisson.transaction.operation.map.MapOperation;
import org.redisson.transaction.operation.map.MapPutIfAbsentOperation;
import org.redisson.transaction.operation.map.MapPutOperation;
import org.redisson.transaction.operation.map.MapRemoveOperation;
import org.redisson.transaction.operation.map.MapReplaceOperation;
import org.redisson.transaction.operation.map.*;
import io.netty.buffer.ByteBuf;
@ -286,7 +278,54 @@ public class BaseTransactionalMap<K, V> {
});
return result;
}
protected RFuture<V> putIfExistsOperationAsync(K key, V value) {
long threadId = Thread.currentThread().getId();
return putIfExistsOperationAsync(key, value, new MapPutIfExistsOperation(map, key, value, transactionId, threadId));
}
protected RFuture<V> putIfExistsOperationAsync(K key, V value, MapOperation mapOperation) {
RPromise<V> result = new RedissonPromise<V>();
executeLocked(result, key, new Runnable() {
@Override
public void run() {
HashValue keyHash = toKeyHash(key);
MapEntry entry = state.get(keyHash);
if (entry != null) {
operations.add(mapOperation);
if (entry != MapEntry.NULL) {
state.put(keyHash, new MapEntry(key, value));
if (deleted != null) {
deleted = false;
}
result.trySuccess((V) entry.getValue());
} else {
result.trySuccess(null);
}
return;
}
map.getAsync(key).onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
operations.add(mapOperation);
if (res != null) {
state.put(keyHash, new MapEntry(key, value));
if (deleted != null) {
deleted = false;
}
}
result.trySuccess(res);
});
}
});
return result;
}
protected RFuture<V> putIfAbsentOperationAsync(K key, V value) {
long threadId = Thread.currentThread().getId();
return putIfAbsentOperationAsync(key, value, new MapPutIfAbsentOperation(map, key, value, transactionId, threadId));

@ -132,7 +132,13 @@ public class RedissonTransactionalMap<K, V> extends RedissonMap<K, V> {
checkState();
return transactionalMap.addAndGetOperationAsync(key, value);
}
@Override
protected RFuture<V> putIfExistsOperationAsync(K key, V value) {
checkState();
return transactionalMap.putIfExistsOperationAsync(key, value);
}
@Override
protected RFuture<V> putIfAbsentOperationAsync(K key, V value) {
checkState();

@ -182,6 +182,12 @@ public class RedissonTransactionalMapCache<K, V> extends RedissonMapCache<K, V>
return transactionalMap.addAndGetOperationAsync(key, value);
}
@Override
protected RFuture<V> putIfExistsOperationAsync(K key, V value) {
checkState();
return transactionalMap.putIfExistsOperationAsync(key, value);
}
@Override
protected RFuture<V> putIfAbsentOperationAsync(K key, V value) {
checkState();

@ -0,0 +1,39 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.transaction.operation.map;
import org.redisson.api.RMap;
/**
*
* @author Nikita Koksharov
*
*/
public class MapPutIfExistsOperation extends MapOperation {
public MapPutIfExistsOperation() {
}
public MapPutIfExistsOperation(RMap<?, ?> map, Object key, Object value, String transactionId, long threadId) {
super(map, key, value, transactionId, threadId);
}
@Override
public void commit(RMap<Object, Object> map) {
map.putIfExistsAsync(key, value);
}
}

@ -492,7 +492,22 @@ public abstract class BaseMapTest extends BaseTest {
assertThat(map.get(key1)).isEqualTo(value1);
destroy(map);
}
@Test
public void testPutIfExists() throws Exception {
RMap<SimpleKey, SimpleValue> map = getMap("simple");
SimpleKey key = new SimpleKey("1");
SimpleValue value = new SimpleValue("2");
assertThat(map.putIfExists(key, new SimpleValue("3"))).isNull();
assertThat(map.get(key)).isNull();
map.put(key, value);
assertThat(map.putIfExists(key, new SimpleValue("3"))).isEqualTo(value);
assertThat(map.get(key)).isEqualTo(new SimpleValue("3"));
destroy(map);
}
@Test(timeout = 5000)
public void testDeserializationErrorReturnsErrorImmediately() throws Exception {
RMap<String, SimpleObjectWithoutDefaultConstructor> map = getMap("deserializationFailure", new JsonJacksonCodec());

Loading…
Cancel
Save