diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java index 67606649e..a78d6df70 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityQueue.java @@ -28,6 +28,7 @@ import java.io.ObjectOutputStream; import java.math.BigInteger; import java.security.MessageDigest; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -282,9 +283,25 @@ public class RedissonPriorityQueue extends BaseRedissonList implements RPr long threadId = Thread.currentThread().getId(); CompletionStage f = lock.lockAsync(threadId).thenCompose(r -> { RFuture callback = callable.get(); - return callback.thenCompose(value -> { - return lock.unlockAsync(threadId).thenApply(res -> value); - }); + return callback.handle((value, ex) -> { + CompletableFuture result = new CompletableFuture<>(); + lock.unlockAsync(threadId) + .whenComplete((r2, ex2) -> { + if (ex2 != null) { + if (ex != null) { + ex2.addSuppressed(ex); + } + result.completeExceptionally(ex2); + return; + } + if (ex != null) { + result.completeExceptionally(ex); + return; + } + result.complete(value); + }); + return result; + }).thenCompose(ff -> ff); }); return new CompletableFutureWrapper<>(f); }