Fixed - PriorityQueue methods may hang due to unreleased lock after exception. #5354

pull/5520/head
Nikita Koksharov 1 year ago
parent 64de46785a
commit 825d23ed11

@ -28,6 +28,7 @@ import java.io.ObjectOutputStream;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@ -282,9 +283,25 @@ public class RedissonPriorityQueue<V> extends BaseRedissonList<V> implements RPr
long threadId = Thread.currentThread().getId();
CompletionStage<R> f = lock.lockAsync(threadId).thenCompose(r -> {
RFuture<R> callback = callable.get();
return callback.thenCompose(value -> {
return lock.unlockAsync(threadId).thenApply(res -> value);
});
return callback.handle((value, ex) -> {
CompletableFuture<R> result = new CompletableFuture<>();
lock.unlockAsync(threadId)
.whenComplete((r2, ex2) -> {
if (ex2 != null) {
if (ex != null) {
ex2.addSuppressed(ex);
}
result.completeExceptionally(ex2);
return;
}
if (ex != null) {
result.completeExceptionally(ex);
return;
}
result.complete(value);
});
return result;
}).thenCompose(ff -> ff);
});
return new CompletableFutureWrapper<>(f);
}

Loading…
Cancel
Save