Fixed - pollAsync() and removeAsync() methods of RPriorityQueue and RPriorityDeque objects aren't guarded properly with lock.

pull/6262/head
Nikita Koksharov 3 months ago
parent cd4a9cbb48
commit 3a32180cf8

@ -273,19 +273,19 @@ public class RedissonPriorityQueue<V> extends BaseRedissonList<V> implements RPr
return wrapLockedAsync(RedisCommands.LPOP, getRawName()); return wrapLockedAsync(RedisCommands.LPOP, getRawName());
} }
protected <T> RFuture<V> wrapLockedAsync(RedisCommand<T> command, Object... params) { protected final <T> RFuture<V> wrapLockedAsync(RedisCommand<T> command, Object... params) {
return wrapLockedAsync(() -> { return wrapLockedAsync(() -> {
return commandExecutor.writeAsync(getRawName(), codec, command, params); return commandExecutor.writeAsync(getRawName(), codec, command, params);
}); });
} }
protected final <T, R> RFuture<R> wrapLockedAsync(Supplier<RFuture<R>> callable) { protected final <T, R> RFuture<R> wrapLockedAsync(Supplier<RFuture<R>> callable) {
long threadId = Thread.currentThread().getId(); long randomId = getServiceManager().generateValue();
CompletionStage<R> f = lock.lockAsync(threadId).thenCompose(r -> { CompletionStage<R> f = lock.lockAsync(randomId).thenCompose(r -> {
RFuture<R> callback = callable.get(); RFuture<R> callback = callable.get();
return callback.handle((value, ex) -> { return callback.handle((value, ex) -> {
CompletableFuture<R> result = new CompletableFuture<>(); CompletableFuture<R> result = new CompletableFuture<>();
lock.unlockAsync(threadId) lock.unlockAsync(randomId)
.whenComplete((r2, ex2) -> { .whenComplete((r2, ex2) -> {
if (ex2 != null) { if (ex2 != null) {
if (ex != null) { if (ex != null) {

Loading…
Cancel
Save