idleConnectionTimeout param introduced. #234

pull/297/head
Nikita 9 years ago
parent 556b0b2a79
commit 7212849b72

@ -18,6 +18,15 @@ package org.redisson;
class BaseConfig<T extends BaseConfig<T>> { class BaseConfig<T extends BaseConfig<T>> {
/**
* If pooled connection not used for a <code>timeout</code> time
* and current connections amount bigger than minimum idle connections pool size,
* then it will closed and removed from pool.
* Value in milliseconds.
*
*/
private int idleConnectionTimeout = 10000;
/** /**
* Ping timeout used in <code>Node.ping</code> and <code>Node.pingAll<code> operation. * Ping timeout used in <code>Node.ping</code> and <code>Node.pingAll<code> operation.
* Value in milliseconds. * Value in milliseconds.
@ -26,7 +35,7 @@ class BaseConfig<T extends BaseConfig<T>> {
private int pingTimeout = 1000; private int pingTimeout = 1000;
/** /**
* This timeout used during connection establishment to any Redis server. * Timeout during connecting to any Redis server.
* Value in milliseconds. * Value in milliseconds.
* *
*/ */
@ -36,6 +45,8 @@ class BaseConfig<T extends BaseConfig<T>> {
* Redis operation execution timeout. * Redis operation execution timeout.
* Then amount is reached exception will be thrown in case of <b>sync</b> operation usage * Then amount is reached exception will be thrown in case of <b>sync</b> operation usage
* or <code>Future</code> callback fails in case of <b>async</b> operation. * or <code>Future</code> callback fails in case of <b>async</b> operation.
* Value in milliseconds.
*
*/ */
private int timeout = 1000; private int timeout = 1000;
@ -80,6 +91,7 @@ class BaseConfig<T extends BaseConfig<T>> {
setPingTimeout(config.getPingTimeout()); setPingTimeout(config.getPingTimeout());
setRefreshConnectionAfterFails(config.getRefreshConnectionAfterFails()); setRefreshConnectionAfterFails(config.getRefreshConnectionAfterFails());
setConnectTimeout(config.getConnectTimeout()); setConnectTimeout(config.getConnectTimeout());
setIdleConnectionTimeout(config.getIdleConnectionTimeout());
} }
/** /**
@ -216,7 +228,7 @@ class BaseConfig<T extends BaseConfig<T>> {
} }
/** /**
* This timeout used during connection establishment to any Redis server. * Timeout during connecting to any Redis server.
* *
* @param connectTimeout - timeout in milliseconds * @param connectTimeout - timeout in milliseconds
* @return * @return
@ -229,4 +241,21 @@ class BaseConfig<T extends BaseConfig<T>> {
return connectTimeout; return connectTimeout;
} }
/**
* If pooled connection not used for a <code>timeout</code> time
* and current connections amount bigger than minimum idle connections pool size,
* then it will closed and removed from pool.
*
* @param idleConnectionTimeout - timeout in milliseconds
* @return
*/
public T setIdleConnectionTimeout(int idleConnectionTimeout) {
this.idleConnectionTimeout = idleConnectionTimeout;
return (T) this;
}
public int getIdleConnectionTimeout() {
return idleConnectionTimeout;
}
} }

