subscriptionMode setting added

pull/787/head
Nikita 8 years ago
parent 94a4dc9183
commit da960060b0

@ -38,6 +38,11 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisConnection implements RedisCommands {
private static final AttributeKey<RedisConnection> CONNECTION = AttributeKey.valueOf("connection");

@ -40,6 +40,11 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisPubSubConnection extends RedisConnection {
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();

@ -31,16 +31,6 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
*/
private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
/**
* Redis 'slave' node minimum idle subscription (pub/sub) connection amount for <b>each</b> slave node
*/
private int slaveSubscriptionConnectionMinimumIdleSize = 1;
/**
* Redis 'slave' node maximum subscription (pub/sub) connection pool size for <b>each</b> slave node
*/
private int slaveSubscriptionConnectionPoolSize = 50;
/**
* Redis 'slave' node minimum idle connection amount for <b>each</b> slave node
*/
@ -62,6 +52,18 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
private int masterConnectionPoolSize = 64;
private ReadMode readMode = ReadMode.SLAVE;
private SubscriptionMode subscriptionMode = SubscriptionMode.SLAVE;
/**
* Redis 'slave' node minimum idle subscription (pub/sub) connection amount for <b>each</b> slave node
*/
private int subscriptionConnectionMinimumIdleSize = 1;
/**
* Redis 'slave' node maximum subscription (pub/sub) connection pool size for <b>each</b> slave node
*/
private int subscriptionConnectionPoolSize = 50;
public BaseMasterSlaveServersConfig() {
}
@ -71,11 +73,12 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
setLoadBalancer(config.getLoadBalancer());
setMasterConnectionPoolSize(config.getMasterConnectionPoolSize());
setSlaveConnectionPoolSize(config.getSlaveConnectionPoolSize());
setSlaveSubscriptionConnectionPoolSize(config.getSlaveSubscriptionConnectionPoolSize());
setSubscriptionConnectionPoolSize(config.getSubscriptionConnectionPoolSize());
setMasterConnectionMinimumIdleSize(config.getMasterConnectionMinimumIdleSize());
setSlaveConnectionMinimumIdleSize(config.getSlaveConnectionMinimumIdleSize());
setSlaveSubscriptionConnectionMinimumIdleSize(config.getSlaveSubscriptionConnectionMinimumIdleSize());
setSubscriptionConnectionMinimumIdleSize(config.getSubscriptionConnectionMinimumIdleSize());
setReadMode(config.getReadMode());
setSubscriptionMode(config.getSubscriptionMode());
}
/**
@ -134,24 +137,40 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return loadBalancer;
}
/**
* @deprecated use {@link #setSubscriptionConnectionPoolSize(int)}
*
* @param slaveSubscriptionConnectionPoolSize - pool size
* @return config
*/
@Deprecated
public T setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) {
return setSubscriptionConnectionPoolSize(slaveSubscriptionConnectionPoolSize);
}
@Deprecated
public int getSlaveSubscriptionConnectionPoolSize() {
return getSubscriptionConnectionPoolSize();
}
/**
* Redis 'slave' node maximum subscription (pub/sub) connection pool size for <b>each</b> slave node
* <p>
* Default is <code>50</code>
* <p>
* @see #setSlaveSubscriptionConnectionMinimumIdleSize(int)
* @see #setSubscriptionConnectionMinimumIdleSize(int)
*
* @param slaveSubscriptionConnectionPoolSize - pool size
* @return config
*/
public T setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) {
this.slaveSubscriptionConnectionPoolSize = slaveSubscriptionConnectionPoolSize;
public T setSubscriptionConnectionPoolSize(int subscriptionConnectionPoolSize) {
this.subscriptionConnectionPoolSize = subscriptionConnectionPoolSize;
return (T)this;
}
public int getSlaveSubscriptionConnectionPoolSize() {
return slaveSubscriptionConnectionPoolSize;
public int getSubscriptionConnectionPoolSize() {
return subscriptionConnectionPoolSize;
}
/**
* Redis 'slave' node minimum idle connection amount for <b>each</b> slave node
* <p>
@ -188,24 +207,40 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return masterConnectionMinimumIdleSize;
}
/**
* @deprecated use {@link #setSubscriptionConnectionMinimumIdleSize(int)}
*
* @param slaveSubscriptionConnectionMinimumIdleSize - pool size
* @return config
*/
@Deprecated
public T setSlaveSubscriptionConnectionMinimumIdleSize(int slaveSubscriptionConnectionMinimumIdleSize) {
return setSubscriptionConnectionMinimumIdleSize(slaveSubscriptionConnectionMinimumIdleSize);
}
@Deprecated
public int getSlaveSubscriptionConnectionMinimumIdleSize() {
return getSubscriptionConnectionMinimumIdleSize();
}
/**
* Redis 'slave' node minimum idle subscription (pub/sub) connection amount for <b>each</b> slave node.
* <p>
* Default is <code>1</code>
* <p>
* @see #setSlaveSubscriptionConnectionPoolSize(int)
* @see #setSubscriptionConnectionPoolSize(int)
*
* @param slaveSubscriptionConnectionMinimumIdleSize - pool size
* @return config
*/
public T setSlaveSubscriptionConnectionMinimumIdleSize(int slaveSubscriptionConnectionMinimumIdleSize) {
this.slaveSubscriptionConnectionMinimumIdleSize = slaveSubscriptionConnectionMinimumIdleSize;
public T setSubscriptionConnectionMinimumIdleSize(int subscriptionConnectionMinimumIdleSize) {
this.subscriptionConnectionMinimumIdleSize = subscriptionConnectionMinimumIdleSize;
return (T) this;
}
public int getSlaveSubscriptionConnectionMinimumIdleSize() {
return slaveSubscriptionConnectionMinimumIdleSize;
public int getSubscriptionConnectionMinimumIdleSize() {
return subscriptionConnectionMinimumIdleSize;
}
/**
* Set node type used for read operation.
* <p>
@ -222,4 +257,21 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return readMode;
}
/**
* Set node type used for subscription operation.
* <p>
* Default is <code>SLAVE</code>
*
* @param subscriptionMode param
* @return config
*/
public T setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
return (T) this;
}
public SubscriptionMode getSubscriptionMode() {
return subscriptionMode;
}
}

@ -266,7 +266,7 @@ public class ConfigSupport {
if (config.getMasterConnectionPoolSize() < config.getMasterConnectionMinimumIdleSize()) {
throw new IllegalArgumentException("masterConnectionPoolSize can't be lower than masterConnectionMinimumIdleSize");
}
if (config.getSlaveSubscriptionConnectionPoolSize() < config.getSlaveSubscriptionConnectionMinimumIdleSize()) {
if (config.getSubscriptionConnectionPoolSize() < config.getSubscriptionConnectionMinimumIdleSize()) {
throw new IllegalArgumentException("slaveSubscriptionConnectionMinimumIdleSize can't be lower than slaveSubscriptionConnectionPoolSize");
}
}

@ -15,6 +15,11 @@
*/
package org.redisson.config;
/**
*
* @author Nikita Koksharov
*
*/
public enum ReadMode {
/**

@ -0,0 +1,35 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.config;
/**
*
* @author Nikita Koksharov
*
*/
public enum SubscriptionMode {
/**
* Subscribe to slave nodes
*/
SLAVE,
/**
* Subscribe to master node
*/
MASTER
}

@ -0,0 +1,64 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class CountListener implements FutureListener<Void> {
private final RPromise<Void> res;
private final AtomicInteger counter;
public static RPromise<Void> create(RFuture<Void>... futures) {
RPromise<Void> result = new RedissonPromise<Void>();
FutureListener<Void> listener = new CountListener(result, futures.length);
for (RFuture<Void> future : futures) {
future.addListener(listener);
}
return result;
}
public CountListener(RPromise<Void> res, int amount) {
super();
this.res = res;
this.counter = new AtomicInteger(amount);
}
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
res.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
res.trySuccess(null);
}
}
}

@ -320,7 +320,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
c.setClientName(cfg.getClientName());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
c.setConnectTimeout(cfg.getConnectTimeout());
c.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
@ -329,8 +329,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
c.setReconnectionTimeout(cfg.getReconnectionTimeout());
c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize());
c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize());
c.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSlaveSubscriptionConnectionMinimumIdleSize());
c.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
c.setReadMode(cfg.getReadMode());
c.setSubscriptionMode(cfg.getSubscriptionMode());
return c;
}

