diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index b4322785b..71b2c2b65 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -821,26 +821,43 @@ public class RedissonLocalCachedMap extends RedissonMap implements R }); return new CompletableFutureWrapper<>(f); } - + @Override - public RFuture fastPutIfAbsentAsync(K key, V value) { + protected RFuture fastPutIfAbsentOperationAsync(K key, V value) { + ByteBuf encodedKey = encodeMapKey(key); + CacheKey cacheKey = localCacheView.toCacheKey(encodedKey); if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - ByteBuf mapKey = encodeMapKey(key); - CacheKey cacheKey = localCacheView.toCacheKey(mapKey); CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value); if (prevValue == null) { - broadcastLocalCacheStore(value, mapKey, cacheKey); + broadcastLocalCacheStore(value, encodedKey, cacheKey); return new CompletableFutureWrapper<>(true); } else { - mapKey.release(); + encodedKey.release(); return new CompletableFutureWrapper<>(false); } } - - RFuture future = super.fastPutIfAbsentAsync(key, value); + + ByteBuf encodedValue = encodeMapValue(value); + ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); + RFuture future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + "local value = redis.call('hget', KEYS[1], ARGV[1]); " + + "if value ~= false then " + + "return 0; " + + "end; " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "if ARGV[4] == '1' then " + + "redis.call(ARGV[7], KEYS[2], ARGV[3]); " + + "end;" + + "if ARGV[4] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call(ARGV[7], KEYS[2], ARGV[3]); " + + "end;" + + "return 1; ", + Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), + encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId, publishCommand); CompletionStage f = future.thenApply(res -> { if (res) { - CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } return res; @@ -849,24 +866,41 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture fastPutIfExistsAsync(K key, V value) { + protected RFuture fastPutIfExistsOperationAsync(K key, V value) { + ByteBuf encodedKey = encodeMapKey(key); + CacheKey cacheKey = localCacheView.toCacheKey(encodedKey); if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - ByteBuf mapKey = encodeMapKey(key); - CacheKey cacheKey = localCacheView.toCacheKey(mapKey); CacheValue prevValue = cachePutIfExists(cacheKey, key, value); if (prevValue != null) { - broadcastLocalCacheStore(value, mapKey, cacheKey); + broadcastLocalCacheStore(value, encodedKey, cacheKey); return new CompletableFutureWrapper<>(true); } else { - mapKey.release(); + encodedKey.release(); return new CompletableFutureWrapper<>(false); } } - - RFuture future = super.fastPutIfExistsAsync(key, value); + + ByteBuf encodedValue = encodeMapValue(value); + ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); + RFuture future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + "local value = redis.call('hget', KEYS[1], ARGV[1]); " + + "if value == false then " + + "return 0; " + + "end; " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "if ARGV[4] == '1' then " + + "redis.call(ARGV[7], KEYS[2], ARGV[3]); " + + "end;" + + "if ARGV[4] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call(ARGV[7], KEYS[2], ARGV[3]); " + + "end;" + + "return 1; ", + Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), + encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId, publishCommand); CompletionStage f = future.thenApply(res -> { if (res) { - CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } return res;