Revert "refactoring"

This reverts commit c916686a61.
pull/4031/head
Nikita Koksharov 3 years ago
parent cc4c5b25a0
commit 3a4eaf2f4e

@ -34,11 +34,15 @@ import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder; import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.AsyncCountDownLatch; import org.redisson.misc.AsyncCountDownLatch;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise; import org.redisson.misc.RedissonPromise;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -404,15 +408,17 @@ public class CommandBatchService extends CommandAsyncService {
return; return;
} }
RPromise<Map<MasterSlaveEntry, List<Object>>> mainPromise = new RedissonPromise<>();
Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<>(); Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<>();
List<CompletableFuture<?>> futures = new ArrayList<>(); CountableListener<Map<MasterSlaveEntry, List<Object>>> listener = new CountableListener<>(mainPromise, result);
listener.setCounter(connections.size());
for (Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) { for (Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) {
RPromise<List<Object>> execPromise = new RedissonPromise<>(); RPromise<List<Object>> execPromise = new RedissonPromise<>();
async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC, async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC,
new Object[] {}, execPromise, false, false); new Object[] {}, execPromise, false, false);
execPromise.onComplete((r, ex) -> { execPromise.onComplete((r, ex) -> {
if (ex != null) { if (ex != null) {
futures.add(execPromise.toCompletableFuture()); mainPromise.tryFailure(ex);
return; return;
} }
@ -421,21 +427,19 @@ public class CommandBatchService extends CommandAsyncService {
if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) { if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) {
lastCommand.getPromise().onComplete((res, e) -> { lastCommand.getPromise().onComplete((res, e) -> {
if (e != null) { if (e != null) {
futures.add(lastCommand.getPromise().toCompletableFuture()); mainPromise.tryFailure(e);
return; return;
} }
futures.add(execPromise.toCompletableFuture()); execPromise.onComplete(listener);
}); });
} else { } else {
futures.add(execPromise.toCompletableFuture()); execPromise.onComplete(listener);
} }
}); });
} }
CompletableFuture<?> mainPromise = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); mainPromise.onComplete((res, ex) -> {
mainPromise.thenApply(s -> result)
.whenComplete((res, ex) -> {
executed.set(true); executed.set(true);
if (ex != null) { if (ex != null) {
resultPromise.tryFailure(ex); resultPromise.tryFailure(ex);

Loading…
Cancel
Save