closeConnectionAfterFailAttempts. #198

pull/255/head
Nikita 10 years ago
parent 7ea7e82e40
commit 7e31aad67a

@ -35,6 +35,8 @@ class BaseConfig<T extends BaseConfig<T>> {
private int retryInterval = 1000; private int retryInterval = 1000;
private int closeConnectionAfterFailAttempts = -1;
/** /**
* Database index used for Redis connection * Database index used for Redis connection
*/ */
@ -67,6 +69,7 @@ class BaseConfig<T extends BaseConfig<T>> {
setTimeout(config.getTimeout()); setTimeout(config.getTimeout());
setClientName(config.getClientName()); setClientName(config.getClientName());
setPingTimeout(config.getPingTimeout()); setPingTimeout(config.getPingTimeout());
setCloseConnectionAfterFailAttempts(config.getCloseConnectionAfterFailAttempts());
} }
/** /**
@ -159,30 +162,45 @@ class BaseConfig<T extends BaseConfig<T>> {
} }
/** /**
* Name for client connection * Setup connection name during connection init
* via CLIENT SETNAME command
*
* @param name
*/ */
public String getClientName() {
return clientName;
}
public T setClientName(String clientName) { public T setClientName(String clientName) {
this.clientName = clientName; this.clientName = clientName;
return (T) this; return (T) this;
} }
public String getClientName() {
return clientName;
}
/** /**
* Ping timeout used in <code>Node.ping</code> and <code>Node.pingAll<code> operation * Ping timeout used in <code>Node.ping</code> and <code>Node.pingAll<code> operation
* *
* @return * @param ping timeout in milliseconds
*/ */
public int getPingTimeout() {
return pingTimeout;
}
public T setPingTimeout(int pingTimeout) { public T setPingTimeout(int pingTimeout) {
this.pingTimeout = pingTimeout; this.pingTimeout = pingTimeout;
return (T) this; return (T) this;
} }
public int getPingTimeout() {
return pingTimeout;
}
/**
* Close connection if it has <code>failAttemptsAmount</code>
* fails in a row during command sending. Turned off by default.
*
* @param failAttemptsAmount
*/
public void setCloseConnectionAfterFailAttempts(int failAttemptsAmount) {
this.closeConnectionAfterFailAttempts = failAttemptsAmount;
}
public int getCloseConnectionAfterFailAttempts() {
return closeConnectionAfterFailAttempts;
}
} }

@ -39,6 +39,7 @@ public class RedisConnection implements RedisCommands {
private volatile boolean closed; private volatile boolean closed;
volatile Channel channel; volatile Channel channel;
private ReconnectListener reconnectListener; private ReconnectListener reconnectListener;
private int failAttempts;
public RedisConnection(RedisClient redisClient, Channel channel) { public RedisConnection(RedisClient redisClient, Channel channel) {
super(); super();
@ -55,6 +56,18 @@ public class RedisConnection implements RedisCommands {
return reconnectListener; return reconnectListener;
} }
public void resetFailAttempt() {
failAttempts = 0;
}
public void incFailAttempt() {
failAttempts++;
}
public int getFailAttempts() {
return failAttempts;
}
public void updateChannel(Channel channel) { public void updateChannel(Channel channel) {
this.channel = channel; this.channel = channel;
channel.attr(CONNECTION).set(this); channel.attr(CONNECTION).set(this);

@ -225,7 +225,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public void returnConnection(RedisConnection connection) { public void returnConnection(RedisConnection connection) {
SubscribesConnectionEntry entry = clients.get(connection.getRedisClient()); SubscribesConnectionEntry entry = clients.get(connection.getRedisClient());
if (entry.isFreezed()) { if (entry.isFreezed() || connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) {
connection.closeAsync(); connection.closeAsync();
} else { } else {
entry.getConnections().add(connection); entry.getConnections().add(connection);

@ -244,6 +244,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
c.setPassword(cfg.getPassword()); c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase()); c.setDatabase(cfg.getDatabase());
c.setClientName(cfg.getClientName()); c.setClientName(cfg.getClientName());
c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize()); c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());

@ -166,6 +166,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new FutureListener<T>() { return new FutureListener<T>() {
@Override @Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception { public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
if (!future.isSuccess()) {
conn.incFailAttempt();
} else {
conn.resetFailAttempt();
}
shutdownLatch.release(); shutdownLatch.release();
timeout.cancel(); timeout.cancel();
releaseWrite(slot, conn); releaseWrite(slot, conn);
@ -179,6 +185,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new FutureListener<T>() { return new FutureListener<T>() {
@Override @Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception { public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
if (!future.isSuccess()) {
conn.incFailAttempt();
} else {
conn.resetFailAttempt();
}
shutdownLatch.release(); shutdownLatch.release();
timeout.cancel(); timeout.cancel();
releaseRead(slot, conn); releaseRead(slot, conn);

@ -175,6 +175,10 @@ public class MasterSlaveEntry {
if (!entry.getClient().equals(connection.getRedisClient())) { if (!entry.getClient().equals(connection.getRedisClient())) {
connection.closeAsync(); connection.closeAsync();
return; return;
} else if (connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) {
connection.closeAsync();
entry.getConnectionsSemaphore().release();
return;
} }
entry.getConnections().add(connection); entry.getConnections().add(connection);

@ -61,6 +61,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
c.setPassword(cfg.getPassword()); c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase()); c.setDatabase(cfg.getDatabase());
c.setClientName(cfg.getClientName()); c.setClientName(cfg.getClientName());
c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize()); c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());

@ -48,6 +48,7 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setPassword(cfg.getPassword()); newconfig.setPassword(cfg.getPassword());
newconfig.setDatabase(cfg.getDatabase()); newconfig.setDatabase(cfg.getDatabase());
newconfig.setClientName(cfg.getClientName()); newconfig.setClientName(cfg.getClientName());
newconfig.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts());
newconfig.setMasterAddress(addr); newconfig.setMasterAddress(addr);
newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize()); newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize());
newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());

Loading…
Cancel
Save