Fixed - get operation before put may cause inconsistent state of local cache. #3978

pull/4031/head
Nikita Koksharov 3 years ago
parent 70a55a1cc3
commit 5b9a3b65d3

@ -268,8 +268,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
RPromise<V> result = new RedissonPromise<>();
loadValue((K) key, result, false);
result.onComplete((value, ex) -> {
RPromise<V> valuePromise = new RedissonPromise<>();
loadValue((K) key, valuePromise, false);
valuePromise.onComplete((value, ex) -> {
if (ex != null) {
result.tryFailure(ex);
return;
@ -278,21 +279,25 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (storeCacheMiss || value != null) {
cachePut(cacheKey, key, value);
}
result.trySuccess(value);
});
return result;
}
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = super.getAsync((K) key);
future.onComplete((value, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (storeCacheMiss || value != null) {
cachePut(cacheKey, key, value);
}
result.trySuccess(value);
});
return future;
return result;
}
protected static byte[] generateLogEntryId(byte[] keyHash) {
@ -765,6 +770,8 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(keyState);
ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass())),
"local result = redis.call('HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[3] == '1' then "
@ -779,12 +786,18 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
if (res != null) {
CacheKey cKey = localCacheView.toCacheKey(key);
cachePut(cKey, key, res);
}
result.trySuccess(res);
});
return future;
return result;
}
@Override
@ -802,9 +815,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
}
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> future = super.fastPutIfAbsentAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
@ -812,8 +827,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
});
return future;
return result;
}
@Override
@ -831,9 +847,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
}
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> future = super.fastPutIfExistsAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
@ -841,8 +859,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
});
return future;
return result;
}
@Override
@ -1033,9 +1052,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
}
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> future = super.fastReplaceAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
@ -1043,9 +1064,10 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
});
return future;
return result;
}
@Override
@ -1120,9 +1142,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
}
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = super.replaceAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
@ -1130,9 +1154,10 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
});
return future;
return result;
}
@Override
@ -1176,9 +1201,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
}
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> future = super.replaceAsync(key, oldValue, newValue);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
@ -1186,9 +1213,10 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, newValue);
}
result.trySuccess(res);
});
return future;
return result;
}
@Override
@ -1261,9 +1289,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
}
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = super.putIfExistsAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
@ -1271,8 +1301,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
});
return future;
return result;
}
@Override
@ -1290,9 +1321,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
}
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = super.putIfAbsentAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
@ -1300,8 +1333,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
});
return future;
return result;
}
@Override

@ -706,7 +706,23 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
assertThat(cache.size()).isEqualTo(3);
assertThat(cache1.size()).isEqualTo(3);
}
@Test
public void testGetBeforePut() {
RLocalCachedMap<String, String> map1 = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
for (int i = 0; i < 1_000; i++) {
map1.put("key" + i, "val");
}
RMap<String, String> map2 = redisson.getLocalCachedMap("test", LocalCachedMapOptions.defaults());
for (int i = 0; i < 1_000; i++) {
map2.get("key" + i);
map2.put("key" + i, "value" + i);
String cachedValue = map2.get("key" + i);
assertThat(cachedValue).isEqualTo("value" + i);
}
}
@Test
public void testAddAndGet() throws InterruptedException {
RLocalCachedMap<Integer, Integer> map = redisson.getLocalCachedMap("getAll", new CompositeCodec(redisson.getConfig().getCodec(), IntegerCodec.INSTANCE), LocalCachedMapOptions.defaults());

Loading…
Cancel
Save