@ -37,9 +37,11 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -65,6 +67,8 @@ public class MasterSlaveEntry {
final MasterConnectionPool writeConnectionHolder;
final Set<Integer> slots = new HashSet<Integer>();
final MasterPubSubConnectionPool pubSubConnectionHolder;
final AtomicBoolean active = new AtomicBoolean(true);
@ -79,6 +83,7 @@ public class MasterSlaveEntry {
slaveBalancer = new LoadBalancerManager(config, connectionManager, this);
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this);
}
public List<RFuture<Void>> initSlaveBalancer(Collection<URL> disconnectedNodes) {
@ -98,8 +103,20 @@ public class MasterSlaveEntry {
public RFuture<Void> setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, host, port);
masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(),
0, 0, connectionManager, NodeType.MASTER);
masterEntry = new ClientConnectionsEntry(
client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> f = writeConnectionHolder.add(masterEntry);
RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);
return CountListener.create(s, f);
}
return writeConnectionHolder.add(masterEntry);
}
@ -307,8 +324,8 @@ public class MasterSlaveEntry {
ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionMinimumIdleSize(),
this.config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, mode);
this.config.getSubscriptionConnectionMinimumIdleSize(),
this.config.getSubscriptionConnectionPoolSize(), connectionManager, mode);
if (freezed) {
synchronized (entry) {
entry.setFreezed(freezed);
@ -352,6 +369,7 @@ public class MasterSlaveEntry {
@Override
public void operationComplete(Future<Void> future) throws Exception {
writeConnectionHolder.remove(oldMaster);
pubSubConnectionHolder.remove(oldMaster);
slaveDown(oldMaster, FreezeReason.MANAGER);
// more than one slave available, so master can be removed from slaves
@ -406,10 +424,18 @@ public class MasterSlaveEntry {
}
RFuture<RedisPubSubConnection> nextPubSubConnection() {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
return pubSubConnectionHolder.get();
}
return slaveBalancer.nextPubSubConnection();
}
public void returnPubSubConnection(PubSubConnectionEntry entry) {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection());
return;
}
slaveBalancer.returnPubSubConnection(entry.getConnection());
}

@ -25,6 +25,7 @@ import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.SingleServerConfig;
import org.redisson.config.SubscriptionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -71,15 +72,16 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setMasterAddress(addr);
newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize());
newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
newconfig.setSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
newconfig.setConnectTimeout(cfg.getConnectTimeout());
newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
newconfig.setFailedAttempts(cfg.getFailedAttempts());
newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout());
newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize());
newconfig.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
newconfig.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
newconfig.setReadMode(ReadMode.MASTER);
newconfig.setSubscriptionMode(SubscriptionMode.MASTER);
return newconfig;
}

