PubSubConnection handling via SingleConnectionManager fixed.

pull/38/head
Nikita 11 years ago
parent d0fda34eb9
commit 4ec975c29e

@ -26,7 +26,7 @@ import org.redisson.connection.RoundRobinLoadBalancer;
public class MasterSlaveConnectionConfig extends BaseConfig<MasterSlaveConnectionConfig> {
/**
* Сonnection load balancer to use multiple Redis servers
* Сonnection load balancer for multiple slave Redis servers
*/
private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
@ -37,12 +37,12 @@ public class MasterSlaveConnectionConfig extends BaseConfig<MasterSlaveConnectio
/**
* Redis 'slave' servers subscription (pub/sub) connection pool size for <b>each</b> slave node
*/
private int slaveSubscriptionConnectionPoolSize = 50;
private int slaveSubscriptionConnectionPoolSize = 25;
/**
* Redis 'slave' servers connection pool size for <b>each</b> slave node
*/
private int slaveConnectionPoolSize = 50;
private int slaveConnectionPoolSize = 100;
/**
* Redis 'master' server connection pool size limit
@ -140,11 +140,12 @@ public class MasterSlaveConnectionConfig extends BaseConfig<MasterSlaveConnectio
return loadBalancer;
}
public MasterSlaveConnectionConfig setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) {
this.slaveSubscriptionConnectionPoolSize = slaveSubscriptionConnectionPoolSize;
return this;
}
public int getSlaveSubscriptionConnectionPoolSize() {
return slaveSubscriptionConnectionPoolSize;
}
public void setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) {
this.slaveSubscriptionConnectionPoolSize = slaveSubscriptionConnectionPoolSize;
}
}

@ -110,6 +110,7 @@ public class RedissonLock extends RedissonObject implements RLock {
private final AtomicReference<Promise<Boolean>> promise = new AtomicReference<Promise<Boolean>>();
// TODO use lazy init map
private final Semaphore msg = new Semaphore(1);
private PubSubConnectionEntry pubSubEntry;

@ -26,6 +26,8 @@ public class SingleConnectionConfig extends BaseConfig<SingleConnectionConfig> {
*/
private URI address;
private int subscriptionConnectionPoolSize = 25;
/**
* Redis connection pool size limit
*/
@ -38,6 +40,7 @@ public class SingleConnectionConfig extends BaseConfig<SingleConnectionConfig> {
super(config);
setAddress(config.getAddress());
setConnectionPoolSize(config.getConnectionPoolSize());
setSubscriptionConnectionPoolSize(config.getSubscriptionConnectionPoolSize());
}
/**
@ -54,6 +57,21 @@ public class SingleConnectionConfig extends BaseConfig<SingleConnectionConfig> {
return connectionPoolSize;
}
/**
* Redis subscription-connection pool size limit
* Default is 25
*
* @param connectionPoolSize
* @return
*/
public SingleConnectionConfig setSubscriptionConnectionPoolSize(int subscriptionConnectionPoolSize) {
this.subscriptionConnectionPoolSize = subscriptionConnectionPoolSize;
return this;
}
public int getSubscriptionConnectionPoolSize() {
return subscriptionConnectionPoolSize;
}
/**
* Set server address. Use follow format -- host:port
*
@ -70,7 +88,7 @@ public class SingleConnectionConfig extends BaseConfig<SingleConnectionConfig> {
public URI getAddress() {
return address;
}
public void setAddress(URI address) {
void setAddress(URI address) {
this.address = address;
}

@ -49,7 +49,7 @@ import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public class MasterSlaveConnectionManager implements ConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private RedisCodec codec;
protected RedisCodec codec;
private EventLoopGroup group;
@ -59,13 +59,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final Queue<PubSubConnectionEntry> pubSubConnections = new ConcurrentLinkedQueue<PubSubConnectionEntry>();
private final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
private LoadBalancer balancer;
private final List<RedisClient> slaveClients = new ArrayList<RedisClient>();
private RedisClient masterClient;
protected RedisClient masterClient;
private Semaphore masterConnectionsSemaphore;
private MasterSlaveConnectionConfig config;
private LoadBalancer balancer;
protected MasterSlaveConnectionConfig config;
MasterSlaveConnectionManager() {
}
@ -87,8 +87,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
masterClient = new RedisClient(group, this.config.getMasterAddress().getHost(), this.config.getMasterAddress().getPort());
codec = new RedisCodecWrapper(cfg.getCodec());
balancer = config.getLoadBalancer();
balancer.init(slaveConnections, codec, config.getPassword());
if (!slaveConnections.isEmpty()) {
balancer = config.getLoadBalancer();
balancer.init(slaveConnections, codec, config.getPassword());
}
masterConnectionsSemaphore = new Semaphore(this.config.getMasterConnectionPoolSize());
}
@ -150,12 +152,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
acquireMasterConnection();
RedisPubSubConnection<K, V> conn = balancer.nextPubSubConnection();
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
RedisPubSubConnection<K, V> conn = nextPubSubConnection();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
@ -168,6 +165,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return entry;
}
RedisPubSubConnection nextPubSubConnection() {
return balancer.nextPubSubConnection();
}
public <K, V> PubSubConnectionEntry subscribe(RedisPubSubAdapter<K, V> listener, String channelName) {
PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
if (сonnEntry != null) {
@ -186,12 +187,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
acquireMasterConnection();
RedisPubSubConnection<K, V> conn = balancer.nextPubSubConnection();
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
RedisPubSubConnection<K, V> conn = nextPubSubConnection();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
@ -227,10 +223,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
log.debug("unsubscribed from '{}' channel", channelName);
if (entry.tryClose()) {
pubSubConnections.remove(entry);
balancer.returnSubscribeConnection(entry.getConnection());
returnSubscribeConnection(entry);
}
}
protected void returnSubscribeConnection(PubSubConnectionEntry entry) {
balancer.returnSubscribeConnection(entry.getConnection());
}
public void releaseWrite(RedisConnection сonnection) {
masterConnections.add(сonnection);
releaseMasterConnection();

@ -15,6 +15,8 @@
*/
package org.redisson.connection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import org.redisson.Config;
@ -23,42 +25,62 @@ import org.redisson.SingleConnectionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
public class SingleConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private Semaphore connections;
private final Semaphore subscribeConnectionsSemaphore;
private final Queue<RedisPubSubConnection> subscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
public SingleConnectionManager(SingleConnectionConfig cfg, Config config) {
MasterSlaveConnectionConfig newconfig = new MasterSlaveConnectionConfig();
String addr = cfg.getAddress().getHost() + ":" + cfg.getAddress().getPort();
newconfig.setMasterAddress(addr);
newconfig.addSlaveAddress(addr);
init(newconfig, config);
newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize());
newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
connections = new Semaphore(cfg.getConnectionPoolSize());
subscribeConnectionsSemaphore = new Semaphore(cfg.getSubscriptionConnectionPoolSize());
init(newconfig, config);
}
void acquireMasterConnection() {
if (!connections.tryAcquire()) {
log.warn("Master connection pool gets exhausted! Trying to acquire connection ...");
private void acquireSubscribeConnection() {
if (!subscribeConnectionsSemaphore.tryAcquire()) {
log.warn("Subscribe connection pool gets exhausted! Trying to acquire connection ...");
long time = System.currentTimeMillis();
connections.acquireUninterruptibly();
subscribeConnectionsSemaphore.acquireUninterruptibly();
long endTime = System.currentTimeMillis() - time;
log.warn("Connection acquired, time spended: {} ms", endTime);
log.warn("Subscribe connection acquired, time spended: {} ms", endTime);
}
}
@Override
RedisPubSubConnection nextPubSubConnection() {
acquireSubscribeConnection();
RedisPubSubConnection conn = masterClient.connectPubSub(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
return conn;
}
void releaseMasterConnection() {
connections.release();
@Override
protected void returnSubscribeConnection(PubSubConnectionEntry entry) {
subscribeConnections.add(entry.getConnection());
subscribeConnectionsSemaphore.release();
}
void acquireSlaveConnection() {
acquireMasterConnection();
@Override
public <K, V> RedisConnection<K, V> connectionReadOp() {
return super.connectionWriteOp();
}
void releaseSlaveConnection() {
releaseMasterConnection();
@Override
public void releaseRead(RedisConnection сonnection) {
super.releaseWrite(сonnection);
}
}

Loading…
Cancel
Save