From 36bafe493568383ac0d35be696d8b818e23b5faf Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 12 Jan 2022 16:49:02 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/RedissonList.java | 19 +- .../main/java/org/redisson/RedissonMap.java | 274 +++++++----------- 2 files changed, 120 insertions(+), 173 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonList.java b/redisson/src/main/java/org/redisson/RedissonList.java index 90652114c..cd3413d00 100644 --- a/redisson/src/main/java/org/redisson/RedissonList.java +++ b/redisson/src/main/java/org/redisson/RedissonList.java @@ -32,11 +32,12 @@ import org.redisson.iterator.RedissonBaseIterator; import org.redisson.iterator.RedissonListIterator; import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.function.Predicate; import static org.redisson.client.protocol.RedisCommands.*; @@ -391,25 +392,21 @@ public class RedissonList extends RedissonExpirable implements RList { @Override public RFuture setAsync(int index, V element) { - RPromise result = new RedissonPromise(); RFuture future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT, "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " + "return v", - Collections.singletonList(getRawName()), index, encode(element)); - future.onComplete((res, e) -> { + Collections.singletonList(getRawName()), index, encode(element)); + CompletionStage f = future.handle((res, e) -> { if (e != null) { if (e.getMessage().contains("ERR index out of range")) { - result.tryFailure(new IndexOutOfBoundsException("index out of range")); - return; + throw new CompletionException(new IndexOutOfBoundsException("index out of range")); } - result.tryFailure(e); - return; + throw new CompletionException(e); } - - result.trySuccess(res); + return res; }); - return result; + return new CompletableFutureWrapper<>(f); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 0579d9ac4..ef0ec4265 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -32,7 +32,6 @@ import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.iterator.RedissonMapIterator; import org.redisson.mapreduce.RedissonMapReduce; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -192,7 +191,6 @@ public class RedissonMap extends RedissonExpirable implements RMap { newValuePromise.complete(value); } return newValuePromise - .exceptionally(e -> null) .thenCompose(newValue -> { RFuture future; if (newValue != null) { @@ -200,17 +198,11 @@ public class RedissonMap extends RedissonExpirable implements RMap { } else { future = fastRemoveAsync(key); } - return future.thenApply(res -> { - lock.unlockAsync(threadId); - return newValue; - }); + return future.thenApply(res -> newValue); }); - }); - }); - f.whenComplete((r, e) -> { - if (e != null) { + }).whenComplete((c, e) -> { lock.unlockAsync(threadId); - } + }); }); return new CompletableFutureWrapper<>(f); } @@ -221,64 +213,53 @@ public class RedissonMap extends RedissonExpirable implements RMap { Objects.requireNonNull(remappingFunction); RLock lock = getLock(key); - RPromise result = new RedissonPromise<>(); long threadId = Thread.currentThread().getId(); - lock.lockAsync(threadId).onComplete((r, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - - RFuture oldValueFuture = getAsync(key); - oldValueFuture.onComplete((oldValue, ex) -> { - if (ex != null) { - lock.unlockAsync(threadId); - result.tryFailure(ex); - return; - } + CompletionStage f = lock.lockAsync(threadId) + .thenCompose(r -> { + RFuture oldValueFuture = getAsync(key); + return oldValueFuture.thenCompose(oldValue -> { + CompletableFuture result = new CompletableFuture<>(); + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + V newValue; + try { + newValue = remappingFunction.apply(key, oldValue); + } catch (Exception exception) { + result.completeExceptionally(exception); + return; + } - commandExecutor.getConnectionManager().getExecutor().execute(() -> { - V newValue; - try { - newValue = remappingFunction.apply(key, oldValue); - } catch (Exception exception) { - lock.unlockAsync(threadId); - result.tryFailure(exception); - return; - } + if (newValue == null) { + if (oldValue != null) { + fastRemoveAsync(key).whenComplete((res, exc) -> { + if (exc != null) { + result.completeExceptionally(exc); + return; + } - if (newValue == null) { - if (oldValue != null) { - fastRemoveAsync(key).onComplete((res, exc) -> { - lock.unlockAsync(threadId); + result.complete(newValue); + }); + return; + } + } else { + fastPutAsync(key, newValue).whenComplete((res, exc) -> { if (exc != null) { - result.tryFailure(exc); + result.completeExceptionally(exc); return; } - result.trySuccess(newValue); + result.complete(newValue); }); return; } - } else { - fastPutAsync(key, newValue).onComplete((res, exc) -> { - lock.unlockAsync(threadId); - if (exc != null) { - result.tryFailure(exc); - return; - } - - result.trySuccess(newValue); - }); - return; - } + result.complete(newValue); + }); + return result; + }).whenComplete((c, e) -> { lock.unlockAsync(threadId); - result.trySuccess(newValue); }); - }); }); - return result; + return new CompletableFutureWrapper<>(f); } @@ -312,56 +293,40 @@ public class RedissonMap extends RedissonExpirable implements RMap { Objects.requireNonNull(mappingFunction); RLock lock = getLock(key); - RPromise result = new RedissonPromise<>(); long threadId = Thread.currentThread().getId(); - lock.lockAsync(threadId).onComplete((r, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - - RFuture oldValueFuture = getAsync(key); - oldValueFuture.onComplete((oldValue, ex) -> { - if (ex != null) { - lock.unlockAsync(threadId); - result.tryFailure(ex); - return; - } - - if (oldValue != null) { - lock.unlockAsync(threadId); - result.trySuccess(oldValue); - return; - } + CompletionStage f = lock.lockAsync(threadId) + .thenCompose(r -> { + RFuture oldValueFuture = getAsync(key); + return oldValueFuture.thenCompose(oldValue -> { + if (oldValue != null) { + return CompletableFuture.completedFuture(oldValue); + } - commandExecutor.getConnectionManager().getExecutor().execute(() -> { - V newValue; - try { - newValue = mappingFunction.apply(key); - } catch (Exception exception) { - lock.unlockAsync(threadId); - result.tryFailure(exception); - return; - } - if (newValue != null) { - fastPutAsync(key, newValue).onComplete((res, exc) -> { - lock.unlockAsync(threadId); - if (exc != null) { - result.tryFailure(exc); + CompletableFuture result = new CompletableFuture<>(); + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + V newValue; + try { + newValue = mappingFunction.apply(key); + } catch (Exception exception) { + result.completeExceptionally(exception); + return; + } + if (newValue != null) { + fastPutAsync(key, newValue).thenAccept(res -> { + result.complete(newValue); + }); return; } - result.trySuccess(newValue); + result.complete(null); }); - return; - } - - lock.unlockAsync(threadId); - result.trySuccess(null); + return result; + }).whenComplete((c, e) -> { + lock.unlockAsync(threadId); + }); }); - }); - }); - return result; + + return new CompletableFutureWrapper<>(f); } @Override @@ -393,63 +358,53 @@ public class RedissonMap extends RedissonExpirable implements RMap { Objects.requireNonNull(remappingFunction); RLock lock = getLock(key); - RPromise result = new RedissonPromise<>(); long threadId = Thread.currentThread().getId(); - lock.lockAsync(threadId).onComplete((r, e) -> { - if (e != null) { - result.tryFailure(e); - return; - } - RFuture oldValueFuture = getAsync(key); - oldValueFuture.onComplete((oldValue, ex) -> { - if (ex != null) { - lock.unlockAsync(threadId); - result.tryFailure(e); - return; - } - - if (oldValue == null) { - lock.unlockAsync(threadId); - result.trySuccess(null); - return; - } + CompletionStage f = lock.lockAsync(threadId) + .thenCompose(r -> { + RFuture oldValueFuture = getAsync(key); + return oldValueFuture.thenCompose(oldValue -> { + if (oldValue == null) { + return CompletableFuture.completedFuture(null); + } - commandExecutor.getConnectionManager().getExecutor().execute(() -> { - V newValue; - try { - newValue = remappingFunction.apply(key, oldValue); - } catch (Exception exception) { - lock.unlockAsync(threadId); - result.tryFailure(exception); - return; - } - if (newValue != null) { - RFuture fastPutFuture = fastPutAsync(key, newValue); - fastPutFuture.onComplete((re, ex1) -> { - lock.unlockAsync(threadId); - if (ex1 != null) { - result.tryFailure(ex1); + CompletableFuture result = new CompletableFuture<>(); + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + V newValue; + try { + newValue = remappingFunction.apply(key, oldValue); + } catch (Exception exception) { + result.completeExceptionally(exception); return; } + if (newValue != null) { + RFuture fastPutFuture = fastPutAsync(key, newValue); + fastPutFuture.whenComplete((re, ex1) -> { + if (ex1 != null) { + result.completeExceptionally(ex1); + return; + } - result.trySuccess(newValue); - }); - } else { - RFuture removeFuture = fastRemoveAsync(key); - removeFuture.onComplete((re, ex1) -> { - lock.unlockAsync(threadId); - if (ex1 != null) { - result.tryFailure(ex1); - return; - } + result.complete(newValue); + }); + } else { + RFuture removeFuture = fastRemoveAsync(key); + removeFuture.whenComplete((re, ex1) -> { + if (ex1 != null) { + result.completeExceptionally(ex1); + return; + } - result.trySuccess(null); + result.complete(null); + }); + } }); - } + return result; + }).whenComplete((c, e) -> { + lock.unlockAsync(threadId); + }); }); - }); - }); - return result; + + return new CompletableFutureWrapper<>(f); } @Override @@ -702,22 +657,17 @@ public class RedissonMap extends RedissonExpirable implements RMap { protected final RFuture mapWriterFuture(RFuture future, MapWriterTask task, Function condition) { if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) { - future.onComplete((res, e) -> { + CompletionStage f = future.whenComplete((res, e) -> { if (e == null && condition.apply(res)) { writeBehindTask.addTask(task); } }); - return future; - } - - final RPromise promise = new RedissonPromise<>(); - future.onComplete((res, e) -> { - if (e != null) { - promise.tryFailure(e); - return; - } + return new CompletableFutureWrapper<>(f); + } + CompletionStage f = future.thenCompose(res -> { if (condition.apply(res)) { + CompletableFuture promise = new CompletableFuture<>(); commandExecutor.getConnectionManager().getExecutor().execute(() -> { try { if (task instanceof MapWriterTask.Add) { @@ -726,17 +676,17 @@ public class RedissonMap extends RedissonExpirable implements RMap { options.getWriter().delete(task.getKeys()); } } catch (Exception ex) { - promise.tryFailure(ex); + promise.completeExceptionally(ex); return; } - promise.trySuccess(res); + promise.complete(res); }); - } else { - promise.trySuccess(res); + return promise; } + return CompletableFuture.completedFuture(res); }); - return promise; + return new CompletableFutureWrapper<>(f); } protected RFuture putAllOperationAsync(Map map) {