From 3a4eaf2f4eb15aec1cf91b923cc11bfac4049655 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 14 Dec 2021 12:38:22 +0300 Subject: [PATCH] Revert "refactoring" This reverts commit c916686a6161e9e4c2cd6ee5dbb8efd62f44e046. --- .../redisson/command/CommandBatchService.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index fcc120bc7..d60323320 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -34,11 +34,15 @@ import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.misc.AsyncCountDownLatch; +import org.redisson.misc.CountableListener; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -404,15 +408,17 @@ public class CommandBatchService extends CommandAsyncService { return; } + RPromise>> mainPromise = new RedissonPromise<>(); Map> result = new ConcurrentHashMap<>(); - List> futures = new ArrayList<>(); + CountableListener>> listener = new CountableListener<>(mainPromise, result); + listener.setCounter(connections.size()); for (Map.Entry entry : commands.entrySet()) { RPromise> execPromise = new RedissonPromise<>(); async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, new Object[] {}, execPromise, false, false); execPromise.onComplete((r, ex) -> { if (ex != null) { - futures.add(execPromise.toCompletableFuture()); + mainPromise.tryFailure(ex); return; } @@ -421,21 +427,19 @@ public class CommandBatchService extends CommandAsyncService { if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) { lastCommand.getPromise().onComplete((res, e) -> { if (e != null) { - futures.add(lastCommand.getPromise().toCompletableFuture()); + mainPromise.tryFailure(e); return; } - - futures.add(execPromise.toCompletableFuture()); + + execPromise.onComplete(listener); }); } else { - futures.add(execPromise.toCompletableFuture()); + execPromise.onComplete(listener); } }); } - - CompletableFuture mainPromise = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - mainPromise.thenApply(s -> result) - .whenComplete((res, ex) -> { + + mainPromise.onComplete((res, ex) -> { executed.set(true); if (ex != null) { resultPromise.tryFailure(ex);