From 678a8ff8b8cf12519390fea789734b52f0c2de67 Mon Sep 17 00:00:00 2001 From: seakider Date: Mon, 3 Mar 2025 21:18:06 +0800 Subject: [PATCH 1/2] Fixed - RLocalCachedMap sync strategy doesn't work with fastPutIfExists method Signed-off-by: seakider --- .../org/redisson/RedissonLocalCachedMap.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index b4322785b..07b96388f 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -849,24 +849,34 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } @Override - public RFuture fastPutIfExistsAsync(K key, V value) { + protected RFuture fastPutIfExistsOperationAsync(K key, V value) { + ByteBuf encodedKey = encodeMapKey(key); + CacheKey cacheKey = localCacheView.toCacheKey(encodedKey); if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - ByteBuf mapKey = encodeMapKey(key); - CacheKey cacheKey = localCacheView.toCacheKey(mapKey); CacheValue prevValue = cachePutIfExists(cacheKey, key, value); if (prevValue != null) { - broadcastLocalCacheStore(value, mapKey, cacheKey); + broadcastLocalCacheStore(value, encodedKey, cacheKey); return new CompletableFutureWrapper<>(true); } else { - mapKey.release(); + encodedKey.release(); return new CompletableFutureWrapper<>(false); } } - - RFuture future = super.fastPutIfExistsAsync(key, value); + + ByteBuf encodedValue = encodeMapValue(value); + ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey); + RFuture future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + "local value = redis.call('hget', KEYS[1], ARGV[1]); " + + "if value ~= false then " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "redis.call(ARGV[4], KEYS[2], ARGV[3]); " + + "return 1; " + + "end; " + + "return 0; ", + Arrays.asList(getRawName(), listener.getInvalidationTopicName()), + encodedKey, encodedValue, msg, publishCommand); CompletionStage f = future.thenApply(res -> { if (res) { - CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } return res; From 46a7d117c9a37d3bd43ca7df4b0955a4ad1fcaa1 Mon Sep 17 00:00:00 2001 From: seakider Date: Mon, 3 Mar 2025 22:18:25 +0800 Subject: [PATCH 2/2] Fixed - check value of invalidateEntryOnChange and fix fastPutIfAbsent method Signed-off-by: seakider --- .../org/redisson/RedissonLocalCachedMap.java | 56 +++++++++++++------ 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 07b96388f..71b2c2b65 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -821,26 +821,43 @@ public class RedissonLocalCachedMap extends RedissonMap implements R }); return new CompletableFutureWrapper<>(f); } - + @Override - public RFuture fastPutIfAbsentAsync(K key, V value) { + protected RFuture fastPutIfAbsentOperationAsync(K key, V value) { + ByteBuf encodedKey = encodeMapKey(key); + CacheKey cacheKey = localCacheView.toCacheKey(encodedKey); if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - ByteBuf mapKey = encodeMapKey(key); - CacheKey cacheKey = localCacheView.toCacheKey(mapKey); CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value); if (prevValue == null) { - broadcastLocalCacheStore(value, mapKey, cacheKey); + broadcastLocalCacheStore(value, encodedKey, cacheKey); return new CompletableFutureWrapper<>(true); } else { - mapKey.release(); + encodedKey.release(); return new CompletableFutureWrapper<>(false); } } - - RFuture future = super.fastPutIfAbsentAsync(key, value); + + ByteBuf encodedValue = encodeMapValue(value); + ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); + RFuture future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + "local value = redis.call('hget', KEYS[1], ARGV[1]); " + + "if value ~= false then " + + "return 0; " + + "end; " + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "if ARGV[4] == '1' then " + + "redis.call(ARGV[7], KEYS[2], ARGV[3]); " + + "end;" + + "if ARGV[4] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call(ARGV[7], KEYS[2], ARGV[3]); " + + "end;" + + "return 1; ", + Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), + encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId, publishCommand); CompletionStage f = future.thenApply(res -> { if (res) { - CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } return res; @@ -865,16 +882,23 @@ public class RedissonLocalCachedMap extends RedissonMap implements R ByteBuf encodedValue = encodeMapValue(value); ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); RFuture future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('hget', KEYS[1], ARGV[1]); " - + "if value ~= false then " - + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " - + "redis.call(ARGV[4], KEYS[2], ARGV[3]); " - + "return 1; " + + "if value == false then " + + "return 0; " + "end; " - + "return 0; ", - Arrays.asList(getRawName(), listener.getInvalidationTopicName()), - encodedKey, encodedValue, msg, publishCommand); + + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "if ARGV[4] == '1' then " + + "redis.call(ARGV[7], KEYS[2], ARGV[3]); " + + "end;" + + "if ARGV[4] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call(ARGV[7], KEYS[2], ARGV[3]); " + + "end;" + + "return 1; ", + Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), + encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId, publishCommand); CompletionStage f = future.thenApply(res -> { if (res) { cachePut(cacheKey, key, value);