Merge pull request #6469 from seakider/fix_localcachemap_fastputifexist

Fixed - RLocalCachedMap sync strategy doesn't work with fastPutIfExist method
pull/6471/head
Nikita Koksharov 1 week ago committed by GitHub
commit a2a4f48c69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -821,26 +821,43 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
});
return new CompletableFutureWrapper<>(f);
}
@Override
public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value) {
protected RFuture<Boolean> fastPutIfAbsentOperationAsync(K key, V value) {
ByteBuf encodedKey = encodeMapKey(key);
CacheKey cacheKey = localCacheView.toCacheKey(encodedKey);
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value);
if (prevValue == null) {
broadcastLocalCacheStore(value, mapKey, cacheKey);
broadcastLocalCacheStore(value, encodedKey, cacheKey);
return new CompletableFutureWrapper<>(true);
} else {
mapKey.release();
encodedKey.release();
return new CompletableFutureWrapper<>(false);
}
}
RFuture<Boolean> future = super.fastPutIfAbsentAsync(key, value);
ByteBuf encodedValue = encodeMapValue(value);
ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if value ~= false then "
+ "return 0; "
+ "end; "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[4] == '1' then "
+ "redis.call(ARGV[7], KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call(ARGV[7], KEYS[2], ARGV[3]); "
+ "end;"
+ "return 1; ",
Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId, publishCommand);
CompletionStage<Boolean> f = future.thenApply(res -> {
if (res) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
return res;
@ -849,24 +866,41 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
@Override
public RFuture<Boolean> fastPutIfExistsAsync(K key, V value) {
protected RFuture<Boolean> fastPutIfExistsOperationAsync(K key, V value) {
ByteBuf encodedKey = encodeMapKey(key);
CacheKey cacheKey = localCacheView.toCacheKey(encodedKey);
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
ByteBuf mapKey = encodeMapKey(key);
CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
CacheValue prevValue = cachePutIfExists(cacheKey, key, value);
if (prevValue != null) {
broadcastLocalCacheStore(value, mapKey, cacheKey);
broadcastLocalCacheStore(value, encodedKey, cacheKey);
return new CompletableFutureWrapper<>(true);
} else {
mapKey.release();
encodedKey.release();
return new CompletableFutureWrapper<>(false);
}
}
RFuture<Boolean> future = super.fastPutIfExistsAsync(key, value);
ByteBuf encodedValue = encodeMapValue(value);
ByteBuf msg = createSyncMessage(encodedKey, encodedValue, cacheKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if value == false then "
+ "return 0; "
+ "end; "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[4] == '1' then "
+ "redis.call(ARGV[7], KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call(ARGV[7], KEYS[2], ARGV[3]); "
+ "end;"
+ "return 1; ",
Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId, publishCommand);
CompletionStage<Boolean> f = future.thenApply(res -> {
if (res) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
return res;

Loading…
Cancel
Save