failed slaves handling improvement

pull/1705/head
Nikita 7 years ago
parent d5348b150f
commit bdb2a0e229

@ -33,6 +33,7 @@ import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException; import org.redisson.RedissonShutdownException;
import org.redisson.ScanResult; import org.redisson.ScanResult;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient; import org.redisson.api.RedissonReactiveClient;
@ -57,6 +58,7 @@ import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.codec.ReferenceCodecProvider; import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
@ -697,8 +699,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (!future.isSuccess()) { if (!future.isSuccess()) {
details.setException(new WriteRedisConnectionException( details.setException(new WriteRedisConnectionException(
"Unable to send command! Node source: " + details.getSource() + ", connection: " + future.channel() + "Unable to send command! Node source: " + details.getSource() + ", connection: " + connection +
", command: " + details.getCommand() + ", params: " + LogHelper.toString(details.getParams()), future.cause())); ", command: " + details.getCommand() + ", command params: " + LogHelper.toString(details.getParams())
+ " after " + details.getAttempt() + " retry attempts", future.cause()));
if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) { if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
if (!details.getAttemptPromise().tryFailure(details.getException())) { if (!details.getAttemptPromise().tryFailure(details.getException())) {
log.error(details.getException().getMessage()); log.error(details.getException().getMessage());
@ -743,6 +746,12 @@ public class CommandAsyncService implements CommandAsyncExecutor {
TimerTask timeoutTask = new TimerTask() { TimerTask timeoutTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
MasterSlaveEntry entry = connectionManager.getEntry(connection.getRedisClient());
ClientConnectionsEntry ee = entry.getEntry(connection.getRedisClient());
if (ee != null && ee.getNodeType() == NodeType.SLAVE) {
ee.trySetupFistFail();
}
details.getAttemptPromise().tryFailure( details.getAttemptPromise().tryFailure(
new RedisTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + details.getCommand() new RedisTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + details.getCommand()
+ " with params: " + LogHelper.toString(details.getParams()) + " channel: " + connection.getChannel())); + " with params: " + LogHelper.toString(details.getParams()) + " channel: " + connection.getChannel()));
@ -898,6 +907,14 @@ public class CommandAsyncService implements CommandAsyncExecutor {
free(details.getParams()); free(details.getParams());
if (!(future.cause() instanceof RedisTimeoutException)) {
MasterSlaveEntry entry = connectionManager.getEntry(details.getConnectionFuture().getNow().getRedisClient());
ClientConnectionsEntry ee = entry.getEntry(details.getConnectionFuture().getNow().getRedisClient());
if (ee != null && ee.getNodeType() == NodeType.SLAVE) {
ee.resetFirstFail();
}
}
if (future.isSuccess()) { if (future.isSuccess()) {
R res = future.getNow(); R res = future.getNow();
if (res instanceof ScanResult) { if (res instanceof ScanResult) {

@ -43,7 +43,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
private int failedSlaveReconnectionInterval = 3000; private int failedSlaveReconnectionInterval = 3000;
private int failedSlaveCheckInterval = 60000; private int failedSlaveCheckInterval = 180000;
/** /**
* Redis 'master' node minimum idle connection amount for <b>each</b> slave node * Redis 'master' node minimum idle connection amount for <b>each</b> slave node
@ -136,7 +136,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
* when the time interval from the moment of first Redis command execution failure * when the time interval from the moment of first Redis command execution failure
* on this server reaches <code>slaveFailsInterval</code> value. * on this server reaches <code>slaveFailsInterval</code> value.
* <p> * <p>
* Default is <code>60000</code> * Default is <code>180000</code>
* *
* @param slaveFailsInterval - time interval in milliseconds * @param slaveFailsInterval - time interval in milliseconds
* @return config * @return config

@ -200,6 +200,7 @@ public class MasterSlaveEntry {
connection.closeAsync(); connection.closeAsync();
reattachBlockingQueue(connection); reattachBlockingQueue(connection);
} }
entry.getAllConnections().clear();
for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) { for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
connection.closeAsync(); connection.closeAsync();
@ -323,6 +324,10 @@ public class MasterSlaveEntry {
return slaveBalancer.getEntries(); return slaveBalancer.getEntries();
} }
public ClientConnectionsEntry getEntry(RedisClient redisClient) {
return slaveBalancer.getEntry(redisClient);
}
public RedisClient getClient() { public RedisClient getClient() {
return masterEntry.getClient(); return masterEntry.getClient();
} }

@ -218,7 +218,7 @@ public class LoadBalancerManager {
return null; return null;
} }
private ClientConnectionsEntry getEntry(RedisClient redisClient) { public ClientConnectionsEntry getEntry(RedisClient redisClient) {
return client2Entry.get(redisClient); return client2Entry.get(redisClient);
} }

@ -283,9 +283,6 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) { private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
if (entry.getNodeType() == NodeType.SLAVE) {
entry.resetFirstFail();
}
if (!promise.trySuccess(conn)) { if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn); releaseConnection(entry, conn);
releaseConnection(entry); releaseConnection(entry);
@ -310,6 +307,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
entry.trySetupFistFail(); entry.trySetupFistFail();
if (entry.isFailed()) { if (entry.isFailed()) {
conn.closeAsync(); conn.closeAsync();
entry.getAllConnections().remove(conn);
checkForReconnect(entry, null); checkForReconnect(entry, null);
} else { } else {
releaseConnection(entry, conn); releaseConnection(entry, conn);
@ -427,6 +425,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
public void returnConnection(ClientConnectionsEntry entry, T connection) { public void returnConnection(ClientConnectionsEntry entry, T connection) {
if (entry.isFreezed() && !entry.isMasterForRead()) { if (entry.isFreezed() && !entry.isMasterForRead()) {
connection.closeAsync(); connection.closeAsync();
entry.getAllConnections().remove(connection);
} else { } else {
releaseConnection(entry, connection); releaseConnection(entry, connection);
} }

Loading…
Cancel
Save