|
|
|
@ -77,31 +77,27 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
return initConnections(entry, false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private CompletableFuture<Void> initConnections(ClientConnectionsEntry entry, boolean checkFreezed) {
|
|
|
|
|
private CompletableFuture<Void> initConnections(ClientConnectionsEntry entry, boolean checkFrozen) {
|
|
|
|
|
int minimumIdleSize = getMinimumIdleSize(entry);
|
|
|
|
|
|
|
|
|
|
if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {
|
|
|
|
|
if (minimumIdleSize == 0 || (checkFrozen && entry.isFreezed())) {
|
|
|
|
|
return CompletableFuture.completedFuture(null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CompletableFuture<Void> initPromise = new CompletableFuture<>();
|
|
|
|
|
AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
|
|
|
|
|
int startAmount = Math.min(1, minimumIdleSize);
|
|
|
|
|
AtomicInteger requests = new AtomicInteger(startAmount);
|
|
|
|
|
for (int i = 0; i < startAmount; i++) {
|
|
|
|
|
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
|
|
|
|
|
}
|
|
|
|
|
createConnection(checkFrozen, entry, initPromise, minimumIdleSize, initializedConnections);
|
|
|
|
|
return initPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void createConnection(boolean checkFreezed, AtomicInteger requests, ClientConnectionsEntry entry,
|
|
|
|
|
private void createConnection(boolean checkFrozen, ClientConnectionsEntry entry,
|
|
|
|
|
CompletableFuture<Void> initPromise, int minimumIdleSize, AtomicInteger initializedConnections) {
|
|
|
|
|
|
|
|
|
|
if (checkFreezed && (entry.isFreezed() || !isHealthy(entry))) {
|
|
|
|
|
if (checkFrozen && (entry.isFreezed() || !isHealthy(entry))) {
|
|
|
|
|
int totalInitializedConnections = minimumIdleSize - initializedConnections.get();
|
|
|
|
|
Throwable cause = new RedisConnectionException(
|
|
|
|
|
"Unable to init enough connections amount! Only " + totalInitializedConnections + " of " + minimumIdleSize + " were initialized. Server: "
|
|
|
|
|
+ entry.getClient().getAddr());
|
|
|
|
|
Exception cause = new RedisConnectionException(
|
|
|
|
|
"Unable to init enough connections amount! Only " + totalInitializedConnections
|
|
|
|
|
+ " of " + minimumIdleSize + " were initialized. Redis node info: " + entry);
|
|
|
|
|
initPromise.completeExceptionally(cause);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -151,7 +147,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
errorMsg = "Unable to init enough connections amount! Only " + totalInitializedConnections
|
|
|
|
|
+ " of " + minimumIdleSize + " were initialized. Redis server: " + entry.getClient().getAddr();
|
|
|
|
|
}
|
|
|
|
|
Throwable cause = new RedisConnectionException(errorMsg, e);
|
|
|
|
|
Exception cause = new RedisConnectionException(errorMsg, e);
|
|
|
|
|
initPromise.completeExceptionally(cause);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -162,9 +158,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
|
|
|
|
|
}
|
|
|
|
|
} else if (value > 0 && !initPromise.isDone()) {
|
|
|
|
|
if (requests.incrementAndGet() <= minimumIdleSize) {
|
|
|
|
|
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
|
|
|
|
|
}
|
|
|
|
|
createConnection(checkFrozen, entry, initPromise, minimumIdleSize, initializedConnections);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|