|
|
|
@ -238,7 +238,10 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
|
|
|
|
|
return entry.getFailedAttempts() < config.getFailedAttempts();
|
|
|
|
|
if (entry.getNodeType() == NodeType.SLAVE) {
|
|
|
|
|
return entry.getFailedAttempts() < config.getFailedAttempts();
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected T poll(ClientConnectionsEntry entry) {
|
|
|
|
@ -256,11 +259,6 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
T conn = poll(entry);
|
|
|
|
|
if (conn != null) {
|
|
|
|
|
if (!conn.isActive()) {
|
|
|
|
|
promiseFailure(entry, promise, conn);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
connectedSuccessful(entry, promise, conn);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
@ -290,7 +288,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
|
|
|
|
|
entry.resetFailedAttempts();
|
|
|
|
|
if (entry.getNodeType() == NodeType.SLAVE) {
|
|
|
|
|
entry.resetFailedAttempts();
|
|
|
|
|
}
|
|
|
|
|
if (!promise.trySuccess(conn)) {
|
|
|
|
|
releaseConnection(entry, conn);
|
|
|
|
|
releaseConnection(entry);
|
|
|
|
@ -298,7 +298,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, Throwable cause) {
|
|
|
|
|
if (entry.incFailedAttempts() == config.getFailedAttempts()) {
|
|
|
|
|
if (entry.getNodeType() == NodeType.SLAVE
|
|
|
|
|
&& entry.incFailedAttempts() == config.getFailedAttempts()) {
|
|
|
|
|
checkForReconnect(entry, cause);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -308,14 +309,18 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
|
|
|
|
|
int attempts = entry.incFailedAttempts();
|
|
|
|
|
if (attempts == config.getFailedAttempts()) {
|
|
|
|
|
conn.closeAsync();
|
|
|
|
|
checkForReconnect(entry, null);
|
|
|
|
|
} else if (attempts < config.getFailedAttempts()) {
|
|
|
|
|
releaseConnection(entry, conn);
|
|
|
|
|
if (entry.getNodeType() == NodeType.SLAVE) {
|
|
|
|
|
int attempts = entry.incFailedAttempts();
|
|
|
|
|
if (attempts == config.getFailedAttempts()) {
|
|
|
|
|
conn.closeAsync();
|
|
|
|
|
checkForReconnect(entry, null);
|
|
|
|
|
} else if (attempts < config.getFailedAttempts()) {
|
|
|
|
|
releaseConnection(entry, conn);
|
|
|
|
|
} else {
|
|
|
|
|
conn.closeAsync();
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
conn.closeAsync();
|
|
|
|
|
releaseConnection(entry, conn);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
releaseConnection(entry);
|
|
|
|
@ -325,15 +330,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) {
|
|
|
|
|
if (entry.getNodeType() == NodeType.SLAVE) {
|
|
|
|
|
masterSlaveEntry.slaveDown(entry, FreezeReason.RECONNECT);
|
|
|
|
|
if (masterSlaveEntry.slaveDown(entry, FreezeReason.RECONNECT)) {
|
|
|
|
|
log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause);
|
|
|
|
|
scheduleCheck(entry);
|
|
|
|
|
} else {
|
|
|
|
|
if (entry.freezeMaster(FreezeReason.RECONNECT)) {
|
|
|
|
|
log.error("host " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause);
|
|
|
|
|
scheduleCheck(entry);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|