refactoring

pull/5676/head
Nikita Koksharov 11 months ago
parent d974338deb
commit 92058d22cf

@ -102,7 +102,7 @@ public class ConnectionsHolder<T extends RedisConnection> {
return; return;
} }
if (client != connection.getRedisClient()) { if (client != null && client != connection.getRedisClient()) {
connection.closeAsync(); connection.closeAsync();
return; return;
} }
@ -112,19 +112,6 @@ public class ConnectionsHolder<T extends RedisConnection> {
connection.decUsage(); connection.decUsage();
} }
private CompletionStage<T> connect() {
CompletionStage<T> future = connectionCallback.apply(client);
return future.whenComplete((conn, e) -> {
if (e != null) {
return;
}
log.debug("new connection created: {}", conn);
allConnections.add(conn);
});
}
public Queue<T> getAllConnections() { public Queue<T> getAllConnections() {
return allConnections; return allConnections;
} }
@ -198,13 +185,19 @@ public class ConnectionsHolder<T extends RedisConnection> {
} }
private void createConnection(CompletableFuture<T> promise) { private void createConnection(CompletableFuture<T> promise) {
CompletionStage<T> connFuture = connect(); CompletionStage<T> connFuture = connectionCallback.apply(client);
connFuture.whenComplete((conn, e) -> { connFuture.whenComplete((conn, e) -> {
if (e != null) { if (e != null) {
promiseFailure(promise, e); releaseConnection();
promise.completeExceptionally(e);
return; return;
} }
log.debug("new connection created: {}", conn);
allConnections.add(conn);
if (changeUsage) { if (changeUsage) {
promise.thenApply(c -> c.incUsage()); promise.thenApply(c -> c.incUsage());
} }
@ -212,12 +205,6 @@ public class ConnectionsHolder<T extends RedisConnection> {
}); });
} }
private void promiseFailure(CompletableFuture<T> promise, Throwable cause) {
releaseConnection();
promise.completeExceptionally(cause);
}
private void connectedSuccessful(CompletableFuture<T> promise, T conn) { private void connectedSuccessful(CompletableFuture<T> promise, T conn) {
if (!promise.complete(conn)) { if (!promise.complete(conn)) {
releaseConnection(conn); releaseConnection(conn);

Loading…
Cancel
Save