@ -17,69 +17,22 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SinglePubSubConnectionPool;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class SingleEntry extends MasterSlaveEntry {
final PubSubConnectionPool pubSubConnectionHolder;
public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(slotRanges, connectionManager, config);
pubSubConnectionHolder = new SinglePubSubConnectionPool(config, connectionManager, this);
}
@Override
public RFuture<Void> setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(NodeType.MASTER, host, port);
masterEntry = new ClientConnectionsEntry(masterClient,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSlaveConnectionMinimumIdleSize(),
config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, NodeType.MASTER);
final RPromise<Void> res = connectionManager.newPromise();
RFuture<Void> f = writeConnectionHolder.add(masterEntry);
RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);
FutureListener<Void> listener = new FutureListener<Void>() {
AtomicInteger counter = new AtomicInteger(2);
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
res.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
res.trySuccess(null);
}
}
};
f.addListener(listener);
s.addListener(listener);
return res;
}
@Override
RFuture<RedisPubSubConnection> nextPubSubConnection() {
return pubSubConnectionHolder.get();
}
@Override
public void returnPubSubConnection(PubSubConnectionEntry entry) {
pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection());
}
@Override

@ -26,15 +26,20 @@ import org.redisson.connection.MasterSlaveEntry;
* @author Nikita Koksharov
*
*/
public class SinglePubSubConnectionPool extends PubSubConnectionPool {
public class MasterPubSubConnectionPool extends PubSubConnectionPool {
public SinglePubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager,
public MasterPubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager,
MasterSlaveEntry masterSlaveEntry) {
super(config, connectionManager, masterSlaveEntry);
}
@Override
protected ClientConnectionsEntry getEntry() {
return entries.get(0);
}
public void remove(ClientConnectionsEntry entry) {
entries.remove(entry);
}
}

@ -47,7 +47,7 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
@Override
protected int getMinimumIdleSize(ClientConnectionsEntry entry) {
return config.getSlaveSubscriptionConnectionMinimumIdleSize();
return config.getSubscriptionConnectionMinimumIdleSize();
}
@Override

Loading…
Cancel
Save