refactoring

pull/4104/head
Nikita Koksharov 3 years ago
parent 2c25d3241d
commit 31bf43b633

@ -36,16 +36,12 @@ import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
@SuppressWarnings("serial")
public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements RLocalCachedMap<K, V> {
@ -192,7 +188,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public RFuture<Integer> sizeAsync() {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
return RedissonPromise.newSucceededFuture(cache.size());
return new CompletableFutureWrapper<>(cache.size());
}
return super.sizeAsync();
}
@ -206,7 +202,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (cacheValue == null) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
if (hasNoLoader()) {
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
CompletableFuture<V> future = loadValue((K) key, false);
@ -228,7 +224,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return containsKeyAsync(key, promise);
}
return RedissonPromise.newSucceededFuture(cacheValue.getValue() != null);
return new CompletableFutureWrapper<>(cacheValue.getValue() != null);
}
@Override
@ -238,12 +234,12 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheValue cacheValue = new CacheValue(null, value);
if (!cache.containsValue(cacheValue)) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
return super.containsValueAsync(value);
}
return RedissonPromise.newSucceededFuture(true);
return new CompletableFutureWrapper<>(true);
}
@Override
@ -253,12 +249,12 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(key);
CacheValue cacheValue = cache.get(cacheKey);
if (cacheValue != null && (storeCacheMiss || cacheValue.getValue() != null)) {
return RedissonPromise.newSucceededFuture((V) cacheValue.getValue());
return new CompletableFutureWrapper<>((V) cacheValue.getValue());
}
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
if (hasNoLoader()) {
return RedissonPromise.newSucceededFuture(null);
return new CompletableFutureWrapper<>(null);
}
CompletableFuture<V> future = loadValue((K) key, false);
@ -271,20 +267,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return new CompletableFutureWrapper<>(f);
}
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = super.getAsync((K) key);
future.onComplete((value, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<V> result = future.thenApply(value -> {
if (storeCacheMiss || value != null) {
cachePut(cacheKey, key, value);
}
result.trySuccess(value);
return value;
});
return result;
return new CompletableFutureWrapper<>(result);
}
protected static byte[] generateLogEntryId(byte[] keyHash) {
@ -311,7 +301,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (prevValue != null) {
val = (V) prevValue.getValue();
}
return RedissonPromise.newSucceededFuture(val);
return new CompletableFutureWrapper<>(val);
}
ByteBuf mapValue = encodeMapValue(value);
@ -347,7 +337,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheValue prevValue = cachePut(cacheKey, key, value);
broadcastLocalCacheStore(value, encodedKey, cacheKey);
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
return RedissonPromise.newSucceededFuture(prevValue == null);
return new CompletableFutureWrapper<>(prevValue == null);
}
ByteBuf encodedValue = encodeMapValue(value);
@ -391,7 +381,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (value != null) {
val = (V) value.getValue();
}
return RedissonPromise.newSucceededFuture(val);
return new CompletableFutureWrapper<>(val);
}
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
@ -416,7 +406,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
protected RFuture<List<Long>> fastRemoveOperationBatchAsync(@SuppressWarnings("unchecked") K... keys) {
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
return RedissonPromise.newSucceededFuture(Collections.emptyList());
return new CompletableFutureWrapper<>(Collections.emptyList());
}
if (invalidateEntryOnChange == 1) {
@ -510,7 +500,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
listener.getInvalidationTopic().publishAsync(msg);
}
}
return RedissonPromise.newSucceededFuture(count);
return new CompletableFutureWrapper<>(count);
}
if (invalidateEntryOnChange == 1) {
@ -604,7 +594,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public RFuture<Map<K, V>> getAllAsync(Set<K> keys) {
if (keys.isEmpty()) {
return RedissonPromise.newSucceededFuture(Collections.<K, V>emptyMap());
return new CompletableFutureWrapper<>(Collections.<K, V>emptyMap());
}
Map<K, V> result = new HashMap<K, V>();
@ -619,44 +609,29 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
}
RPromise<Map<K, V>> promise = new RedissonPromise<>();
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
if (hasNoLoader()) {
return RedissonPromise.newSucceededFuture(result);
return new CompletableFutureWrapper<>(result);
}
Set<K> newKeys = new HashSet<>(keys);
newKeys.removeAll(result.keySet());
if (!newKeys.isEmpty()) {
loadAllAsync(newKeys, false, 1, result).onComplete((r, ex) -> {
if (ex != null) {
promise.tryFailure(ex);
return;
}
promise.trySuccess(result);
});
} else {
promise.trySuccess(result);
CompletionStage<Map<K, V>> f = loadAllAsync(newKeys, false, 1, result)
.thenApply(r -> result);
return new CompletableFutureWrapper<>(f);
}
return promise;
return new CompletableFutureWrapper<>(result);
}
RFuture<Map<K, V>> future = super.getAllAsync(mapKeys);
future.onComplete((map, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
CompletionStage<Map<K, V>> f = future.thenApply(map -> {
result.putAll(map);
cacheMap(map);
promise.trySuccess(result);
return result;
});
return promise;
return new CompletableFutureWrapper<>(f);
}
private void cacheMap(Map<?, ?> map) {
@ -676,7 +651,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
broadcastLocalCacheStore(entry.getValue(), keyEncoded, cacheKey);
}
return RedissonPromise.newSucceededFuture(null);
return new CompletableFutureWrapper<>(null);
}
List<Object> params = new ArrayList<Object>(map.size()*3);
@ -722,7 +697,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
params.add(msgEncoded);
}
RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"for i=3, tonumber(ARGV[2]) + 2, 5000 do "
+ "redis.call('hmset', KEYS[1], unpack(ARGV, i, math.min(i+4999, tonumber(ARGV[2]) + 2))); "
@ -736,19 +710,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end; "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
+ "end;",
Arrays.<Object>asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
Arrays.asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
params.toArray());
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<Void> f = future.thenApply(res -> {
cacheMap(map);
result.trySuccess(null);
return null;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -758,7 +727,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
ByteBuf msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass())),
"local result = redis.call('HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[3] == '1' then "
@ -772,19 +740,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
Arrays.<Object>asList(getRawName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<V> f = future.thenApply(res -> {
if (res != null) {
CacheKey cKey = localCacheView.toCacheKey(key);
cachePut(cKey, key, res);
}
result.trySuccess(res);
return res;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -795,28 +758,22 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value);
if (prevValue == null) {
broadcastLocalCacheStore(value, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture(true);
return new CompletableFutureWrapper<>(true);
} else {
mapKey.release();
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
}
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> future = super.fastPutIfAbsentAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<Boolean> f = future.thenApply(res -> {
if (res) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
return res;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -827,28 +784,22 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheValue prevValue = cachePutIfExists(cacheKey, key, value);
if (prevValue != null) {
broadcastLocalCacheStore(value, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture(true);
return new CompletableFutureWrapper<>(true);
} else {
mapKey.release();
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
}
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> future = super.fastPutIfExistsAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<Boolean> f = future.thenApply(res -> {
if (res) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
return res;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -865,10 +816,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
return RedissonPromise.newSucceededFuture(result);
return new CompletableFutureWrapper<>(result);
}
RPromise<Collection<V>> promise = new RedissonPromise<Collection<V>>();
RFuture<Collection<V>> future = commandExecutor.evalReadAsync(getRawName(), codec, ALL_VALUES,
"local entries = redis.call('hgetall', KEYS[1]); "
+ "local result = {};"
@ -886,20 +836,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end; "
+ "end; "
+ "return result; ",
Arrays.<Object>asList(getRawName()),
Arrays.asList(getRawName()),
mapKeys.toArray());
future.onComplete((res, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
CompletionStage<Collection<V>> f = future.thenApply(res -> {
result.addAll(res);
promise.trySuccess(result);
return result;
});
return promise;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -915,27 +859,20 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
return RedissonPromise.newSucceededFuture(result);
return new CompletableFutureWrapper<>(result);
}
RPromise<Map<K, V>> promise = new RedissonPromise<Map<K, V>>();
RFuture<Map<K, V>> future = readAll(ALL_MAP, mapKeys, result);
future.onComplete((res, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
for (java.util.Map.Entry<K, V> entry : res.entrySet()) {
CompletionStage<Map<K, V>> f = future.thenApply(res -> {
for (Entry<K, V> entry : res.entrySet()) {
CacheKey cacheKey = localCacheView.toCacheKey(entry.getKey());
cachePut(cacheKey, entry.getKey(), entry.getValue());
}
result.putAll(res);
promise.trySuccess(result);
return result;
});
return promise;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -978,27 +915,19 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
return RedissonPromise.newSucceededFuture(result);
return new CompletableFutureWrapper<>(result);
}
RPromise<Set<Entry<K, V>>> promise = new RedissonPromise<Set<Entry<K, V>>>();
RFuture<Set<Entry<K, V>>> future = readAll(ALL_ENTRIES, mapKeys, result);
future.onComplete((res, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
for (java.util.Map.Entry<K, V> entry : res) {
CompletionStage<Set<Entry<K, V>>> f = future.thenApply(res -> {
for (Entry<K, V> entry : res) {
CacheKey cacheKey = localCacheView.toCacheKey(entry.getKey());
cachePut(cacheKey, entry.getKey(), entry.getValue());
}
result.addAll(res);
promise.trySuccess(result);
return result;
});
return promise;
return new CompletableFutureWrapper<>(f);
}
private <R> RFuture<R> readAll(RedisCommand<?> evalCommandType, List<Object> mapKeys, R result) {
@ -1032,29 +961,22 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheValue prevValue = cacheReplace(cacheKey, key, value);
if (prevValue != null) {
broadcastLocalCacheStore(value, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture(true);
return new CompletableFutureWrapper<>(true);
} else {
mapKey.release();
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
}
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> future = super.fastReplaceAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<Boolean> f = future.thenApply(res -> {
if (res) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
return res;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -1122,29 +1044,22 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheValue prevValue = cacheReplace(cacheKey, key, value);
if (prevValue != null) {
broadcastLocalCacheStore(value, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture((V) prevValue.getValue());
return new CompletableFutureWrapper<>((V) prevValue.getValue());
} else {
mapKey.release();
return RedissonPromise.newSucceededFuture(null);
return new CompletableFutureWrapper<>(null);
}
}
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = super.replaceAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<V> f = future.thenApply(res -> {
if (res != null) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
return res;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -1181,29 +1096,22 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
if (cacheReplace(cacheKey, key, oldValue, newValue)) {
broadcastLocalCacheStore(newValue, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture(true);
return new CompletableFutureWrapper<>(true);
} else {
mapKey.release();
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
}
RPromise<Boolean> result = new RedissonPromise<>();
RFuture<Boolean> future = super.replaceAsync(key, oldValue, newValue);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<Boolean> f = future.thenApply(res -> {
if (res) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, newValue);
}
result.trySuccess(res);
return res;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -1239,26 +1147,22 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheKey cacheKey = localCacheView.toCacheKey(mapKey);
if (cacheRemove(cacheKey, key, value)) {
broadcastLocalCacheStore((V) value, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture(true);
return new CompletableFutureWrapper<>(true);
} else {
mapKey.release();
return RedissonPromise.newSucceededFuture(false);
return new CompletableFutureWrapper<>(false);
}
}
RFuture<Boolean> future = super.removeAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
return;
}
CompletionStage<Boolean> f = future.thenApply(res -> {
if (res) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cache.remove(cacheKey);
}
return res;
});
return future;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -1269,28 +1173,22 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheValue prevValue = cachePutIfExists(cacheKey, key, value);
if (prevValue != null) {
broadcastLocalCacheStore((V) value, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture((V) prevValue.getValue());
return new CompletableFutureWrapper<>((V) prevValue.getValue());
} else {
mapKey.release();
return RedissonPromise.newSucceededFuture(null);
return new CompletableFutureWrapper<>(null);
}
}
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = super.putIfExistsAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<V> f = future.thenApply(res -> {
if (res != null) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
return res;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override
@ -1301,28 +1199,22 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
CacheValue prevValue = cachePutIfAbsent(cacheKey, key, value);
if (prevValue == null) {
broadcastLocalCacheStore((V) value, mapKey, cacheKey);
return RedissonPromise.newSucceededFuture(null);
return new CompletableFutureWrapper<>(null);
} else {
mapKey.release();
return RedissonPromise.newSucceededFuture((V) prevValue.getValue());
return new CompletableFutureWrapper<>((V) prevValue.getValue());
}
}
RPromise<V> result = new RedissonPromise<>();
RFuture<V> future = super.putIfAbsentAsync(key, value);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
return;
}
CompletionStage<V> f = future.thenApply(res -> {
if (res == null) {
CacheKey cacheKey = localCacheView.toCacheKey(key);
cachePut(cacheKey, key, value);
}
result.trySuccess(res);
return res;
});
return result;
return new CompletableFutureWrapper<>(f);
}
@Override

@ -34,6 +34,10 @@ public class CompletableFutureWrapper<V> implements RFuture<V> {
private final CompletableFuture<V> future;
private CompletableFuture<V> lastFuture;
public CompletableFutureWrapper(V value) {
this(CompletableFuture.completedFuture(value));
}
public CompletableFutureWrapper(CompletionStage<V> stage) {
this.future = stage.toCompletableFuture();
this.lastFuture = future;

Loading…
Cancel
Save