diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 21bfcca0a..810b5bed7 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -35,9 +35,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -134,29 +136,23 @@ public class MasterSlaveEntry { connectionManager, NodeType.MASTER); - int counter = 1; - if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - counter++; - } - if (!slaveBalancer.contains(client.getAddr())) { - counter++; - } - - CountableListener listener = new CountableListener<>(result, client, counter); - + List> futures = new ArrayList<>(); if (!config.checkSkipSlavesInit() && !slaveBalancer.contains(client.getAddr())) { RFuture masterAsSlaveFuture = addSlave(client.getAddr(), client.getConfig().getAddress(), false, NodeType.MASTER, client.getConfig().getSslHostname()); - masterAsSlaveFuture.onComplete(listener); + futures.add(masterAsSlaveFuture.toCompletableFuture()); } RFuture writeFuture = writeConnectionPool.add(masterEntry); - writeFuture.onComplete(listener); - + futures.add(writeFuture.toCompletableFuture()); + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); - pubSubFuture.onComplete(listener); + futures.add(pubSubFuture.toCompletableFuture()); } + + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + future.whenComplete(new TransferListener<>(result, client)); }); return result; diff --git a/redisson/src/main/java/org/redisson/misc/TransferListener.java b/redisson/src/main/java/org/redisson/misc/TransferListener.java index 4bd8df4d9..e4b75dcdd 100644 --- a/redisson/src/main/java/org/redisson/misc/TransferListener.java +++ b/redisson/src/main/java/org/redisson/misc/TransferListener.java @@ -15,6 +15,7 @@ */ package org.redisson.misc; +import java.util.concurrent.CompletionException; import java.util.function.BiConsumer; /** @@ -23,23 +24,36 @@ import java.util.function.BiConsumer; * * @param type */ -public class TransferListener implements BiConsumer { +public class TransferListener implements BiConsumer { private final RPromise promise; - + private final T value; + public TransferListener(RPromise promise) { + this(promise, null); + } + + public TransferListener(RPromise promise, T value) { super(); this.promise = promise; + this.value = value; } @Override - public void accept(T t, Throwable u) { + public void accept(Object t, Throwable u) { if (u != null) { + if (u instanceof CompletionException) { + u = u.getCause(); + } promise.tryFailure(u); return; } - - promise.trySuccess(t); + + if (value != null) { + promise.trySuccess(value); + } else { + promise.trySuccess((T)t); + } } }