@ -15,8 +15,8 @@
*/ */
package org.redisson; package org.redisson;
import org.redisson.connection.LoadBalancer; import org.redisson.connection.balancer.LoadBalancer;
import org.redisson.connection.RoundRobinLoadBalancer; import org.redisson.connection.balancer.RoundRobinLoadBalancer;
public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig<T>> extends BaseConfig<T> { public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig<T>> extends BaseConfig<T> {
@ -124,7 +124,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
* @param loadBalancer * @param loadBalancer
* @return * @return
* *
* @see org.redisson.connection.RoundRobinLoadBalancer * @see org.redisson.connection.balancer.RoundRobinLoadBalancer
* @see org.redisson.connection.BaseLoadBalancer * @see org.redisson.connection.BaseLoadBalancer
*/ */
public T setLoadBalancer(LoadBalancer loadBalancer) { public T setLoadBalancer(LoadBalancer loadBalancer) {

@ -40,7 +40,10 @@ public class RedisConnection implements RedisCommands {
private volatile boolean closed; private volatile boolean closed;
volatile Channel channel; volatile Channel channel;
private ReconnectListener reconnectListener; private ReconnectListener reconnectListener;
private long lastUsageTime;
@Deprecated
private int failAttempts; private int failAttempts;
@ -49,12 +52,21 @@ public class RedisConnection implements RedisCommands {
this.redisClient = redisClient; this.redisClient = redisClient;
updateChannel(channel); updateChannel(channel);
lastUsageTime = System.currentTimeMillis();
} }
public static <C extends RedisConnection> C getFrom(Channel channel) { public static <C extends RedisConnection> C getFrom(Channel channel) {
return (C) channel.attr(RedisConnection.CONNECTION).get(); return (C) channel.attr(RedisConnection.CONNECTION).get();
} }
public long getLastUsageTime() {
return lastUsageTime;
}
public void setLastUsageTime(long lastUsageTime) {
this.lastUsageTime = lastUsageTime;
}
public void setReconnectListener(ReconnectListener reconnectListener) { public void setReconnectListener(ReconnectListener reconnectListener) {
this.reconnectListener = reconnectListener; this.reconnectListener = reconnectListener;
} }
@ -180,7 +192,7 @@ public class RedisConnection implements RedisCommands {
/** /**
* Access to Netty channel. * Access to Netty channel.
* This method is only provided to use in debug info. * This method is provided to use in debug info only.
* *
*/ */
public Channel getChannel() { public Channel getChannel() {

@ -395,6 +395,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
c.setConnectTimeout(cfg.getConnectTimeout()); c.setConnectTimeout(cfg.getConnectTimeout());
c.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts()); c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts());
c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout()); c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout());

@ -39,6 +39,9 @@ public class ClientConnectionsEntry {
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>(); private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AtomicInteger freeSubscribeConnectionsCounter = new AtomicInteger(); private final AtomicInteger freeSubscribeConnectionsCounter = new AtomicInteger();
private final Queue<RedisConnection> freeConnections = new ConcurrentLinkedQueue<RedisConnection>();
private final AtomicInteger freeConnectionsCounter = new AtomicInteger();
public enum FreezeReason {MANAGER, RECONNECT, SYSTEM} public enum FreezeReason {MANAGER, RECONNECT, SYSTEM}
private volatile boolean freezed; private volatile boolean freezed;
@ -50,17 +53,21 @@ public class ClientConnectionsEntry {
private final NodeType nodeType; private final NodeType nodeType;
private final ConnectionListener connectionListener; private final ConnectionListener connectionListener;
private final Queue<RedisConnection> freeConnections = new ConcurrentLinkedQueue<RedisConnection>();
private final AtomicInteger freeConnectionsCounter = new AtomicInteger();
private final AtomicInteger failedAttempts = new AtomicInteger(); private final AtomicInteger failedAttempts = new AtomicInteger();
public ClientConnectionsEntry(RedisClient client, int poolSize, int subscribePoolSize, ConnectionListener connectionListener, NodeType serverMode) { public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize,
ConnectionListener connectionListener, NodeType serverMode,
IdleConnectionWatcher watcher, MasterSlaveServersConfig config) {
this.client = client; this.client = client;
this.freeConnectionsCounter.set(poolSize); this.freeConnectionsCounter.set(poolMaxSize);
this.connectionListener = connectionListener; this.connectionListener = connectionListener;
this.nodeType = serverMode; this.nodeType = serverMode;
freeSubscribeConnectionsCounter.set(subscribePoolSize); this.freeSubscribeConnectionsCounter.set(subscribePoolMaxSize);
if (subscribePoolMaxSize > 0) {
watcher.add(subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter);
}
watcher.add(poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter);
} }
public NodeType getNodeType() { public NodeType getNodeType() {
@ -128,6 +135,7 @@ public class ClientConnectionsEntry {
} }
public void releaseConnection(RedisConnection connection) { public void releaseConnection(RedisConnection connection) {
connection.setLastUsageTime(System.currentTimeMillis());
freeConnections.add(connection); freeConnections.add(connection);
} }
@ -200,6 +208,7 @@ public class ClientConnectionsEntry {
} }
public void releaseSubscribeConnection(RedisPubSubConnection connection) { public void releaseSubscribeConnection(RedisPubSubConnection connection) {
connection.setLastUsageTime(System.currentTimeMillis());
freeSubscribeConnections.add(connection); freeSubscribeConnections.add(connection);
} }

@ -43,9 +43,10 @@ import io.netty.util.concurrent.Promise;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
//TODO ping support
public interface ConnectionManager { public interface ConnectionManager {
IdleConnectionWatcher getConnectionWatcher();
<R> Future<R> newFailedFuture(Throwable cause); <R> Future<R> newFailedFuture(Throwable cause);
void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason); void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason);

@ -160,6 +160,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
c.setConnectTimeout(cfg.getConnectTimeout()); c.setConnectTimeout(cfg.getConnectTimeout());
c.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts()); c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts());
c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout()); c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout());

