From c916686a6161e9e4c2cd6ee5dbb8efd62f44e046 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 13 Dec 2021 15:19:40 +0300 Subject: [PATCH] refactoring --- .../redisson/command/CommandBatchService.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index d60323320..fcc120bc7 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -34,15 +34,11 @@ import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.misc.AsyncCountDownLatch; -import org.redisson.misc.CountableListener; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -408,17 +404,15 @@ public class CommandBatchService extends CommandAsyncService { return; } - RPromise>> mainPromise = new RedissonPromise<>(); Map> result = new ConcurrentHashMap<>(); - CountableListener>> listener = new CountableListener<>(mainPromise, result); - listener.setCounter(connections.size()); + List> futures = new ArrayList<>(); for (Map.Entry entry : commands.entrySet()) { RPromise> execPromise = new RedissonPromise<>(); async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, new Object[] {}, execPromise, false, false); execPromise.onComplete((r, ex) -> { if (ex != null) { - mainPromise.tryFailure(ex); + futures.add(execPromise.toCompletableFuture()); return; } @@ -427,19 +421,21 @@ public class CommandBatchService extends CommandAsyncService { if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) { lastCommand.getPromise().onComplete((res, e) -> { if (e != null) { - mainPromise.tryFailure(e); + futures.add(lastCommand.getPromise().toCompletableFuture()); return; } - - execPromise.onComplete(listener); + + futures.add(execPromise.toCompletableFuture()); }); } else { - execPromise.onComplete(listener); + futures.add(execPromise.toCompletableFuture()); } }); } - - mainPromise.onComplete((res, ex) -> { + + CompletableFuture mainPromise = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + mainPromise.thenApply(s -> result) + .whenComplete((res, ex) -> { executed.set(true); if (ex != null) { resultPromise.tryFailure(ex);