Reconnection handling improvements. #262

pull/297/head
Nikita 9 years ago
parent 7c255bc7d4
commit 883df40971

@ -117,11 +117,7 @@ public class ConnectionPool<T extends RedisConnection> {
T conn = poll(entry);
if (conn != null) {
if (!conn.isActive()) {
releaseConnection(entry, conn);
releaseConnection(entry);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
promiseFailure(entry, promise, cause);
promiseFailure(entry, promise, conn);
return;
}
@ -142,11 +138,7 @@ public class ConnectionPool<T extends RedisConnection> {
T conn = future.getNow();
if (!conn.isActive()) {
releaseConnection(entry, conn);
releaseConnection(entry);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
promiseFailure(entry, promise, cause);
promiseFailure(entry, promise, conn);
return;
}
@ -170,6 +162,30 @@ public class ConnectionPool<T extends RedisConnection> {
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
scheduleCheck(entry);
}
promise.setFailure(cause);
}
private void promiseFailure(SubscribesConnectionEntry entry, Promise<T> promise, T conn) {
if (entry.getNodeType() == NodeType.SLAVE) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getSlaveFailedAttempts()) {
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
scheduleCheck(entry);
} else if (attempts < config.getSlaveFailedAttempts()) {
releaseConnection(entry, conn);
} else {
conn.closeAsync();
}
} else {
releaseConnection(entry, conn);
}
releaseConnection(entry);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
promise.setFailure(cause);
}

Loading…
Cancel
Save