|
|
|
@ -41,6 +41,13 @@ import io.netty.util.TimerTask;
|
|
|
|
|
import io.netty.util.concurrent.Future;
|
|
|
|
|
import io.netty.util.concurrent.FutureListener;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Base connection pool class
|
|
|
|
|
*
|
|
|
|
|
* @author Nikita Koksharov
|
|
|
|
|
*
|
|
|
|
|
* @param <T> - connection type
|
|
|
|
|
*/
|
|
|
|
|
abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
|
|
|
|
|
private final Logger log = LoggerFactory.getLogger(getClass());
|
|
|
|
@ -211,8 +218,6 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<T> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
releaseConnection(entry);
|
|
|
|
|
|
|
|
|
|
promiseFailure(entry, promise, future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -223,13 +228,13 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
promiseSuccessful(entry, promise, conn);
|
|
|
|
|
connectedSuccessful(entry, promise, conn);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
return promise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void promiseSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
|
|
|
|
|
private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
|
|
|
|
|
entry.resetFailedAttempts();
|
|
|
|
|
if (!promise.trySuccess(conn)) {
|
|
|
|
|
releaseConnection(entry, conn);
|
|
|
|
@ -247,15 +252,20 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
checkForReconnect(entry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
releaseConnection(entry);
|
|
|
|
|
|
|
|
|
|
promise.tryFailure(cause);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
|
|
|
|
|
int attempts = entry.incFailedAttempts();
|
|
|
|
|
if (attempts == config.getFailedAttempts()) {
|
|
|
|
|
conn.closeAsync();
|
|
|
|
|
checkForReconnect(entry);
|
|
|
|
|
} else if (attempts < config.getFailedAttempts()) {
|
|
|
|
|
releaseConnection(entry, conn);
|
|
|
|
|
} else {
|
|
|
|
|
conn.closeAsync();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
releaseConnection(entry);
|
|
|
|
@ -267,9 +277,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
private RFuture<T> promiseFailure(ClientConnectionsEntry entry, T conn) {
|
|
|
|
|
int attempts = entry.incFailedAttempts();
|
|
|
|
|
if (attempts == config.getFailedAttempts()) {
|
|
|
|
|
conn.closeAsync();
|
|
|
|
|
checkForReconnect(entry);
|
|
|
|
|
} else if (attempts < config.getFailedAttempts()) {
|
|
|
|
|
releaseConnection(entry, conn);
|
|
|
|
|
} else {
|
|
|
|
|
conn.closeAsync();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
releaseConnection(entry);
|
|
|
|
|