'slaveSubscriptionConnectionMinimumIdleSize', 'slaveConnectionMinimumIdleSize' and 'masterConnectionMinimumIdleSize' params were added. #234

pull/297/head
Nikita 9 years ago
parent 7515e3b5d1
commit 323ab1e57e

@ -205,10 +205,12 @@ class BaseConfig<T extends BaseConfig<T>> {
* *
* @param failAttemptsAmount * @param failAttemptsAmount
*/ */
@Deprecated
public T setRefreshConnectionAfterFails(int failAttemptsAmount) { public T setRefreshConnectionAfterFails(int failAttemptsAmount) {
this.closeConnectionAfterFailAttempts = failAttemptsAmount; this.closeConnectionAfterFailAttempts = failAttemptsAmount;
return (T) this; return (T) this;
} }
@Deprecated
public int getRefreshConnectionAfterFails() { public int getRefreshConnectionAfterFails() {
return closeConnectionAfterFailAttempts; return closeConnectionAfterFailAttempts;
} }

@ -26,22 +26,37 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
private LoadBalancer loadBalancer = new RoundRobinLoadBalancer(); private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
/** /**
* Redis 'slave' servers subscription (pub/sub) connection pool size for <b>each</b> slave node * Redis 'slave' node minimum idle subscription (pub/sub) connection amount for <b>each</b> slave node
*/
private int slaveSubscriptionConnectionMinimumIdleSize = 1;
/**
* Redis 'slave' node maximum subscription (pub/sub) connection pool size for <b>each</b> slave node
*/ */
private int slaveSubscriptionConnectionPoolSize = 25; private int slaveSubscriptionConnectionPoolSize = 25;
/** /**
* Redis 'slave' servers connection pool size for <b>each</b> slave node * Redis 'slave' node minimum idle connection amount for <b>each</b> slave node
*/
private int slaveConnectionMinimumIdleSize = 5;
/**
* Redis 'slave' node maximum connection pool size for <b>each</b> slave node
*/ */
private int slaveConnectionPoolSize = 100; private int slaveConnectionPoolSize = 100;
/** /**
* Redis 'master' server connection pool size * Redis 'master' node minimum idle connection amount for <b>each</b> slave node
*/
private int masterConnectionMinimumIdleSize = 5;
/**
* Redis 'master' node maximum connection pool size
*/ */
private int masterConnectionPoolSize = 100; private int masterConnectionPoolSize = 100;
/** /**
* Redis 'slave' server reconnection attempt timeout * Redis 'slave' server reconnection attempt timeout in milliseconds
* used then server excluded from the list of available slave nodes * used then server excluded from the list of available slave nodes
* due to reach limit of sequential unsuccessful execution attempts * due to reach limit of sequential unsuccessful execution attempts
* *
@ -67,10 +82,14 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
setSlaveSubscriptionConnectionPoolSize(config.getSlaveSubscriptionConnectionPoolSize()); setSlaveSubscriptionConnectionPoolSize(config.getSlaveSubscriptionConnectionPoolSize());
setSlaveFailedAttempts(config.getSlaveFailedAttempts()); setSlaveFailedAttempts(config.getSlaveFailedAttempts());
setSlaveReconnectionTimeout(config.getSlaveReconnectionTimeout()); setSlaveReconnectionTimeout(config.getSlaveReconnectionTimeout());
setMasterConnectionMinimumIdleSize(config.getMasterConnectionMinimumIdleSize());
setSlaveConnectionMinimumIdleSize(config.getSlaveConnectionMinimumIdleSize());
setSlaveSubscriptionConnectionMinimumIdleSize(config.getSlaveSubscriptionConnectionMinimumIdleSize());
} }
/** /**
* Redis 'slave' servers connection pool size for <b>each</b> slave node * Redis 'slave' servers connection pool size for <b>each</b> slave node.
*
* Default is 100 * Default is 100
* *
* @param slaveConnectionPoolSize * @param slaveConnectionPoolSize
@ -85,11 +104,10 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
} }
/** /**
* Redis 'master' server connection pool size * Redis 'master' server connection pool size.
*
* Default is 100 * Default is 100
* *
* @param masterConnectionPoolSize
* @return
*/ */
public T setMasterConnectionPoolSize(int masterConnectionPoolSize) { public T setMasterConnectionPoolSize(int masterConnectionPoolSize) {
this.masterConnectionPoolSize = masterConnectionPoolSize; this.masterConnectionPoolSize = masterConnectionPoolSize;
@ -118,11 +136,10 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
} }
/** /**
* Redis 'slave' servers subscription connection pool size for <b>each</b> slave node * Redis 'slave' node maximum subscription (pub/sub) connection pool size for <b>each</b> slave node
*
* Default is 25 * Default is 25
* *
* @param slaveSubscriptionConnectionPoolSize
* @return
*/ */
public T setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) { public T setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) {
this.slaveSubscriptionConnectionPoolSize = slaveSubscriptionConnectionPoolSize; this.slaveSubscriptionConnectionPoolSize = slaveSubscriptionConnectionPoolSize;
@ -133,9 +150,11 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
} }
/** /**
* Redis 'slave' server reconnection attempt timeout * Redis 'slave' server reconnection attempt timeout in milliseconds
* used then server excluded from the list of available slave nodes * used then server excluded from the list of available slave nodes
* due to reach limit of sequential unsuccessful execution attempts * due to reach limit of sequential unsuccessful execution attempts.
*
* Default is 3000
* *
* @see #slaveFailedAttempts * @see #slaveFailedAttempts
* *
@ -150,7 +169,10 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
/** /**
* Redis 'slave' server will be excluded from the list of available slave nodes * Redis 'slave' server will be excluded from the list of available slave nodes
* when sequential unsuccessful execution attempts of any Redis command on slave node reaches <code>slaveFailedAttempts</code> * when sequential unsuccessful execution attempts of any Redis command on slave node reaches <code>slaveFailedAttempts</code>.
*
* Default is 3
*
*/ */
public T setSlaveFailedAttempts(int slaveFailedAttempts) { public T setSlaveFailedAttempts(int slaveFailedAttempts) {
this.slaveFailedAttempts = slaveFailedAttempts; this.slaveFailedAttempts = slaveFailedAttempts;
@ -160,4 +182,45 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return slaveFailedAttempts; return slaveFailedAttempts;
} }
/**
* Redis 'slave' node minimum idle connection amount for <b>each</b> slave node
*
* Default is 5
*
*/
public T setSlaveConnectionMinimumIdleSize(int slaveConnectionMinimumIdleSize) {
this.slaveConnectionMinimumIdleSize = slaveConnectionMinimumIdleSize;
return (T) this;
}
public int getSlaveConnectionMinimumIdleSize() {
return slaveConnectionMinimumIdleSize;
}
/**
* Redis 'master' node minimum idle connection amount for <b>each</b> slave node
*
* Default is 5
*
*/
public T setMasterConnectionMinimumIdleSize(int masterConnectionMinimumIdleSize) {
this.masterConnectionMinimumIdleSize = masterConnectionMinimumIdleSize;
return (T) this;
}
public int getMasterConnectionMinimumIdleSize() {
return masterConnectionMinimumIdleSize;
}
/**
* Redis 'slave' node minimum idle subscription (pub/sub) connection amount for <b>each</b> slave node
* Default is 1
*
*/
public T setSlaveSubscriptionConnectionMinimumIdleSize(int slaveSubscriptionConnectionMinimumIdleSize) {
this.slaveSubscriptionConnectionMinimumIdleSize = slaveSubscriptionConnectionMinimumIdleSize;
return (T) this;
}
public int getSlaveSubscriptionConnectionMinimumIdleSize() {
return slaveSubscriptionConnectionMinimumIdleSize;
}
} }