@ -0,0 +1,93 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
public class IdleConnectionWatcher {
private final Logger log = LoggerFactory.getLogger(getClass());
public static class Entry {
private final int minimumAmount;
private final int maximumAmount;
private final AtomicInteger freeConnectionsCounter;
private final Collection<? extends RedisConnection> connections;
public Entry(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AtomicInteger freeConnectionsCounter) {
super();
this.minimumAmount = minimumAmount;
this.maximumAmount = maximumAmount;
this.connections = connections;
this.freeConnectionsCounter = freeConnectionsCounter;
}
};
private final Queue<Entry> entries = new ConcurrentLinkedQueue<Entry>();
public IdleConnectionWatcher(final ConnectionManager manager, final MasterSlaveServersConfig config) {
manager.getGroup().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
for (Entry entry : entries) {
if (!validateAmount(entry)) {
continue;
}
for (final RedisConnection c : entry.connections) {
final long timeInPool = System.currentTimeMillis() - c.getLastUsageTime();
if (timeInPool > config.getIdleConnectionTimeout()
&& validateAmount(entry) && entry.connections.remove(c)) {
ChannelFuture future = c.closeAsync();
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
log.debug("Connection {} has been closed due to idle timeout. Not used for {} ms", c.getChannel(), timeInPool);
}
});
}
}
}
}
}, config.getIdleConnectionTimeout(), config.getIdleConnectionTimeout(), TimeUnit.MILLISECONDS);
}
private boolean validateAmount(Entry entry) {
return entry.maximumAmount - entry.freeConnectionsCounter.get() + entry.connections.size() >= entry.minimumAmount;
}
public void add(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AtomicInteger freeConnectionsCounter) {
entries.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter));
}
}

@ -119,9 +119,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final Set<RedisClientEntry> clients = Collections.newSetFromMap(PlatformDependent.<RedisClientEntry, Boolean>newConcurrentHashMap()); private final Set<RedisClientEntry> clients = Collections.newSetFromMap(PlatformDependent.<RedisClientEntry, Boolean>newConcurrentHashMap());
private IdleConnectionWatcher connectionWatcher;
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
init(config);
init(cfg);
}
protected MasterSlaveConnectionManager() { protected MasterSlaveConnectionManager() {
} }
public IdleConnectionWatcher getConnectionWatcher() {
return connectionWatcher;
}
@Override @Override
public MasterSlaveServersConfig getConfig() { public MasterSlaveServersConfig getConfig() {
return config; return config;
@ -137,15 +148,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return entries; return entries;
} }
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
init(cfg, config);
}
protected void init(MasterSlaveServersConfig config, Config cfg) {
init(cfg);
init(config);
}
protected void init(MasterSlaveServersConfig config) { protected void init(MasterSlaveServersConfig config) {
this.config = config; this.config = config;
@ -161,6 +163,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
timer = new HashedWheelTimer(minTimeout, TimeUnit.MILLISECONDS); timer = new HashedWheelTimer(minTimeout, TimeUnit.MILLISECONDS);
connectionWatcher = new IdleConnectionWatcher(this, config);
initEntry(config); initEntry(config);
} }

