fixed connection leak after refactoring

pull/5771/head
Nikita Koksharov 12 months ago
parent c93363a3db
commit 0ea4dd8512

@ -110,7 +110,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
protected final CompletableFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry, boolean trackChanges) { protected final CompletableFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry, boolean trackChanges) {
ConnectionsHolder<T> handler = getConnectionHolder(entry, trackChanges); ConnectionsHolder<T> handler = getConnectionHolder(entry, trackChanges);
CompletableFuture<T> result = handler.acquireConnection(command); CompletableFuture<T> result = handler.acquireConnection(command);
return result.whenComplete((r, e) -> { CompletableFuture<T> cancelableFuture = new CompletableFuture<>();
result.whenComplete((r, e) -> {
if (e != null) { if (e != null) {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
entry.getClient().getConfig().getFailedNodeDetector().onConnectFailed(); entry.getClient().getConfig().getFailedNodeDetector().onConnectFailed();
@ -118,6 +119,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
masterSlaveEntry.shutdownAndReconnectAsync(entry.getClient(), e); masterSlaveEntry.shutdownAndReconnectAsync(entry.getClient(), e);
} }
} }
cancelableFuture.completeExceptionally(e);
return; return;
} }
@ -126,7 +128,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
entry.getClient().getConfig().getFailedNodeDetector().onConnectSuccessful(); entry.getClient().getConfig().getFailedNodeDetector().onConnectSuccessful();
} }
if (!cancelableFuture.complete(r)) {
entry.returnConnection(r);
}
}); });
return cancelableFuture;
} }
private boolean isHealthy(ClientConnectionsEntry entry) { private boolean isHealthy(ClientConnectionsEntry entry) {

Loading…
Cancel
Save