@ -28,13 +28,23 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
private URI address; private URI address;
/** /**
* Redis subscription connection pool size * Minimum idle subscription connection amount
*/
private int subscriptionConnectionMinimumIdleSize = 1;
/**
* Redis subscription connection maximum pool size
* *
*/ */
private int subscriptionConnectionPoolSize = 25; private int subscriptionConnectionPoolSize = 25;
/** /**
* Redis connection pool size * Minimum idle Redis connection amount
*/
private int connectionMinimumIdleSize = 5;
/**
* Redis connection maximum pool size
*/ */
private int connectionPoolSize = 100; private int connectionPoolSize = 100;
@ -63,6 +73,8 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
setSubscriptionConnectionPoolSize(config.getSubscriptionConnectionPoolSize()); setSubscriptionConnectionPoolSize(config.getSubscriptionConnectionPoolSize());
setDnsMonitoring(config.isDnsMonitoring()); setDnsMonitoring(config.isDnsMonitoring());
setDnsMonitoringInterval(config.getDnsMonitoringInterval()); setDnsMonitoringInterval(config.getDnsMonitoringInterval());
setSubscriptionConnectionMinimumIdleSize(config.getSubscriptionConnectionMinimumIdleSize());
setConnectionMinimumIdleSize(config.getConnectionMinimumIdleSize());
} }
/** /**
@ -112,7 +124,8 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
/** /**
* Monitoring of the endpoint address for DNS changes. * Monitoring of the endpoint address for DNS changes.
* Default is false. *
* Default is false
* *
* @param dnsMonitoring * @param dnsMonitoring
* @return * @return
@ -127,7 +140,8 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
/** /**
* Interval in milliseconds to check the endpoint DNS if {@link #isDnsMonitoring()} is true. * Interval in milliseconds to check the endpoint DNS if {@link #isDnsMonitoring()} is true.
* Default is 5000. *
* Default is 5000
* *
* @param dnsMonitoringInterval * @param dnsMonitoringInterval
* @return * @return
@ -139,4 +153,33 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
public long getDnsMonitoringInterval() { public long getDnsMonitoringInterval() {
return dnsMonitoringInterval; return dnsMonitoringInterval;
} }
/**
* Minimum idle subscription connection amount.
*
* Default is 1
*
*/
public SingleServerConfig setSubscriptionConnectionMinimumIdleSize(int subscriptionConnectionMinimumIdleSize) {
this.subscriptionConnectionMinimumIdleSize = subscriptionConnectionMinimumIdleSize;
return this;
}
public int getSubscriptionConnectionMinimumIdleSize() {
return subscriptionConnectionMinimumIdleSize;
}
/**
* Minimum idle Redis connection amount.
*
* Default is 5
*
*/
public SingleServerConfig setConnectionMinimumIdleSize(int connectionMinimumIdleSize) {
this.connectionMinimumIdleSize = connectionMinimumIdleSize;
return this;
}
public int getConnectionMinimumIdleSize() {
return connectionMinimumIdleSize;
}
} }