@ -28,6 +28,8 @@ import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType; import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManagerImpl;
import org.redisson.misc.MasterConnectionPool; import org.redisson.misc.MasterConnectionPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -80,7 +82,8 @@ public class MasterSlaveEntry {
public void setupMasterEntry(String host, int port) { public void setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(host, port); RedisClient client = connectionManager.createClient(host, port);
masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionPoolSize(), 0, connectListener, NodeType.MASTER); masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(),
0, 0, connectListener, NodeType.MASTER, connectionManager.getConnectionWatcher(), config);
writeConnectionHolder.add(masterEntry); writeConnectionHolder.add(masterEntry);
} }
@ -102,8 +105,10 @@ public class MasterSlaveEntry {
private void addSlave(String host, int port, boolean freezed, NodeType mode) { private void addSlave(String host, int port, boolean freezed, NodeType mode) {
RedisClient client = connectionManager.createClient(host, port); RedisClient client = connectionManager.createClient(host, port);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client, ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(), this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionPoolSize(), connectListener, mode); this.config.getSlaveSubscriptionConnectionMinimumIdleSize(),
this.config.getSlaveSubscriptionConnectionPoolSize(), connectListener, mode, connectionManager.getConnectionWatcher(), config);
if (freezed) { if (freezed) {
entry.setFreezed(freezed); entry.setFreezed(freezed);
entry.setFreezeReason(FreezeReason.SYSTEM); entry.setFreezeReason(FreezeReason.SYSTEM);
@ -136,11 +141,12 @@ public class MasterSlaveEntry {
ClientConnectionsEntry oldMaster = masterEntry; ClientConnectionsEntry oldMaster = masterEntry;
setupMasterEntry(host, port); setupMasterEntry(host, port);
writeConnectionHolder.remove(oldMaster); writeConnectionHolder.remove(oldMaster);
oldMaster.freezeMaster(FreezeReason.MANAGER);
if (slaveBalancer.getAvailableClients() > 1) { if (slaveBalancer.getAvailableClients() > 1) {
// more than one slave available, so master could be removed from slaves // more than one slave available, so master could be removed from slaves
connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM); connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM);
} }
oldMaster.freezeMaster(FreezeReason.MANAGER);
connectionManager.shutdownAsync(oldMaster.getClient()); connectionManager.shutdownAsync(oldMaster.getClient());
} }

@ -68,6 +68,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
c.setConnectTimeout(cfg.getConnectTimeout()); c.setConnectTimeout(cfg.getConnectTimeout());
c.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts()); c.setSlaveFailedAttempts(cfg.getSlaveFailedAttempts());
c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout()); c.setSlaveReconnectionTimeout(cfg.getSlaveReconnectionTimeout());

@ -41,6 +41,20 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
private ScheduledFuture<?> monitorFuture; private ScheduledFuture<?> monitorFuture;
public SingleConnectionManager(SingleServerConfig cfg, Config config) { public SingleConnectionManager(SingleServerConfig cfg, Config config) {
super(create(cfg), config);
if (cfg.isDnsMonitoring()) {
try {
this.currentMaster.set(InetAddress.getByName(cfg.getAddress().getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host", e);
}
log.debug("DNS monitoring enabled; Current master set to {}", currentMaster.get());
monitorDnsChange(cfg);
}
}
private static MasterSlaveServersConfig create(SingleServerConfig cfg) {
MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig(); MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig();
String addr = cfg.getAddress().getHost() + ":" + cfg.getAddress().getPort(); String addr = cfg.getAddress().getHost() + ":" + cfg.getAddress().getPort();
newconfig.setRetryAttempts(cfg.getRetryAttempts()); newconfig.setRetryAttempts(cfg.getRetryAttempts());
@ -56,21 +70,11 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize()); newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
newconfig.setConnectTimeout(cfg.getConnectTimeout()); newconfig.setConnectTimeout(cfg.getConnectTimeout());
newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize()); newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize());
newconfig.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); newconfig.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
return newconfig;
init(newconfig, config);
if (cfg.isDnsMonitoring()) {
try {
this.currentMaster.set(InetAddress.getByName(cfg.getAddress().getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host", e);
}
log.debug("DNS monitoring enabled; Current master set to {}", currentMaster.get());
monitorDnsChange(cfg);
}
} }
@Override @Override

