refactoring

pull/5676/head
Nikita Koksharov 11 months ago
parent 90d3668d8b
commit e5a70dfc90

@ -25,9 +25,9 @@ import org.slf4j.LoggerFactory;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function; import java.util.function.Function;
/** /**
@ -121,37 +121,32 @@ public class ConnectionsHolder<T extends RedisConnection> {
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
CompletableFuture<Void> initPromise = new CompletableFuture<>(); CompletableFuture<Void> f = createConnection(minimumIdleSize, 1);
AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize); for (int i = 2; i <= minimumIdleSize; i++) {
createConnection(initPromise, minimumIdleSize, initializedConnections); int k = i;
return initPromise; f = f.thenCompose(r -> createConnection(minimumIdleSize, k));
}
return f.thenAccept(r -> {
log.info("{} connections initialized for {}", minimumIdleSize, client.getAddr());
});
} }
private void createConnection(CompletableFuture<Void> initPromise, int minimumIdleSize, AtomicInteger initializedConnections) { private CompletableFuture<Void> createConnection(int minimumIdleSize, int index) {
CompletableFuture<Void> f = acquireConnection(); CompletableFuture<Void> f = acquireConnection();
f.thenAccept(r -> { return f.thenCompose(r -> {
CompletableFuture<T> promise = new CompletableFuture<>(); CompletableFuture<T> promise = new CompletableFuture<>();
createConnection(promise); createConnection(promise);
promise.whenComplete((conn, e) -> { return promise.handle((conn, e) -> {
if (e == null) { if (e == null) {
if (changeUsage) { if (changeUsage) {
conn.decUsage(); conn.decUsage();
} }
if (!initPromise.isDone()) {
addConnection(conn); addConnection(conn);
} else {
conn.closeAsync();
}
} }
releaseConnection(); releaseConnection();
if (e != null) { if (e != null) {
if (initPromise.isDone()) {
return;
}
for (RedisConnection connection : getAllConnections()) { for (RedisConnection connection : getAllConnections()) {
if (!connection.isClosed()) { if (!connection.isClosed()) {
connection.closeAsync(); connection.closeAsync();
@ -159,7 +154,7 @@ public class ConnectionsHolder<T extends RedisConnection> {
} }
getAllConnections().clear(); getAllConnections().clear();
int totalInitializedConnections = minimumIdleSize - initializedConnections.get(); int totalInitializedConnections = index - 1;
String errorMsg; String errorMsg;
if (totalInitializedConnections == 0) { if (totalInitializedConnections == 0) {
errorMsg = "Unable to connect to Redis server: " + client.getAddr(); errorMsg = "Unable to connect to Redis server: " + client.getAddr();
@ -168,18 +163,9 @@ public class ConnectionsHolder<T extends RedisConnection> {
+ " of " + minimumIdleSize + " were initialized. Redis server: " + client.getAddr(); + " of " + minimumIdleSize + " were initialized. Redis server: " + client.getAddr();
} }
Exception cause = new RedisConnectionException(errorMsg, e); Exception cause = new RedisConnectionException(errorMsg, e);
initPromise.completeExceptionally(cause); throw new CompletionException(cause);
return;
}
int value = initializedConnections.decrementAndGet();
if (value == 0) {
if (initPromise.complete(null)) {
log.info("{} connections initialized for {}", minimumIdleSize, client.getAddr());
}
} else if (value > 0 && !initPromise.isDone()) {
createConnection(initPromise, minimumIdleSize, initializedConnections);
} }
return null;
}); });
}); });
} }

Loading…
Cancel
Save