@ -394,6 +394,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
c.setConnectTimeout(cfg.getConnectTimeout());
c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts());
c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout());
c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize());
c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize());
c.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSlaveSubscriptionConnectionMinimumIdleSize());
return c; return c;
} }

@ -159,6 +159,14 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
c.setConnectTimeout(cfg.getConnectTimeout());
c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts());
c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout());
c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize());
c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize());
c.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSlaveSubscriptionConnectionMinimumIdleSize());
return c; return c;
} }

@ -66,13 +66,17 @@ public class MasterSlaveEntry<E extends ConnectionEntry> {
slaveBalancer = config.getLoadBalancer(); slaveBalancer = config.getLoadBalancer();
slaveBalancer.init(config, connectionManager, this); slaveBalancer.init(config, connectionManager, this);
initSlaveBalancer(config);
writeConnectionHolder = new ConnectionPool<RedisConnection>(config, null, connectionManager, this);
}
protected void initSlaveBalancer(MasterSlaveServersConfig config) {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty(); boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty();
addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER); addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER);
for (URI address : config.getSlaveAddresses()) { for (URI address : config.getSlaveAddresses()) {
addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE); addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE);
} }
writeConnectionHolder = new ConnectionPool<RedisConnection>(config, null, connectionManager, this);
} }
public void setupMasterEntry(String host, int port) { public void setupMasterEntry(String host, int port) {

@ -67,6 +67,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
c.setConnectTimeout(cfg.getConnectTimeout());
c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts());
c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout());
c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize());
c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize());
c.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSlaveSubscriptionConnectionMinimumIdleSize());
List<String> disconnectedSlaves = new ArrayList<String>(); List<String> disconnectedSlaves = new ArrayList<String>();
for (URI addr : cfg.getSentinelAddresses()) { for (URI addr : cfg.getSentinelAddresses()) {

@ -17,7 +17,6 @@ package org.redisson.connection;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -56,6 +55,10 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize()); newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize());
newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize()); newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
newconfig.setConnectTimeout(cfg.getConnectTimeout());
newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize());
newconfig.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
init(newconfig, config); init(newconfig, config);