@ -46,7 +46,10 @@ public class SingleEntry extends MasterSlaveEntry {
public void setupMasterEntry(String host, int port) { public void setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port); RedisClient masterClient = connectionManager.createClient(host, port);
masterEntry = new ClientConnectionsEntry(masterClient, masterEntry = new ClientConnectionsEntry(masterClient,
config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize(), connectListener, NodeType.MASTER); config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSlaveConnectionMinimumIdleSize(),
config.getSlaveSubscriptionConnectionPoolSize(), connectListener, NodeType.MASTER, connectionManager.getConnectionWatcher(), config);
writeConnectionHolder.add(masterEntry); writeConnectionHolder.add(masterEntry);
pubSubConnectionHolder.add(masterEntry); pubSubConnectionHolder.add(masterEntry);
} }

@ -13,10 +13,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.connection; package org.redisson.connection.balancer;
import java.util.List; import java.util.List;
import org.redisson.connection.ClientConnectionsEntry;
public interface LoadBalancer { public interface LoadBalancer {
ClientConnectionsEntry getEntry(List<ClientConnectionsEntry> clientsCopy); ClientConnectionsEntry getEntry(List<ClientConnectionsEntry> clientsCopy);

@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.connection; package org.redisson.connection.balancer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.connection; package org.redisson.connection.balancer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -26,6 +26,9 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.ConnectionPool; import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll; import org.redisson.misc.PubSubConnectionPoll;

@ -13,12 +13,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.connection; package org.redisson.connection.balancer;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import org.redisson.connection.ClientConnectionsEntry;
public class RandomLoadBalancer implements LoadBalancer { public class RandomLoadBalancer implements LoadBalancer {
private final Random random = new SecureRandom(); private final Random random = new SecureRandom();

@ -13,11 +13,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.connection; package org.redisson.connection.balancer;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.connection.ClientConnectionsEntry;
public class RoundRobinLoadBalancer implements LoadBalancer { public class RoundRobinLoadBalancer implements LoadBalancer {
private final AtomicInteger index = new AtomicInteger(-1); private final AtomicInteger index = new AtomicInteger(-1);

@ -114,10 +114,6 @@ public class ConnectionPool<T extends RedisConnection> {
return config.getSlaveConnectionMinimumIdleSize(); return config.getSlaveConnectionMinimumIdleSize();
} }
public void remove(ClientConnectionsEntry entry) {
entries.remove(entry);
}
protected ClientConnectionsEntry getEntry() { protected ClientConnectionsEntry getEntry() {
return config.getLoadBalancer().getEntry(entries); return config.getLoadBalancer().getEntry(entries);
} }
@ -215,7 +211,6 @@ public class ConnectionPool<T extends RedisConnection> {
} }
} }
// promises.add(promise);
promise.tryFailure(cause); promise.tryFailure(cause);
} }
@ -242,7 +237,6 @@ public class ConnectionPool<T extends RedisConnection> {
releaseConnection(entry); releaseConnection(entry);
// promises.add(promise);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
promise.tryFailure(cause); promise.tryFailure(cause);
} }

@ -33,6 +33,10 @@ public class MasterConnectionPool extends ConnectionPool<RedisConnection> {
return entries.get(0); return entries.get(0);
} }
public void remove(ClientConnectionsEntry entry) {
entries.remove(entry);
}
@Override @Override
protected int getMinimumIdleSize(ClientConnectionsEntry entry) { protected int getMinimumIdleSize(ClientConnectionsEntry entry) {
return config.getMasterConnectionMinimumIdleSize(); return config.getMasterConnectionMinimumIdleSize();

Loading…
Cancel
Save