From 7e31aad67a1e08486fbd0f9e79d1364fd426480d Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 11 Sep 2015 19:34:49 +0300 Subject: [PATCH] closeConnectionAfterFailAttempts. #198 --- src/main/java/org/redisson/BaseConfig.java | 36 ++++++++++++++----- .../org/redisson/client/RedisConnection.java | 13 +++++++ .../redisson/connection/BaseLoadBalancer.java | 2 +- .../connection/ClusterConnectionManager.java | 1 + .../MasterSlaveConnectionManager.java | 12 +++++++ .../redisson/connection/MasterSlaveEntry.java | 4 +++ .../connection/SentinelConnectionManager.java | 1 + .../connection/SingleConnectionManager.java | 7 ++-- 8 files changed, 63 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/redisson/BaseConfig.java b/src/main/java/org/redisson/BaseConfig.java index 71b403999..6aac92a5d 100644 --- a/src/main/java/org/redisson/BaseConfig.java +++ b/src/main/java/org/redisson/BaseConfig.java @@ -35,6 +35,8 @@ class BaseConfig> { private int retryInterval = 1000; + private int closeConnectionAfterFailAttempts = -1; + /** * Database index used for Redis connection */ @@ -67,6 +69,7 @@ class BaseConfig> { setTimeout(config.getTimeout()); setClientName(config.getClientName()); setPingTimeout(config.getPingTimeout()); + setCloseConnectionAfterFailAttempts(config.getCloseConnectionAfterFailAttempts()); } /** @@ -159,30 +162,45 @@ class BaseConfig> { } /** - * Name for client connection + * Setup connection name during connection init + * via CLIENT SETNAME command + * + * @param name */ - public String getClientName() { - return clientName; - } - public T setClientName(String clientName) { this.clientName = clientName; return (T) this; } + public String getClientName() { + return clientName; + } + /** * Ping timeout used in Node.ping and Node.pingAll operation * - * @return + * @param ping timeout in milliseconds */ - public int getPingTimeout() { - return pingTimeout; - } public T setPingTimeout(int pingTimeout) { this.pingTimeout = pingTimeout; return (T) this; } + public int getPingTimeout() { + return pingTimeout; + } + /** + * Close connection if it has failAttemptsAmount + * fails in a row during command sending. Turned off by default. + * + * @param failAttemptsAmount + */ + public void setCloseConnectionAfterFailAttempts(int failAttemptsAmount) { + this.closeConnectionAfterFailAttempts = failAttemptsAmount; + } + public int getCloseConnectionAfterFailAttempts() { + return closeConnectionAfterFailAttempts; + } } diff --git a/src/main/java/org/redisson/client/RedisConnection.java b/src/main/java/org/redisson/client/RedisConnection.java index 05fa233d1..837745e18 100644 --- a/src/main/java/org/redisson/client/RedisConnection.java +++ b/src/main/java/org/redisson/client/RedisConnection.java @@ -39,6 +39,7 @@ public class RedisConnection implements RedisCommands { private volatile boolean closed; volatile Channel channel; private ReconnectListener reconnectListener; + private int failAttempts; public RedisConnection(RedisClient redisClient, Channel channel) { super(); @@ -55,6 +56,18 @@ public class RedisConnection implements RedisCommands { return reconnectListener; } + public void resetFailAttempt() { + failAttempts = 0; + } + + public void incFailAttempt() { + failAttempts++; + } + + public int getFailAttempts() { + return failAttempts; + } + public void updateChannel(Channel channel) { this.channel = channel; channel.attr(CONNECTION).set(this); diff --git a/src/main/java/org/redisson/connection/BaseLoadBalancer.java b/src/main/java/org/redisson/connection/BaseLoadBalancer.java index 20976ea83..11d3d89f2 100644 --- a/src/main/java/org/redisson/connection/BaseLoadBalancer.java +++ b/src/main/java/org/redisson/connection/BaseLoadBalancer.java @@ -225,7 +225,7 @@ abstract class BaseLoadBalancer implements LoadBalancer { public void returnConnection(RedisConnection connection) { SubscribesConnectionEntry entry = clients.get(connection.getRedisClient()); - if (entry.isFreezed()) { + if (entry.isFreezed() || connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) { connection.closeAsync(); } else { entry.getConnections().add(connection); diff --git a/src/main/java/org/redisson/connection/ClusterConnectionManager.java b/src/main/java/org/redisson/connection/ClusterConnectionManager.java index 9b9929d8c..a1fe7edb3 100644 --- a/src/main/java/org/redisson/connection/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/connection/ClusterConnectionManager.java @@ -244,6 +244,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { c.setPassword(cfg.getPassword()); c.setDatabase(cfg.getDatabase()); c.setClientName(cfg.getClientName()); + c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts()); c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 08f7928ee..b64de1e78 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -166,6 +166,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + conn.incFailAttempt(); + } else { + conn.resetFailAttempt(); + } + shutdownLatch.release(); timeout.cancel(); releaseWrite(slot, conn); @@ -179,6 +185,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return new FutureListener() { @Override public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + conn.incFailAttempt(); + } else { + conn.resetFailAttempt(); + } + shutdownLatch.release(); timeout.cancel(); releaseRead(slot, conn); diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index dde924220..097cbe20f 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -175,6 +175,10 @@ public class MasterSlaveEntry { if (!entry.getClient().equals(connection.getRedisClient())) { connection.closeAsync(); return; + } else if (connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) { + connection.closeAsync(); + entry.getConnectionsSemaphore().release(); + return; } entry.getConnections().add(connection); diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 03ae9122c..eb78e4572 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -61,6 +61,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { c.setPassword(cfg.getPassword()); c.setDatabase(cfg.getDatabase()); c.setClientName(cfg.getClientName()); + c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts()); c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index f267fc41c..18baafdf5 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -37,7 +37,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { private final AtomicReference currentMaster = new AtomicReference(); private ScheduledFuture monitorFuture; - + public SingleConnectionManager(SingleServerConfig cfg, Config config) { MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig(); String addr = cfg.getAddress().getHost() + ":" + cfg.getAddress().getPort(); @@ -48,13 +48,14 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { newconfig.setPassword(cfg.getPassword()); newconfig.setDatabase(cfg.getDatabase()); newconfig.setClientName(cfg.getClientName()); + newconfig.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts()); newconfig.setMasterAddress(addr); newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize()); newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize()); init(newconfig, config); - + if (cfg.isDnsMonitoring()) { try { this.currentMaster.set(InetAddress.getByName(cfg.getAddress().getHost())); @@ -71,7 +72,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { SingleEntry entry = new SingleEntry(0, MAX_SLOT, this, config); entries.put(MAX_SLOT, entry); } - + private void monitorDnsChange(final SingleServerConfig cfg) { monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() { @Override