From 31bf43b633a9734df5848e3f8c3ec0b017cda760 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 28 Jan 2022 09:22:41 +0300 Subject: [PATCH] refactoring --- .../org/redisson/RedissonLocalCachedMap.java | 290 ++++++------------ .../misc/CompletableFutureWrapper.java | 4 + 2 files changed, 95 insertions(+), 199 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 31153f0c5..78a18867b 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -36,16 +36,12 @@ import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; import java.io.IOException; import java.math.BigDecimal; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; + @SuppressWarnings("serial") public class RedissonLocalCachedMap extends RedissonMap implements RLocalCachedMap { @@ -192,7 +188,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public RFuture sizeAsync() { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - return RedissonPromise.newSucceededFuture(cache.size()); + return new CompletableFutureWrapper<>(cache.size()); } return super.sizeAsync(); } @@ -206,7 +202,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R if (cacheValue == null) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (hasNoLoader()) { - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } CompletableFuture future = loadValue((K) key, false); @@ -228,7 +224,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return containsKeyAsync(key, promise); } - return RedissonPromise.newSucceededFuture(cacheValue.getValue() != null); + return new CompletableFutureWrapper<>(cacheValue.getValue() != null); } @Override @@ -238,12 +234,12 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheValue cacheValue = new CacheValue(null, value); if (!cache.containsValue(cacheValue)) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } return super.containsValueAsync(value); } - return RedissonPromise.newSucceededFuture(true); + return new CompletableFutureWrapper<>(true); } @Override @@ -253,12 +249,12 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheKey cacheKey = localCacheView.toCacheKey(key); CacheValue cacheValue = cache.get(cacheKey); if (cacheValue != null && (storeCacheMiss || cacheValue.getValue() != null)) { - return RedissonPromise.newSucceededFuture((V) cacheValue.getValue()); + return new CompletableFutureWrapper<>((V) cacheValue.getValue()); } if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (hasNoLoader()) { - return RedissonPromise.newSucceededFuture(null); + return new CompletableFutureWrapper<>(null); } CompletableFuture future = loadValue((K) key, false); @@ -271,20 +267,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return new CompletableFutureWrapper<>(f); } - RPromise result = new RedissonPromise<>(); RFuture future = super.getAsync((K) key); - future.onComplete((value, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage result = future.thenApply(value -> { if (storeCacheMiss || value != null) { cachePut(cacheKey, key, value); } - result.trySuccess(value); + return value; }); - return result; + return new CompletableFutureWrapper<>(result); } protected static byte[] generateLogEntryId(byte[] keyHash) { @@ -311,7 +301,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R if (prevValue != null) { val = (V) prevValue.getValue(); } - return RedissonPromise.newSucceededFuture(val); + return new CompletableFutureWrapper<>(val); } ByteBuf mapValue = encodeMapValue(value); @@ -347,7 +337,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheValue prevValue = cachePut(cacheKey, key, value); broadcastLocalCacheStore(value, encodedKey, cacheKey); if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - return RedissonPromise.newSucceededFuture(prevValue == null); + return new CompletableFutureWrapper<>(prevValue == null); } ByteBuf encodedValue = encodeMapValue(value); @@ -391,7 +381,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R if (value != null) { val = (V) value.getValue(); } - return RedissonPromise.newSucceededFuture(val); + return new CompletableFutureWrapper<>(val); } byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); @@ -416,7 +406,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override protected RFuture> fastRemoveOperationBatchAsync(@SuppressWarnings("unchecked") K... keys) { if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - return RedissonPromise.newSucceededFuture(Collections.emptyList()); + return new CompletableFutureWrapper<>(Collections.emptyList()); } if (invalidateEntryOnChange == 1) { @@ -510,7 +500,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R listener.getInvalidationTopic().publishAsync(msg); } } - return RedissonPromise.newSucceededFuture(count); + return new CompletableFutureWrapper<>(count); } if (invalidateEntryOnChange == 1) { @@ -604,7 +594,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R @Override public RFuture> getAllAsync(Set keys) { if (keys.isEmpty()) { - return RedissonPromise.newSucceededFuture(Collections.emptyMap()); + return new CompletableFutureWrapper<>(Collections.emptyMap()); } Map result = new HashMap(); @@ -619,44 +609,29 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } } - RPromise> promise = new RedissonPromise<>(); if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { if (hasNoLoader()) { - return RedissonPromise.newSucceededFuture(result); + return new CompletableFutureWrapper<>(result); } Set newKeys = new HashSet<>(keys); newKeys.removeAll(result.keySet()); if (!newKeys.isEmpty()) { - loadAllAsync(newKeys, false, 1, result).onComplete((r, ex) -> { - if (ex != null) { - promise.tryFailure(ex); - return; - } - promise.trySuccess(result); - }); - } else { - promise.trySuccess(result); + CompletionStage> f = loadAllAsync(newKeys, false, 1, result) + .thenApply(r -> result); + return new CompletableFutureWrapper<>(f); } - - return promise; + return new CompletableFutureWrapper<>(result); } RFuture> future = super.getAllAsync(mapKeys); - future.onComplete((map, e) -> { - if (e != null) { - promise.tryFailure(e); - return; - } - + CompletionStage> f = future.thenApply(map -> { result.putAll(map); - cacheMap(map); - - promise.trySuccess(result); + return result; }); - return promise; + return new CompletableFutureWrapper<>(f); } private void cacheMap(Map map) { @@ -676,7 +651,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R broadcastLocalCacheStore(entry.getValue(), keyEncoded, cacheKey); } - return RedissonPromise.newSucceededFuture(null); + return new CompletableFutureWrapper<>(null); } List params = new ArrayList(map.size()*3); @@ -722,7 +697,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R params.add(msgEncoded); } - RPromise result = new RedissonPromise(); RFuture future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID, "for i=3, tonumber(ARGV[2]) + 2, 5000 do " + "redis.call('hmset', KEYS[1], unpack(ARGV, i, math.min(i+4999, tonumber(ARGV[2]) + 2))); " @@ -736,19 +710,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end; " + "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + "end;", - Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), + Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), params.toArray()); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenApply(res -> { cacheMap(map); - result.trySuccess(null); + return null; }); - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -758,7 +727,6 @@ public class RedissonLocalCachedMap extends RedissonMap implements R ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); - RPromise result = new RedissonPromise<>(); RFuture future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, new RedisCommand("EVAL", new NumberConvertor(value.getClass())), "local result = redis.call('HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); " + "if ARGV[3] == '1' then " @@ -772,19 +740,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()), keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenApply(res -> { if (res != null) { CacheKey cKey = localCacheView.toCacheKey(key); cachePut(cKey, key, res); } - result.trySuccess(res); + return res; }); - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -795,28 +758,22 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value); if (prevValue == null) { broadcastLocalCacheStore(value, mapKey, cacheKey); - return RedissonPromise.newSucceededFuture(true); + return new CompletableFutureWrapper<>(true); } else { mapKey.release(); - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } } - RPromise result = new RedissonPromise<>(); RFuture future = super.fastPutIfAbsentAsync(key, value); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenApply(res -> { if (res) { CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } - result.trySuccess(res); + return res; }); - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -827,28 +784,22 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheValue prevValue = cachePutIfExists(cacheKey, key, value); if (prevValue != null) { broadcastLocalCacheStore(value, mapKey, cacheKey); - return RedissonPromise.newSucceededFuture(true); + return new CompletableFutureWrapper<>(true); } else { mapKey.release(); - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } } - RPromise result = new RedissonPromise<>(); RFuture future = super.fastPutIfExistsAsync(key, value); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenApply(res -> { if (res) { CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } - result.trySuccess(res); + return res; }); - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -865,10 +816,9 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - return RedissonPromise.newSucceededFuture(result); + return new CompletableFutureWrapper<>(result); } - RPromise> promise = new RedissonPromise>(); RFuture> future = commandExecutor.evalReadAsync(getRawName(), codec, ALL_VALUES, "local entries = redis.call('hgetall', KEYS[1]); " + "local result = {};" @@ -886,20 +836,14 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "end; " + "end; " + "return result; ", - Arrays.asList(getRawName()), + Arrays.asList(getRawName()), mapKeys.toArray()); - - future.onComplete((res, e) -> { - if (e != null) { - promise.tryFailure(e); - return; - } - + + CompletionStage> f = future.thenApply(res -> { result.addAll(res); - promise.trySuccess(result); + return result; }); - - return promise; + return new CompletableFutureWrapper<>(f); } @Override @@ -915,27 +859,20 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - return RedissonPromise.newSucceededFuture(result); + return new CompletableFutureWrapper<>(result); } - RPromise> promise = new RedissonPromise>(); RFuture> future = readAll(ALL_MAP, mapKeys, result); - - future.onComplete((res, e) -> { - if (e != null) { - promise.tryFailure(e); - return; - } - - for (java.util.Map.Entry entry : res.entrySet()) { + + CompletionStage> f = future.thenApply(res -> { + for (Entry entry : res.entrySet()) { CacheKey cacheKey = localCacheView.toCacheKey(entry.getKey()); cachePut(cacheKey, entry.getKey(), entry.getValue()); } result.putAll(res); - promise.trySuccess(result); + return result; }); - - return promise; + return new CompletableFutureWrapper<>(f); } @Override @@ -978,27 +915,19 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) { - return RedissonPromise.newSucceededFuture(result); + return new CompletableFutureWrapper<>(result); } - RPromise>> promise = new RedissonPromise>>(); RFuture>> future = readAll(ALL_ENTRIES, mapKeys, result); - - future.onComplete((res, e) -> { - if (e != null) { - promise.tryFailure(e); - return; - } - - for (java.util.Map.Entry entry : res) { + CompletionStage>> f = future.thenApply(res -> { + for (Entry entry : res) { CacheKey cacheKey = localCacheView.toCacheKey(entry.getKey()); cachePut(cacheKey, entry.getKey(), entry.getValue()); } result.addAll(res); - promise.trySuccess(result); + return result; }); - - return promise; + return new CompletableFutureWrapper<>(f); } private RFuture readAll(RedisCommand evalCommandType, List mapKeys, R result) { @@ -1032,29 +961,22 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheValue prevValue = cacheReplace(cacheKey, key, value); if (prevValue != null) { broadcastLocalCacheStore(value, mapKey, cacheKey); - return RedissonPromise.newSucceededFuture(true); + return new CompletableFutureWrapper<>(true); } else { mapKey.release(); - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } } - RPromise result = new RedissonPromise<>(); RFuture future = super.fastReplaceAsync(key, value); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenApply(res -> { if (res) { CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } - result.trySuccess(res); + return res; }); - - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -1122,29 +1044,22 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheValue prevValue = cacheReplace(cacheKey, key, value); if (prevValue != null) { broadcastLocalCacheStore(value, mapKey, cacheKey); - return RedissonPromise.newSucceededFuture((V) prevValue.getValue()); + return new CompletableFutureWrapper<>((V) prevValue.getValue()); } else { mapKey.release(); - return RedissonPromise.newSucceededFuture(null); + return new CompletableFutureWrapper<>(null); } } - RPromise result = new RedissonPromise<>(); RFuture future = super.replaceAsync(key, value); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenApply(res -> { if (res != null) { CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } - result.trySuccess(res); + return res; }); - - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -1181,29 +1096,22 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheKey cacheKey = localCacheView.toCacheKey(mapKey); if (cacheReplace(cacheKey, key, oldValue, newValue)) { broadcastLocalCacheStore(newValue, mapKey, cacheKey); - return RedissonPromise.newSucceededFuture(true); + return new CompletableFutureWrapper<>(true); } else { mapKey.release(); - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } } - RPromise result = new RedissonPromise<>(); RFuture future = super.replaceAsync(key, oldValue, newValue); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenApply(res -> { if (res) { CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, newValue); } - result.trySuccess(res); + return res; }); - - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -1239,26 +1147,22 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheKey cacheKey = localCacheView.toCacheKey(mapKey); if (cacheRemove(cacheKey, key, value)) { broadcastLocalCacheStore((V) value, mapKey, cacheKey); - return RedissonPromise.newSucceededFuture(true); + return new CompletableFutureWrapper<>(true); } else { mapKey.release(); - return RedissonPromise.newSucceededFuture(false); + return new CompletableFutureWrapper<>(false); } } RFuture future = super.removeAsync(key, value); - - future.onComplete((res, e) -> { - if (e != null) { - return; - } - + CompletionStage f = future.thenApply(res -> { if (res) { CacheKey cacheKey = localCacheView.toCacheKey(key); cache.remove(cacheKey); } + return res; }); - return future; + return new CompletableFutureWrapper<>(f); } @Override @@ -1269,28 +1173,22 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheValue prevValue = cachePutIfExists(cacheKey, key, value); if (prevValue != null) { broadcastLocalCacheStore((V) value, mapKey, cacheKey); - return RedissonPromise.newSucceededFuture((V) prevValue.getValue()); + return new CompletableFutureWrapper<>((V) prevValue.getValue()); } else { mapKey.release(); - return RedissonPromise.newSucceededFuture(null); + return new CompletableFutureWrapper<>(null); } } - RPromise result = new RedissonPromise<>(); RFuture future = super.putIfExistsAsync(key, value); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenApply(res -> { if (res != null) { CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } - result.trySuccess(res); + return res; }); - return result; + return new CompletableFutureWrapper<>(f); } @Override @@ -1301,28 +1199,22 @@ public class RedissonLocalCachedMap extends RedissonMap implements R CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value); if (prevValue == null) { broadcastLocalCacheStore((V) value, mapKey, cacheKey); - return RedissonPromise.newSucceededFuture(null); + return new CompletableFutureWrapper<>(null); } else { mapKey.release(); - return RedissonPromise.newSucceededFuture((V) prevValue.getValue()); + return new CompletableFutureWrapper<>((V) prevValue.getValue()); } } - RPromise result = new RedissonPromise<>(); RFuture future = super.putIfAbsentAsync(key, value); - future.onComplete((res, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - + CompletionStage f = future.thenApply(res -> { if (res == null) { CacheKey cacheKey = localCacheView.toCacheKey(key); cachePut(cacheKey, key, value); } - result.trySuccess(res); + return res; }); - return result; + return new CompletableFutureWrapper<>(f); } @Override diff --git a/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java b/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java index 0c76ef880..ddcfe587c 100644 --- a/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java +++ b/redisson/src/main/java/org/redisson/misc/CompletableFutureWrapper.java @@ -34,6 +34,10 @@ public class CompletableFutureWrapper implements RFuture { private final CompletableFuture future; private CompletableFuture lastFuture; + public CompletableFutureWrapper(V value) { + this(CompletableFuture.completedFuture(value)); + } + public CompletableFutureWrapper(CompletionStage stage) { this.future = stage.toCompletableFuture(); this.lastFuture = future;