diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index 23c64625c..003324b2f 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -38,6 +38,11 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; +/** + * + * @author Nikita Koksharov + * + */ public class RedisConnection implements RedisCommands { private static final AttributeKey CONNECTION = AttributeKey.valueOf("connection"); diff --git a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java index 7103d1af0..ea8566ed2 100644 --- a/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisPubSubConnection.java @@ -40,6 +40,11 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; +/** + * + * @author Nikita Koksharov + * + */ public class RedisPubSubConnection extends RedisConnection { final Queue> listeners = new ConcurrentLinkedQueue>(); diff --git a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java index 9965501c8..c79869783 100644 --- a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java @@ -31,16 +31,6 @@ public class BaseMasterSlaveServersConfigeach slave node - */ - private int slaveSubscriptionConnectionMinimumIdleSize = 1; - - /** - * Redis 'slave' node maximum subscription (pub/sub) connection pool size for each slave node - */ - private int slaveSubscriptionConnectionPoolSize = 50; - /** * Redis 'slave' node minimum idle connection amount for each slave node */ @@ -62,6 +52,18 @@ public class BaseMasterSlaveServersConfigeach slave node + */ + private int subscriptionConnectionMinimumIdleSize = 1; + + /** + * Redis 'slave' node maximum subscription (pub/sub) connection pool size for each slave node + */ + private int subscriptionConnectionPoolSize = 50; public BaseMasterSlaveServersConfig() { } @@ -71,11 +73,12 @@ public class BaseMasterSlaveServersConfigeach slave node *

* Default is 50 *

- * @see #setSlaveSubscriptionConnectionMinimumIdleSize(int) + * @see #setSubscriptionConnectionMinimumIdleSize(int) * * @param slaveSubscriptionConnectionPoolSize - pool size * @return config */ - public T setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) { - this.slaveSubscriptionConnectionPoolSize = slaveSubscriptionConnectionPoolSize; + public T setSubscriptionConnectionPoolSize(int subscriptionConnectionPoolSize) { + this.subscriptionConnectionPoolSize = subscriptionConnectionPoolSize; return (T)this; } - public int getSlaveSubscriptionConnectionPoolSize() { - return slaveSubscriptionConnectionPoolSize; + public int getSubscriptionConnectionPoolSize() { + return subscriptionConnectionPoolSize; } + /** * Redis 'slave' node minimum idle connection amount for each slave node *

@@ -188,24 +207,40 @@ public class BaseMasterSlaveServersConfigeach slave node. *

* Default is 1 *

- * @see #setSlaveSubscriptionConnectionPoolSize(int) + * @see #setSubscriptionConnectionPoolSize(int) * * @param slaveSubscriptionConnectionMinimumIdleSize - pool size * @return config */ - public T setSlaveSubscriptionConnectionMinimumIdleSize(int slaveSubscriptionConnectionMinimumIdleSize) { - this.slaveSubscriptionConnectionMinimumIdleSize = slaveSubscriptionConnectionMinimumIdleSize; + public T setSubscriptionConnectionMinimumIdleSize(int subscriptionConnectionMinimumIdleSize) { + this.subscriptionConnectionMinimumIdleSize = subscriptionConnectionMinimumIdleSize; return (T) this; } - public int getSlaveSubscriptionConnectionMinimumIdleSize() { - return slaveSubscriptionConnectionMinimumIdleSize; + public int getSubscriptionConnectionMinimumIdleSize() { + return subscriptionConnectionMinimumIdleSize; } + /** * Set node type used for read operation. *

@@ -222,4 +257,21 @@ public class BaseMasterSlaveServersConfig + * Default is SLAVE + * + * @param subscriptionMode param + * @return config + */ + public T setSubscriptionMode(SubscriptionMode subscriptionMode) { + this.subscriptionMode = subscriptionMode; + return (T) this; + } + public SubscriptionMode getSubscriptionMode() { + return subscriptionMode; + } + + } diff --git a/redisson/src/main/java/org/redisson/config/ConfigSupport.java b/redisson/src/main/java/org/redisson/config/ConfigSupport.java index 8d08feb1d..6535cf813 100644 --- a/redisson/src/main/java/org/redisson/config/ConfigSupport.java +++ b/redisson/src/main/java/org/redisson/config/ConfigSupport.java @@ -266,7 +266,7 @@ public class ConfigSupport { if (config.getMasterConnectionPoolSize() < config.getMasterConnectionMinimumIdleSize()) { throw new IllegalArgumentException("masterConnectionPoolSize can't be lower than masterConnectionMinimumIdleSize"); } - if (config.getSlaveSubscriptionConnectionPoolSize() < config.getSlaveSubscriptionConnectionMinimumIdleSize()) { + if (config.getSubscriptionConnectionPoolSize() < config.getSubscriptionConnectionMinimumIdleSize()) { throw new IllegalArgumentException("slaveSubscriptionConnectionMinimumIdleSize can't be lower than slaveSubscriptionConnectionPoolSize"); } } diff --git a/redisson/src/main/java/org/redisson/config/ReadMode.java b/redisson/src/main/java/org/redisson/config/ReadMode.java index 51cfe3421..320c7dc82 100644 --- a/redisson/src/main/java/org/redisson/config/ReadMode.java +++ b/redisson/src/main/java/org/redisson/config/ReadMode.java @@ -15,6 +15,11 @@ */ package org.redisson.config; +/** + * + * @author Nikita Koksharov + * + */ public enum ReadMode { /** diff --git a/redisson/src/main/java/org/redisson/config/SubscriptionMode.java b/redisson/src/main/java/org/redisson/config/SubscriptionMode.java new file mode 100644 index 000000000..ec1f0495c --- /dev/null +++ b/redisson/src/main/java/org/redisson/config/SubscriptionMode.java @@ -0,0 +1,35 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.config; + +/** + * + * @author Nikita Koksharov + * + */ +public enum SubscriptionMode { + + /** + * Subscribe to slave nodes + */ + SLAVE, + + /** + * Subscribe to master node + */ + MASTER + +} diff --git a/redisson/src/main/java/org/redisson/connection/CountListener.java b/redisson/src/main/java/org/redisson/connection/CountListener.java new file mode 100644 index 000000000..ec449beb3 --- /dev/null +++ b/redisson/src/main/java/org/redisson/connection/CountListener.java @@ -0,0 +1,64 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.connection; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.redisson.api.RFuture; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +/** + * + * @author Nikita Koksharov + * + */ +public class CountListener implements FutureListener { + + private final RPromise res; + + private final AtomicInteger counter; + + public static RPromise create(RFuture... futures) { + RPromise result = new RedissonPromise(); + FutureListener listener = new CountListener(result, futures.length); + for (RFuture future : futures) { + future.addListener(listener); + } + return result; + } + + public CountListener(RPromise res, int amount) { + super(); + this.res = res; + this.counter = new AtomicInteger(amount); + } + + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + res.tryFailure(future.cause()); + return; + } + if (counter.decrementAndGet() == 0) { + res.trySuccess(null); + } + } + +} diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 1069f2885..de0392b13 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -320,7 +320,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { c.setClientName(cfg.getClientName()); c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); - c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); + c.setSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); c.setConnectTimeout(cfg.getConnectTimeout()); c.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout()); @@ -329,8 +329,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { c.setReconnectionTimeout(cfg.getReconnectionTimeout()); c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize()); c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize()); - c.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSlaveSubscriptionConnectionMinimumIdleSize()); + c.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); c.setReadMode(cfg.getReadMode()); + c.setSubscriptionMode(cfg.getSubscriptionMode()); return c; } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index e8d940300..2fab36ea6 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -37,9 +37,11 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; +import org.redisson.config.SubscriptionMode; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.pool.MasterConnectionPool; +import org.redisson.connection.pool.MasterPubSubConnectionPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +67,8 @@ public class MasterSlaveEntry { final MasterConnectionPool writeConnectionHolder; final Set slots = new HashSet(); + + final MasterPubSubConnectionPool pubSubConnectionHolder; final AtomicBoolean active = new AtomicBoolean(true); @@ -79,6 +83,7 @@ public class MasterSlaveEntry { slaveBalancer = new LoadBalancerManager(config, connectionManager, this); writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this); + pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this); } public List> initSlaveBalancer(Collection disconnectedNodes) { @@ -98,8 +103,20 @@ public class MasterSlaveEntry { public RFuture setupMasterEntry(String host, int port) { RedisClient client = connectionManager.createClient(NodeType.MASTER, host, port); - masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(), - 0, 0, connectionManager, NodeType.MASTER); + masterEntry = new ClientConnectionsEntry( + client, + config.getMasterConnectionMinimumIdleSize(), + config.getMasterConnectionPoolSize(), + config.getSubscriptionConnectionMinimumIdleSize(), + config.getSubscriptionConnectionPoolSize(), + connectionManager, + NodeType.MASTER); + + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + RFuture f = writeConnectionHolder.add(masterEntry); + RFuture s = pubSubConnectionHolder.add(masterEntry); + return CountListener.create(s, f); + } return writeConnectionHolder.add(masterEntry); } @@ -307,8 +324,8 @@ public class MasterSlaveEntry { ClientConnectionsEntry entry = new ClientConnectionsEntry(client, this.config.getSlaveConnectionMinimumIdleSize(), this.config.getSlaveConnectionPoolSize(), - this.config.getSlaveSubscriptionConnectionMinimumIdleSize(), - this.config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, mode); + this.config.getSubscriptionConnectionMinimumIdleSize(), + this.config.getSubscriptionConnectionPoolSize(), connectionManager, mode); if (freezed) { synchronized (entry) { entry.setFreezed(freezed); @@ -352,6 +369,7 @@ public class MasterSlaveEntry { @Override public void operationComplete(Future future) throws Exception { writeConnectionHolder.remove(oldMaster); + pubSubConnectionHolder.remove(oldMaster); slaveDown(oldMaster, FreezeReason.MANAGER); // more than one slave available, so master can be removed from slaves @@ -406,10 +424,18 @@ public class MasterSlaveEntry { } RFuture nextPubSubConnection() { + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + return pubSubConnectionHolder.get(); + } + return slaveBalancer.nextPubSubConnection(); } public void returnPubSubConnection(PubSubConnectionEntry entry) { + if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { + pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection()); + return; + } slaveBalancer.returnPubSubConnection(entry.getConnection()); } diff --git a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java index 8cb088bf3..b965b54b4 100644 --- a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -25,6 +25,7 @@ import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; import org.redisson.config.SingleServerConfig; +import org.redisson.config.SubscriptionMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,15 +72,16 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { newconfig.setMasterAddress(addr); newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize()); newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); - newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize()); + newconfig.setSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize()); newconfig.setConnectTimeout(cfg.getConnectTimeout()); newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout()); newconfig.setFailedAttempts(cfg.getFailedAttempts()); newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout()); newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize()); - newconfig.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); + newconfig.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); newconfig.setReadMode(ReadMode.MASTER); + newconfig.setSubscriptionMode(SubscriptionMode.MASTER); return newconfig; } diff --git a/redisson/src/main/java/org/redisson/connection/SingleEntry.java b/redisson/src/main/java/org/redisson/connection/SingleEntry.java index c5b0c7a5a..5fffe82f6 100644 --- a/redisson/src/main/java/org/redisson/connection/SingleEntry.java +++ b/redisson/src/main/java/org/redisson/connection/SingleEntry.java @@ -17,69 +17,22 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import org.redisson.api.NodeType; import org.redisson.api.RFuture; -import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; -import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.RedisCommand; import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.MasterSlaveServersConfig; -import org.redisson.connection.pool.PubSubConnectionPool; -import org.redisson.connection.pool.SinglePubSubConnectionPool; -import org.redisson.misc.RPromise; - -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; +/** + * + * @author Nikita Koksharov + * + */ public class SingleEntry extends MasterSlaveEntry { - final PubSubConnectionPool pubSubConnectionHolder; - public SingleEntry(Set slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { super(slotRanges, connectionManager, config); - pubSubConnectionHolder = new SinglePubSubConnectionPool(config, connectionManager, this); - } - - @Override - public RFuture setupMasterEntry(String host, int port) { - RedisClient masterClient = connectionManager.createClient(NodeType.MASTER, host, port); - masterEntry = new ClientConnectionsEntry(masterClient, - config.getMasterConnectionMinimumIdleSize(), - config.getMasterConnectionPoolSize(), - config.getSlaveConnectionMinimumIdleSize(), - config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, NodeType.MASTER); - final RPromise res = connectionManager.newPromise(); - RFuture f = writeConnectionHolder.add(masterEntry); - RFuture s = pubSubConnectionHolder.add(masterEntry); - FutureListener listener = new FutureListener() { - AtomicInteger counter = new AtomicInteger(2); - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - res.tryFailure(future.cause()); - return; - } - if (counter.decrementAndGet() == 0) { - res.trySuccess(null); - } - } - }; - f.addListener(listener); - s.addListener(listener); - return res; - } - - @Override - RFuture nextPubSubConnection() { - return pubSubConnectionHolder.get(); - } - - @Override - public void returnPubSubConnection(PubSubConnectionEntry entry) { - pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection()); } @Override diff --git a/redisson/src/main/java/org/redisson/connection/pool/SinglePubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/MasterPubSubConnectionPool.java similarity index 83% rename from redisson/src/main/java/org/redisson/connection/pool/SinglePubSubConnectionPool.java rename to redisson/src/main/java/org/redisson/connection/pool/MasterPubSubConnectionPool.java index 08741a488..00cbac51d 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/SinglePubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/MasterPubSubConnectionPool.java @@ -26,15 +26,20 @@ import org.redisson.connection.MasterSlaveEntry; * @author Nikita Koksharov * */ -public class SinglePubSubConnectionPool extends PubSubConnectionPool { +public class MasterPubSubConnectionPool extends PubSubConnectionPool { - public SinglePubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, + public MasterPubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { super(config, connectionManager, masterSlaveEntry); } + @Override protected ClientConnectionsEntry getEntry() { return entries.get(0); } + public void remove(ClientConnectionsEntry entry) { + entries.remove(entry); + } + } diff --git a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index 859623a9b..7051d55a6 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -47,7 +47,7 @@ public class PubSubConnectionPool extends ConnectionPool @Override protected int getMinimumIdleSize(ClientConnectionsEntry entry) { - return config.getSlaveSubscriptionConnectionMinimumIdleSize(); + return config.getSubscriptionConnectionMinimumIdleSize(); } @Override