Connection pool refactoring

pull/395/head
Nikita 9 years ago
parent ba18a3c5a7
commit b23b6adc03

@ -33,7 +33,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManagerImpl;
import org.redisson.misc.MasterConnectionPool;
import org.redisson.connection.pool.MasterConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ -25,8 +25,7 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;
import org.redisson.connection.pool.PubSubConnectionPool;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -34,11 +33,11 @@ import io.netty.util.concurrent.Promise;
public class SingleEntry extends MasterSlaveEntry {
final ConnectionPool<RedisPubSubConnection> pubSubConnectionHolder;
final PubSubConnectionPool pubSubConnectionHolder;
public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(slotRanges, connectionManager, config);
pubSubConnectionHolder = new PubSubConnectionPoll(config, connectionManager, this) {
pubSubConnectionHolder = new PubSubConnectionPool(config, connectionManager, this) {
protected ClientConnectionsEntry getEntry() {
return entries.get(0);
}

@ -27,11 +27,11 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.ConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SlaveConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,13 +45,13 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPoll pubSubEntries;
private final ConnectionPool<RedisConnection> entries;
private final PubSubConnectionPool pubSubEntries;
private final SlaveConnectionPool entries;
public LoadBalancerManagerImpl(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager;
entries = new ConnectionPool<RedisConnection>(config, connectionManager, entry);
pubSubEntries = new PubSubConnectionPoll(config, connectionManager, entry);
entries = new SlaveConnectionPool(config, connectionManager, entry);
pubSubEntries = new PubSubConnectionPool(config, connectionManager, entry);
}
public Future<Void> add(final ClientConnectionsEntry entry) {

@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
package org.redisson.connection.pool;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@ -29,6 +31,8 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
@ -36,7 +40,9 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class ConnectionPool<T extends RedisConnection> {
abstract class ConnectionPool<T extends RedisConnection> {
private final Logger log = LoggerFactory.getLogger(getClass());
protected final List<ClientConnectionsEntry> entries = new CopyOnWriteArrayList<ClientConnectionsEntry>();
@ -107,9 +113,7 @@ public class ConnectionPool<T extends RedisConnection> {
}
}
protected int getMinimumIdleSize(ClientConnectionsEntry entry) {
return config.getSlaveConnectionMinimumIdleSize();
}
protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry);
protected ClientConnectionsEntry getEntry() {
return config.getLoadBalancer().getEntry(entries);
@ -123,8 +127,26 @@ public class ConnectionPool<T extends RedisConnection> {
}
}
RedisConnectionException exception = new RedisConnectionException(
"Can't aquire connection from pool! " + entries);
List<InetSocketAddress> zeroConnectionsAmount = new LinkedList<InetSocketAddress>();
List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();
for (ClientConnectionsEntry entry : entries) {
if (entry.isFreezed()) {
freezed.add(entry.getClient().getAddr());
} else {
zeroConnectionsAmount.add(entry.getClient().getAddr());
}
}
StringBuilder errorMsg = new StringBuilder("Connection pool exhausted!");
if (!freezed.isEmpty()) {
errorMsg.append(" disconnected hosts: " + freezed);
}
if (!zeroConnectionsAmount.isEmpty()) {
errorMsg.append(" hosts with (available connections amount) = 0 : " + zeroConnectionsAmount);
}
errorMsg.append(" Try to increase connection pool size.");
RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());
return connectionManager.newFailedFuture(exception);
}
@ -238,9 +260,11 @@ public class ConnectionPool<T extends RedisConnection> {
if (entry.getNodeType() == NodeType.SLAVE) {
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
scheduleCheck(entry);
} else {
if (entry.freezeMaster(FreezeReason.RECONNECT)) {
log.warn("host {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
scheduleCheck(entry);
}
}
@ -297,11 +321,13 @@ public class ConnectionPool<T extends RedisConnection> {
throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
log.info("slave {} successfully reconnected", entry.getClient().getAddr());
} else {
synchronized (entry) {
if (entry.getFreezeReason() == FreezeReason.RECONNECT) {
entry.setFreezed(false);
entry.setFreezeReason(null);
log.info("host {} successfully reconnected", entry.getClient().getAddr());
}
}
}

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
package org.redisson.connection.pool;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
@ -21,8 +21,8 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.ClientConnectionsEntry;
public class MasterConnectionPool extends ConnectionPool<RedisConnection> {
public class MasterConnectionPool extends ConnectionPool<RedisConnection> {
public MasterConnectionPool(MasterSlaveServersConfig config,
ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
super(config, connectionManager, masterSlaveEntry);

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
package org.redisson.connection.pool;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisPubSubConnection;
@ -23,9 +23,9 @@ import org.redisson.connection.ClientConnectionsEntry;
import io.netty.util.concurrent.Future;
public class PubSubConnectionPoll extends ConnectionPool<RedisPubSubConnection> {
public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection> {
public PubSubConnectionPoll(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
public PubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
super(config, connectionManager, masterSlaveEntry);
}
Loading…
Cancel
Save