diff --git a/src/main/java/org/redisson/BaseConfig.java b/src/main/java/org/redisson/BaseConfig.java index 421229a5f..30b3e9e89 100644 --- a/src/main/java/org/redisson/BaseConfig.java +++ b/src/main/java/org/redisson/BaseConfig.java @@ -205,10 +205,12 @@ class BaseConfig> { * * @param failAttemptsAmount */ + @Deprecated public T setRefreshConnectionAfterFails(int failAttemptsAmount) { this.closeConnectionAfterFailAttempts = failAttemptsAmount; return (T) this; } + @Deprecated public int getRefreshConnectionAfterFails() { return closeConnectionAfterFailAttempts; } diff --git a/src/main/java/org/redisson/BaseMasterSlaveServersConfig.java b/src/main/java/org/redisson/BaseMasterSlaveServersConfig.java index f67ff7cc2..1bf82f1db 100644 --- a/src/main/java/org/redisson/BaseMasterSlaveServersConfig.java +++ b/src/main/java/org/redisson/BaseMasterSlaveServersConfig.java @@ -26,22 +26,37 @@ public class BaseMasterSlaveServersConfigeach slave node + * Redis 'slave' node minimum idle subscription (pub/sub) connection amount for each slave node + */ + private int slaveSubscriptionConnectionMinimumIdleSize = 1; + + /** + * Redis 'slave' node maximum subscription (pub/sub) connection pool size for each slave node */ private int slaveSubscriptionConnectionPoolSize = 25; /** - * Redis 'slave' servers connection pool size for each slave node + * Redis 'slave' node minimum idle connection amount for each slave node + */ + private int slaveConnectionMinimumIdleSize = 5; + + /** + * Redis 'slave' node maximum connection pool size for each slave node */ private int slaveConnectionPoolSize = 100; /** - * Redis 'master' server connection pool size + * Redis 'master' node minimum idle connection amount for each slave node + */ + private int masterConnectionMinimumIdleSize = 5; + + /** + * Redis 'master' node maximum connection pool size */ private int masterConnectionPoolSize = 100; /** - * Redis 'slave' server reconnection attempt timeout + * Redis 'slave' server reconnection attempt timeout in milliseconds * used then server excluded from the list of available slave nodes * due to reach limit of sequential unsuccessful execution attempts * @@ -67,10 +82,14 @@ public class BaseMasterSlaveServersConfigeach slave node + * Redis 'slave' servers connection pool size for each slave node. + * * Default is 100 * * @param slaveConnectionPoolSize @@ -85,11 +104,10 @@ public class BaseMasterSlaveServersConfigeach slave node + * Redis 'slave' node maximum subscription (pub/sub) connection pool size for each slave node + * * Default is 25 * - * @param slaveSubscriptionConnectionPoolSize - * @return */ public T setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) { this.slaveSubscriptionConnectionPoolSize = slaveSubscriptionConnectionPoolSize; @@ -133,9 +150,11 @@ public class BaseMasterSlaveServersConfigslaveFailedAttempts + * when sequential unsuccessful execution attempts of any Redis command on slave node reaches slaveFailedAttempts. + * + * Default is 3 + * */ public T setSlaveFailedAttempts(int slaveFailedAttempts) { this.slaveFailedAttempts = slaveFailedAttempts; @@ -160,4 +182,45 @@ public class BaseMasterSlaveServersConfigeach slave node + * + * Default is 5 + * + */ + public T setSlaveConnectionMinimumIdleSize(int slaveConnectionMinimumIdleSize) { + this.slaveConnectionMinimumIdleSize = slaveConnectionMinimumIdleSize; + return (T) this; + } + public int getSlaveConnectionMinimumIdleSize() { + return slaveConnectionMinimumIdleSize; + } + + /** + * Redis 'master' node minimum idle connection amount for each slave node + * + * Default is 5 + * + */ + public T setMasterConnectionMinimumIdleSize(int masterConnectionMinimumIdleSize) { + this.masterConnectionMinimumIdleSize = masterConnectionMinimumIdleSize; + return (T) this; + } + public int getMasterConnectionMinimumIdleSize() { + return masterConnectionMinimumIdleSize; + } + + /** + * Redis 'slave' node minimum idle subscription (pub/sub) connection amount for each slave node + * Default is 1 + * + */ + public T setSlaveSubscriptionConnectionMinimumIdleSize(int slaveSubscriptionConnectionMinimumIdleSize) { + this.slaveSubscriptionConnectionMinimumIdleSize = slaveSubscriptionConnectionMinimumIdleSize; + return (T) this; + } + public int getSlaveSubscriptionConnectionMinimumIdleSize() { + return slaveSubscriptionConnectionMinimumIdleSize; + } + } diff --git a/src/main/java/org/redisson/SingleServerConfig.java b/src/main/java/org/redisson/SingleServerConfig.java index dfa30f453..57a6b848e 100644 --- a/src/main/java/org/redisson/SingleServerConfig.java +++ b/src/main/java/org/redisson/SingleServerConfig.java @@ -28,22 +28,32 @@ public class SingleServerConfig extends BaseConfig { private URI address; /** - * Redis subscription connection pool size + * Minimum idle subscription connection amount + */ + private int subscriptionConnectionMinimumIdleSize = 1; + + /** + * Redis subscription connection maximum pool size * */ private int subscriptionConnectionPoolSize = 25; /** - * Redis connection pool size + * Minimum idle Redis connection amount + */ + private int connectionMinimumIdleSize = 5; + + /** + * Redis connection maximum pool size */ private int connectionPoolSize = 100; - + /** - * Should the server address be monitored for changes in DNS? Useful for + * Should the server address be monitored for changes in DNS? Useful for * AWS ElastiCache where the client is pointed at the endpoint for a replication group * which is a DNS alias to the current master node.
- * NB: applications must ensure the JVM DNS cache TTL is low enough to support this. + * NB: applications must ensure the JVM DNS cache TTL is low enough to support this. * e.g., http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-jvm-ttl.html */ private boolean dnsMonitoring = false; @@ -52,7 +62,7 @@ public class SingleServerConfig extends BaseConfig { * Interval in milliseconds to check DNS */ private long dnsMonitoringInterval = 5000; - + SingleServerConfig() { } @@ -63,6 +73,8 @@ public class SingleServerConfig extends BaseConfig { setSubscriptionConnectionPoolSize(config.getSubscriptionConnectionPoolSize()); setDnsMonitoring(config.isDnsMonitoring()); setDnsMonitoringInterval(config.getDnsMonitoringInterval()); + setSubscriptionConnectionMinimumIdleSize(config.getSubscriptionConnectionMinimumIdleSize()); + setConnectionMinimumIdleSize(config.getConnectionMinimumIdleSize()); } /** @@ -112,8 +124,9 @@ public class SingleServerConfig extends BaseConfig { /** * Monitoring of the endpoint address for DNS changes. - * Default is false. - * + * + * Default is false + * * @param dnsMonitoring * @return */ @@ -127,8 +140,9 @@ public class SingleServerConfig extends BaseConfig { /** * Interval in milliseconds to check the endpoint DNS if {@link #isDnsMonitoring()} is true. - * Default is 5000. - * + * + * Default is 5000 + * * @param dnsMonitoringInterval * @return */ @@ -139,4 +153,33 @@ public class SingleServerConfig extends BaseConfig { public long getDnsMonitoringInterval() { return dnsMonitoringInterval; } + + /** + * Minimum idle subscription connection amount. + * + * Default is 1 + * + */ + public SingleServerConfig setSubscriptionConnectionMinimumIdleSize(int subscriptionConnectionMinimumIdleSize) { + this.subscriptionConnectionMinimumIdleSize = subscriptionConnectionMinimumIdleSize; + return this; + } + public int getSubscriptionConnectionMinimumIdleSize() { + return subscriptionConnectionMinimumIdleSize; + } + + /** + * Minimum idle Redis connection amount. + * + * Default is 5 + * + */ + public SingleServerConfig setConnectionMinimumIdleSize(int connectionMinimumIdleSize) { + this.connectionMinimumIdleSize = connectionMinimumIdleSize; + return this; + } + public int getConnectionMinimumIdleSize() { + return connectionMinimumIdleSize; + } + } diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index b0064de55..d6ad12f8d 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -394,6 +394,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); + c.setConnectTimeout(cfg.getConnectTimeout()); + + c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts()); + c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout()); + c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize()); + c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize()); + c.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSlaveSubscriptionConnectionMinimumIdleSize()); + return c; } diff --git a/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java b/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java index e48bc419b..d5bf79cf0 100644 --- a/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java +++ b/src/main/java/org/redisson/connection/ElasticacheConnectionManager.java @@ -159,6 +159,14 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager { c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); + c.setConnectTimeout(cfg.getConnectTimeout()); + + c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts()); + c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout()); + c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize()); + c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize()); + c.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSlaveSubscriptionConnectionMinimumIdleSize()); + return c; } diff --git a/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/src/main/java/org/redisson/connection/MasterSlaveEntry.java index b837d878c..06ce6ec80 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -66,13 +66,17 @@ public class MasterSlaveEntry { slaveBalancer = config.getLoadBalancer(); slaveBalancer.init(config, connectionManager, this); + initSlaveBalancer(config); + + writeConnectionHolder = new ConnectionPool(config, null, connectionManager, this); + } + + protected void initSlaveBalancer(MasterSlaveServersConfig config) { boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty(); addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER); for (URI address : config.getSlaveAddresses()) { addSlave(address.getHost(), address.getPort(), false, NodeType.SLAVE); } - - writeConnectionHolder = new ConnectionPool(config, null, connectionManager, this); } public void setupMasterEntry(String host, int port) { diff --git a/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 12b3fdc8a..defdaba71 100755 --- a/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -67,6 +67,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); + c.setConnectTimeout(cfg.getConnectTimeout()); + + c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts()); + c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout()); + c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize()); + c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize()); + c.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSlaveSubscriptionConnectionMinimumIdleSize()); List disconnectedSlaves = new ArrayList(); for (URI addr : cfg.getSentinelAddresses()) { diff --git a/src/main/java/org/redisson/connection/SingleConnectionManager.java b/src/main/java/org/redisson/connection/SingleConnectionManager.java index 57a95fd06..707024b3a 100644 --- a/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -17,7 +17,6 @@ package org.redisson.connection; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.Collections; import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -56,6 +55,10 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize()); newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize()); + newconfig.setConnectTimeout(cfg.getConnectTimeout()); + + newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize()); + newconfig.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); init(newconfig, config); diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index 31fc2b257..3bac89d68 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -47,6 +47,10 @@ public class SingleEntry extends MasterSlaveEntry { pubSubConnectionHolder.add(masterEntry); } + @Override + protected void initSlaveBalancer(MasterSlaveServersConfig config) { + } + @Override Future nextPubSubConnection() { return pubSubConnectionHolder.get(); diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index 3003e68c1..02bef6fce 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisConnection; @@ -59,9 +60,48 @@ public class ConnectionPool { this.connectionManager = connectionManager; } - public void add(SubscribesConnectionEntry entry) { - entries.add(entry); - handleQueue(entry); + public void add(final SubscribesConnectionEntry entry) { + // is it a master connection pool? + int minimumIdleSize = getMinimumIdleSize(entry); + + if (minimumIdleSize == 0) { + entries.add(entry); + handleQueue(entry); + return; + } + + final AtomicInteger completedConnections = new AtomicInteger(minimumIdleSize); + for (int i = 0; i < minimumIdleSize; i++) { + if (entry.isFreezed() || !tryAcquireConnection(entry)) { + continue; + } + + Promise promise = connectionManager.newPromise(); + connect(entry, promise); + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + T conn = future.getNow(); + releaseConnection(entry, conn); + } + releaseConnection(entry); + + if (completedConnections.decrementAndGet() == 0) { + entries.add(entry); + handleQueue(entry); + } + } + }); + } + } + + protected int getMinimumIdleSize(SubscribesConnectionEntry entry) { + int minimumIdleSize = config.getSlaveConnectionMinimumIdleSize(); + if (entry.getNodeType() == NodeType.MASTER && loadBalancer == null) { + minimumIdleSize = config.getMasterConnectionMinimumIdleSize(); + } + return minimumIdleSize; } public void remove(SubscribesConnectionEntry entry) { diff --git a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java index 30bea5163..483c6cdbf 100644 --- a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java +++ b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java @@ -36,6 +36,11 @@ public class PubSubConnectionPoll extends ConnectionPool return entry.pollFreeSubscribeConnection(); } + @Override + protected int getMinimumIdleSize(SubscribesConnectionEntry entry) { + return config.getSlaveSubscriptionConnectionMinimumIdleSize(); + } + @Override protected Future connect(SubscribesConnectionEntry entry) { return entry.connectPubSub(config); diff --git a/src/test/java/org/redisson/RedissonTopicPatternTest.java b/src/test/java/org/redisson/RedissonTopicPatternTest.java index fe5718035..f4dea44a7 100644 --- a/src/test/java/org/redisson/RedissonTopicPatternTest.java +++ b/src/test/java/org/redisson/RedissonTopicPatternTest.java @@ -77,7 +77,7 @@ public class RedissonTopicPatternTest { // topic1 = redisson.getPatternTopic("topic1.*"); redisson.getTopic("topic1.t3").publish(new Message("123")); - messageRecieved.await(); + Assert.assertTrue(messageRecieved.await(5, TimeUnit.SECONDS)); redisson.shutdown(); }