Fixed - Connection leak (regression since 3.16.1) #3840

pull/3848/head
Nikita Koksharov 3 years ago
parent 69414a49cb
commit c63bea3625

@ -180,13 +180,11 @@ public class RedisExecutor<V, R> {
}
if (connectionFuture.cancel(false)) {
if (exception == null) {
exception = new RedisTimeoutException("Unable to acquire connection! " + connectionFuture +
"Increase connection pool size. "
+ "Node source: " + source
+ ", command: " + LogHelper.toString(command, params)
+ " after " + attempt + " retry attempts");
}
exception = new RedisTimeoutException("Unable to acquire connection! " + connectionFuture +
"Increase connection pool size. "
+ "Node source: " + source
+ ", command: " + LogHelper.toString(command, params)
+ " after " + attempt + " retry attempts");
} else {
if (connectionFuture.isSuccess()) {
if (writeFuture == null || !writeFuture.isDone()) {
@ -275,7 +273,7 @@ public class RedisExecutor<V, R> {
if (!future.isSuccess()) {
exception = new WriteRedisConnectionException(
"Unable to write command into connection! Node source: " + source + ", connection: " + connection +
"Unable to write command into connection! Increase connection pool size. Node source: " + source + ", connection: " + connection +
", command: " + LogHelper.toString(command, params)
+ " after " + attempt + " retry attempts", future.cause());
if (attempt == attempts) {
@ -507,6 +505,7 @@ public class RedisExecutor<V, R> {
connectionType, command, LogHelper.toString(params), source, connection.getRedisClient().getAddr(), connection);
}
writeFuture = connection.send(new CommandData<>(attemptPromise, codec, command, params));
release(connection);
}
}
@ -517,10 +516,8 @@ public class RedisExecutor<V, R> {
RedisConnection connection = connectionFuture.getNow();
connectionManager.getShutdownLatch().release();
if (readOnlyMode) {
connectionManager.releaseRead(source, connection);
} else {
connectionManager.releaseWrite(source, connection);
if (source.getRedirect() == Redirect.ASK || getClass() != RedisExecutor.class) {
release(connection);
}
if (log.isDebugEnabled()) {
@ -534,6 +531,14 @@ public class RedisExecutor<V, R> {
}
}
private void release(RedisConnection connection) {
if (readOnlyMode) {
connectionManager.releaseRead(source, connection);
} else {
connectionManager.releaseWrite(source, connection);
}
}
public RedisClient getRedisClient() {
return connectionFuture.getNow().getRedisClient();
}

@ -168,64 +168,28 @@ public class ClientConnectionsEntry {
}
public void acquireConnection(Runnable runnable, RedisCommand<?> command) {
if (isPolled(command)) {
freeConnectionsCounter.acquire(runnable);
return;
}
runnable.run();
freeConnectionsCounter.acquire(runnable);
}
public void releaseConnection() {
freeConnectionsCounter.release();
}
AtomicBoolean lock = new AtomicBoolean();
public RedisConnection pollConnection(RedisCommand<?> command) {
if (isPolled(command)) {
while (true) {
if (lock.compareAndSet(false, true)) {
if (!iter.hasNext()) {
iter = freeConnections.iterator();
}
try {
if (iter.hasNext()) {
RedisConnection c = iter.next();
iter.remove();
if (c != null) {
c.incUsage();
c.setPooled(true);
}
return c;
}
return null;
} finally {
lock.set(false);
}
}
}
public void addConnection(RedisConnection conn) {
conn.setLastUsageTime(System.nanoTime());
if (conn instanceof RedisPubSubConnection) {
freeSubscribeConnections.add((RedisPubSubConnection) conn);
} else {
freeConnections.add(conn);
}
}
while (true) {
if (lock.compareAndSet(false, true)) {
if (!iter.hasNext()) {
iter = freeConnections.iterator();
}
try {
if (iter.hasNext()) {
RedisConnection c = iter.next();
if (c != null) {
c.incUsage();
}
return c;
}
return null;
} finally {
lock.set(false);
}
}
public RedisConnection pollConnection(RedisCommand<?> command) {
RedisConnection c = freeConnections.poll();
if (c != null) {
c.incUsage();
}
return c;
}
public void releaseConnection(RedisConnection connection) {
@ -238,15 +202,9 @@ public class ClientConnectionsEntry {
return;
}
connection.decUsage();
connection.setLastUsageTime(System.nanoTime());
if (connection.getUsage() == 0) {
freeConnections.add(connection);
return;
}
if (connection.decUsage() == 0 && connection.isPooled()) {
freeConnections.add(connection);
connection.setPooled(false);
}
freeConnections.add(connection);
}
public RFuture<RedisConnection> connect() {

Loading…
Cancel
Save