refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent e2b3973c3d
commit c916686a61

@ -34,15 +34,11 @@ import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.AsyncCountDownLatch;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -408,17 +404,15 @@ public class CommandBatchService extends CommandAsyncService {
return;
}
RPromise<Map<MasterSlaveEntry, List<Object>>> mainPromise = new RedissonPromise<>();
Map<MasterSlaveEntry, List<Object>> result = new ConcurrentHashMap<>();
CountableListener<Map<MasterSlaveEntry, List<Object>>> listener = new CountableListener<>(mainPromise, result);
listener.setCounter(connections.size());
List<CompletableFuture<?>> futures = new ArrayList<>();
for (Map.Entry<MasterSlaveEntry, Entry> entry : commands.entrySet()) {
RPromise<List<Object>> execPromise = new RedissonPromise<>();
async(entry.getValue().isReadOnlyMode(), new NodeSource(entry.getKey()), connectionManager.getCodec(), RedisCommands.EXEC,
new Object[] {}, execPromise, false, false);
execPromise.onComplete((r, ex) -> {
if (ex != null) {
mainPromise.tryFailure(ex);
futures.add(execPromise.toCompletableFuture());
return;
}
@ -427,19 +421,21 @@ public class CommandBatchService extends CommandAsyncService {
if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) {
lastCommand.getPromise().onComplete((res, e) -> {
if (e != null) {
mainPromise.tryFailure(e);
futures.add(lastCommand.getPromise().toCompletableFuture());
return;
}
execPromise.onComplete(listener);
futures.add(execPromise.toCompletableFuture());
});
} else {
execPromise.onComplete(listener);
futures.add(execPromise.toCompletableFuture());
}
});
}
mainPromise.onComplete((res, ex) -> {
CompletableFuture<?> mainPromise = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
mainPromise.thenApply(s -> result)
.whenComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
resultPromise.tryFailure(ex);

Loading…
Cancel
Save