|
|
|
@ -821,26 +821,43 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
|
|
|
|
|
});
|
|
|
|
|
return new CompletableFutureWrapper<>(f);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value) {
|
|
|
|
|
protected RFuture<Boolean> fastPutIfAbsentOperationAsync(K key, V value) {
|
|
|
|
|
ByteBuf encodedKey = encodeMapKey(key);
|
|
|
|
|
CacheKey cacheKey = localCacheView.toCacheKey(encodedKey);
|
|
|
|
|
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
|
|
|
|
|
ByteBuf mapKey = encodeMapKey(key);
|
|
|
|
|
CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
|
|
|
|
|
CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value);
|
|
|
|
|
if (prevValue == null) {
|
|
|
|
|
broadcastLocalCacheStore(value, mapKey, cacheKey);
|
|
|
|
|
broadcastLocalCacheStore(value, encodedKey, cacheKey);
|
|
|
|
|
return new CompletableFutureWrapper<>(true);
|
|
|
|
|
} else {
|
|
|
|
|
mapKey.release();
|
|
|
|
|
encodedKey.release();
|
|
|
|
|
return new CompletableFutureWrapper<>(false);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RFuture<Boolean> future = super.fastPutIfAbsentAsync(key, value);
|
|
|
|
|
|
|
|
|
|
ByteBuf encodedValue = encodeMapValue(value);
|
|
|
|
|
ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey);
|
|
|
|
|
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
|
|
|
|
|
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
|
"local value = redis.call('hget', KEYS[1], ARGV[1]); "
|
|
|
|
|
+ "if value ~= false then "
|
|
|
|
|
+ "return 0; "
|
|
|
|
|
+ "end; "
|
|
|
|
|
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
|
|
|
|
|
+ "if ARGV[4] == '1' then "
|
|
|
|
|
+ "redis.call(ARGV[7], KEYS[2], ARGV[3]); "
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "if ARGV[4] == '2' then "
|
|
|
|
|
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
|
|
|
|
|
+ "redis.call(ARGV[7], KEYS[2], ARGV[3]); "
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "return 1; ",
|
|
|
|
|
Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
|
|
|
|
|
encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId, publishCommand);
|
|
|
|
|
CompletionStage<Boolean> f = future.thenApply(res -> {
|
|
|
|
|
if (res) {
|
|
|
|
|
CacheKey cacheKey = localCacheView.toCacheKey(key);
|
|
|
|
|
cachePut(cacheKey, key, value);
|
|
|
|
|
}
|
|
|
|
|
return res;
|
|
|
|
@ -865,16 +882,23 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
|
|
|
|
|
|
|
|
|
|
ByteBuf encodedValue = encodeMapValue(value);
|
|
|
|
|
ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey);
|
|
|
|
|
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
|
|
|
|
|
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
|
|
|
|
|
"local value = redis.call('hget', KEYS[1], ARGV[1]); "
|
|
|
|
|
+ "if value ~= false then "
|
|
|
|
|
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
|
|
|
|
|
+ "redis.call(ARGV[4], KEYS[2], ARGV[3]); "
|
|
|
|
|
+ "return 1; "
|
|
|
|
|
+ "if value == false then "
|
|
|
|
|
+ "return 0; "
|
|
|
|
|
+ "end; "
|
|
|
|
|
+ "return 0; ",
|
|
|
|
|
Arrays.asList(getRawName(), listener.getInvalidationTopicName()),
|
|
|
|
|
encodedKey, encodedValue, msg, publishCommand);
|
|
|
|
|
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
|
|
|
|
|
+ "if ARGV[4] == '1' then "
|
|
|
|
|
+ "redis.call(ARGV[7], KEYS[2], ARGV[3]); "
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "if ARGV[4] == '2' then "
|
|
|
|
|
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
|
|
|
|
|
+ "redis.call(ARGV[7], KEYS[2], ARGV[3]); "
|
|
|
|
|
+ "end;"
|
|
|
|
|
+ "return 1; ",
|
|
|
|
|
Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
|
|
|
|
|
encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId, publishCommand);
|
|
|
|
|
CompletionStage<Boolean> f = future.thenApply(res -> {
|
|
|
|
|
if (res) {
|
|
|
|
|
cachePut(cacheKey, key, value);
|
|
|
|
|