refactoring

pull/4031/head
Nikita Koksharov 3 years ago
parent cd1f8ca2b8
commit 1f37c27a45

@ -35,9 +35,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -134,29 +136,23 @@ public class MasterSlaveEntry {
connectionManager, connectionManager,
NodeType.MASTER); NodeType.MASTER);
int counter = 1; List<CompletableFuture<Void>> futures = new ArrayList<>();
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
counter++;
}
if (!slaveBalancer.contains(client.getAddr())) {
counter++;
}
CountableListener<RedisClient> listener = new CountableListener<>(result, client, counter);
if (!config.checkSkipSlavesInit() && !slaveBalancer.contains(client.getAddr())) { if (!config.checkSkipSlavesInit() && !slaveBalancer.contains(client.getAddr())) {
RFuture<Void> masterAsSlaveFuture = addSlave(client.getAddr(), client.getConfig().getAddress(), RFuture<Void> masterAsSlaveFuture = addSlave(client.getAddr(), client.getConfig().getAddress(),
false, NodeType.MASTER, client.getConfig().getSslHostname()); false, NodeType.MASTER, client.getConfig().getSslHostname());
masterAsSlaveFuture.onComplete(listener); futures.add(masterAsSlaveFuture.toCompletableFuture());
} }
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry); RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
writeFuture.onComplete(listener); futures.add(writeFuture.toCompletableFuture());
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry); RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
pubSubFuture.onComplete(listener); futures.add(pubSubFuture.toCompletableFuture());
} }
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
future.whenComplete(new TransferListener<>(result, client));
}); });
return result; return result;

@ -15,6 +15,7 @@
*/ */
package org.redisson.misc; package org.redisson.misc;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
/** /**
@ -23,23 +24,36 @@ import java.util.function.BiConsumer;
* *
* @param <T> type * @param <T> type
*/ */
public class TransferListener<T> implements BiConsumer<T, Throwable> { public class TransferListener<T> implements BiConsumer<Object, Throwable> {
private final RPromise<T> promise; private final RPromise<T> promise;
private final T value;
public TransferListener(RPromise<T> promise) { public TransferListener(RPromise<T> promise) {
this(promise, null);
}
public TransferListener(RPromise<T> promise, T value) {
super(); super();
this.promise = promise; this.promise = promise;
this.value = value;
} }
@Override @Override
public void accept(T t, Throwable u) { public void accept(Object t, Throwable u) {
if (u != null) { if (u != null) {
if (u instanceof CompletionException) {
u = u.getCause();
}
promise.tryFailure(u); promise.tryFailure(u);
return; return;
} }
promise.trySuccess(t); if (value != null) {
promise.trySuccess(value);
} else {
promise.trySuccess((T)t);
}
} }
} }

Loading…
Cancel
Save