@ -47,6 +47,10 @@ public class SingleEntry extends MasterSlaveEntry<SubscribesConnectionEntry> {
pubSubConnectionHolder.add(masterEntry); pubSubConnectionHolder.add(masterEntry);
} }
@Override
protected void initSlaveBalancer(MasterSlaveServersConfig config) {
}
@Override @Override
Future<RedisPubSubConnection> nextPubSubConnection() { Future<RedisPubSubConnection> nextPubSubConnection() {
return pubSubConnectionHolder.get(); return pubSubConnectionHolder.get();

@ -19,6 +19,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
@ -59,10 +60,49 @@ public class ConnectionPool<T extends RedisConnection> {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
} }
public void add(SubscribesConnectionEntry entry) { public void add(final SubscribesConnectionEntry entry) {
// is it a master connection pool?
int minimumIdleSize = getMinimumIdleSize(entry);
if (minimumIdleSize == 0) {
entries.add(entry);
handleQueue(entry);
return;
}
final AtomicInteger completedConnections = new AtomicInteger(minimumIdleSize);
for (int i = 0; i < minimumIdleSize; i++) {
if (entry.isFreezed() || !tryAcquireConnection(entry)) {
continue;
}
Promise<T> promise = connectionManager.newPromise();
connect(entry, promise);
promise.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (future.isSuccess()) {
T conn = future.getNow();
releaseConnection(entry, conn);
}
releaseConnection(entry);
if (completedConnections.decrementAndGet() == 0) {
entries.add(entry); entries.add(entry);
handleQueue(entry); handleQueue(entry);
} }
}
});
}
}
protected int getMinimumIdleSize(SubscribesConnectionEntry entry) {
int minimumIdleSize = config.getSlaveConnectionMinimumIdleSize();
if (entry.getNodeType() == NodeType.MASTER && loadBalancer == null) {
minimumIdleSize = config.getMasterConnectionMinimumIdleSize();
}
return minimumIdleSize;
}
public void remove(SubscribesConnectionEntry entry) { public void remove(SubscribesConnectionEntry entry) {
entries.remove(entry); entries.remove(entry);

@ -36,6 +36,11 @@ public class PubSubConnectionPoll extends ConnectionPool<RedisPubSubConnection>
return entry.pollFreeSubscribeConnection(); return entry.pollFreeSubscribeConnection();
} }
@Override
protected int getMinimumIdleSize(SubscribesConnectionEntry entry) {
return config.getSlaveSubscriptionConnectionMinimumIdleSize();
}
@Override @Override
protected Future<RedisPubSubConnection> connect(SubscribesConnectionEntry entry) { protected Future<RedisPubSubConnection> connect(SubscribesConnectionEntry entry) {
return entry.connectPubSub(config); return entry.connectPubSub(config);

@ -77,7 +77,7 @@ public class RedissonTopicPatternTest {
// topic1 = redisson.getPatternTopic("topic1.*"); // topic1 = redisson.getPatternTopic("topic1.*");
redisson.getTopic("topic1.t3").publish(new Message("123")); redisson.getTopic("topic1.t3").publish(new Message("123"));
messageRecieved.await(); Assert.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS));
redisson.shutdown(); redisson.shutdown();
} }

Loading…
Cancel
Save