diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 1bc62f061..2015b77e1 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -33,6 +33,7 @@ import org.redisson.RedissonReference; import org.redisson.RedissonShutdownException; import org.redisson.ScanResult; import org.redisson.SlotCallback; +import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.api.RedissonClient; import org.redisson.api.RedissonReactiveClient; @@ -57,6 +58,7 @@ import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.codec.ReferenceCodecProvider; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; +import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; @@ -697,8 +699,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (!future.isSuccess()) { details.setException(new WriteRedisConnectionException( - "Unable to send command! Node source: " + details.getSource() + ", connection: " + future.channel() + - ", command: " + details.getCommand() + ", params: " + LogHelper.toString(details.getParams()), future.cause())); + "Unable to send command! Node source: " + details.getSource() + ", connection: " + connection + + ", command: " + details.getCommand() + ", command params: " + LogHelper.toString(details.getParams()) + + " after " + details.getAttempt() + " retry attempts", future.cause())); if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) { if (!details.getAttemptPromise().tryFailure(details.getException())) { log.error(details.getException().getMessage()); @@ -743,6 +746,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { TimerTask timeoutTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { + MasterSlaveEntry entry = connectionManager.getEntry(connection.getRedisClient()); + ClientConnectionsEntry ee = entry.getEntry(connection.getRedisClient()); + if (ee != null && ee.getNodeType() == NodeType.SLAVE) { + ee.trySetupFistFail(); + } + details.getAttemptPromise().tryFailure( new RedisTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + details.getCommand() + " with params: " + LogHelper.toString(details.getParams()) + " channel: " + connection.getChannel())); @@ -898,6 +907,14 @@ public class CommandAsyncService implements CommandAsyncExecutor { free(details.getParams()); + if (!(future.cause() instanceof RedisTimeoutException)) { + MasterSlaveEntry entry = connectionManager.getEntry(details.getConnectionFuture().getNow().getRedisClient()); + ClientConnectionsEntry ee = entry.getEntry(details.getConnectionFuture().getNow().getRedisClient()); + if (ee != null && ee.getNodeType() == NodeType.SLAVE) { + ee.resetFirstFail(); + } + } + if (future.isSuccess()) { R res = future.getNow(); if (res instanceof ScanResult) { diff --git a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java index 0d3f8b99c..921ba0f7d 100644 --- a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java @@ -43,7 +43,7 @@ public class BaseMasterSlaveServersConfigeach slave node @@ -136,7 +136,7 @@ public class BaseMasterSlaveServersConfigslaveFailsInterval value. *

- * Default is 60000 + * Default is 180000 * * @param slaveFailsInterval - time interval in milliseconds * @return config diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index fd8a99849..4da556182 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -131,28 +131,28 @@ public class MasterSlaveEntry { return; } - masterEntry = new ClientConnectionsEntry( - client, - config.getMasterConnectionMinimumIdleSize(), - config.getMasterConnectionPoolSize(), - config.getSubscriptionConnectionMinimumIdleSize(), - config.getSubscriptionConnectionPoolSize(), - connectionManager, + masterEntry = new ClientConnectionsEntry( + client, + config.getMasterConnectionMinimumIdleSize(), + config.getMasterConnectionPoolSize(), + config.getSubscriptionConnectionMinimumIdleSize(), + config.getSubscriptionConnectionPoolSize(), + connectionManager, NodeType.MASTER); - + int counter = 1; if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { counter++; } CountableListener listener = new CountableListener(result, client, counter); - RFuture writeFuture = writeConnectionPool.add(masterEntry); - writeFuture.addListener(listener); - - if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { - RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); - pubSubFuture.addListener(listener); - } + RFuture writeFuture = writeConnectionPool.add(masterEntry); + writeFuture.addListener(listener); + + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + RFuture pubSubFuture = pubSubConnectionPool.add(masterEntry); + pubSubFuture.addListener(listener); + } } }); @@ -200,6 +200,7 @@ public class MasterSlaveEntry { connection.closeAsync(); reattachBlockingQueue(connection); } + entry.getAllConnections().clear(); for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) { connection.closeAsync(); @@ -229,7 +230,7 @@ public class MasterSlaveEntry { } final RedisConnection newConnection = future.getNow(); - + final FutureListener listener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -289,7 +290,7 @@ public class MasterSlaveEntry { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; - } + } ClientConnectionsEntry entry = new ClientConnectionsEntry(client, config.getSlaveConnectionMinimumIdleSize(), @@ -323,6 +324,10 @@ public class MasterSlaveEntry { return slaveBalancer.getEntries(); } + public ClientConnectionsEntry getEntry(RedisClient redisClient) { + return slaveBalancer.getEntry(redisClient); + } + public RedisClient getClient() { return masterEntry.getClient(); } @@ -495,15 +500,15 @@ public class MasterSlaveEntry { } slaveBalancer.returnConnection(connection); } - + public void incReference() { references++; } - + public int decReference() { return --references; } - + public int getReferences() { return references; } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 85beeacc7..ea56b9a48 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -218,7 +218,7 @@ public class LoadBalancerManager { return null; } - private ClientConnectionsEntry getEntry(RedisClient redisClient) { + public ClientConnectionsEntry getEntry(RedisClient redisClient) { return client2Entry.get(redisClient); } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 3ffa62505..ecca1bf58 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -225,12 +225,12 @@ abstract class ConnectionPool { return result; } - + protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { if (entry.getNodeType() == NodeType.SLAVE && entry.isFailed()) { checkForReconnect(entry, null); return false; - } + } return true; } @@ -283,9 +283,6 @@ abstract class ConnectionPool { } private void connectedSuccessful(ClientConnectionsEntry entry, RPromise promise, T conn) { - if (entry.getNodeType() == NodeType.SLAVE) { - entry.resetFirstFail(); - } if (!promise.trySuccess(conn)) { releaseConnection(entry, conn); releaseConnection(entry); @@ -296,8 +293,8 @@ abstract class ConnectionPool { if (entry.getNodeType() == NodeType.SLAVE) { entry.trySetupFistFail(); if (entry.isFailed()) { - checkForReconnect(entry, cause); - } + checkForReconnect(entry, cause); + } } releaseConnection(entry); @@ -309,11 +306,12 @@ abstract class ConnectionPool { if (entry.getNodeType() == NodeType.SLAVE) { entry.trySetupFistFail(); if (entry.isFailed()) { - conn.closeAsync(); - checkForReconnect(entry, null); + conn.closeAsync(); + entry.getAllConnections().remove(conn); + checkForReconnect(entry, null); } else { - releaseConnection(entry, conn); - } + releaseConnection(entry, conn); + } } else { releaseConnection(entry, conn); } @@ -427,6 +425,7 @@ abstract class ConnectionPool { public void returnConnection(ClientConnectionsEntry entry, T connection) { if (entry.isFreezed() && !entry.isMasterForRead()) { connection.closeAsync(); + entry.getAllConnections().remove(connection); } else { releaseConnection(entry, connection); }