Remove `connection pool exhausted` exception #680

2.2.x
Nikita 8 years ago
parent 28b1b0ef8d
commit 0028f43e6b

@ -33,16 +33,18 @@ import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.redisson.pubsub.AsyncSemaphore;
public class ClientConnectionsEntry {
final Logger log = LoggerFactory.getLogger(getClass());
private final Queue<RedisPubSubConnection> allSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AtomicInteger freeSubscribeConnectionsCounter = new AtomicInteger();
private final AsyncSemaphore freeSubscribeConnectionsCounter;
private final Queue<RedisConnection> freeConnections = new ConcurrentLinkedQueue<RedisConnection>();
private final AtomicInteger freeConnectionsCounter = new AtomicInteger();
private final AsyncSemaphore freeConnectionsCounter;
public enum FreezeReason {MANAGER, RECONNECT, SYSTEM}
@ -58,10 +60,10 @@ public class ClientConnectionsEntry {
public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize,
ConnectionManager connectionManager, NodeType serverMode) {
this.client = client;
this.freeConnectionsCounter.set(poolMaxSize);
this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize);
this.connectionManager = connectionManager;
this.nodeType = serverMode;
this.freeSubscribeConnectionsCounter.set(subscribePoolMaxSize);
this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize);
if (subscribePoolMaxSize > 0) {
connectionManager.getConnectionWatcher().add(subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter);
@ -106,27 +108,15 @@ public class ClientConnectionsEntry {
}
public int getFreeAmount() {
return freeConnectionsCounter.get();
}
private boolean tryAcquire(AtomicInteger counter) {
while (true) {
int value = counter.get();
if (value == 0) {
return false;
}
if (counter.compareAndSet(value, value - 1)) {
return true;
}
}
return freeConnectionsCounter.getCounter();
}
public boolean tryAcquireConnection() {
return tryAcquire(freeConnectionsCounter);
public void acquireConnection(Runnable runnable) {
freeConnectionsCounter.acquire(runnable);
}
public void releaseConnection() {
freeConnectionsCounter.incrementAndGet();
freeConnectionsCounter.release();
}
public RedisConnection pollConnection() {
@ -139,7 +129,7 @@ public class ClientConnectionsEntry {
}
public Future<RedisConnection> connect() {
final Promise<RedisConnection> connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise();
final Promise<RedisConnection> connectionFuture = connectionManager.newPromise();
Future<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() {
@Override
@ -192,7 +182,7 @@ public class ClientConnectionsEntry {
}
public Future<RedisPubSubConnection> connectPubSub() {
final Promise<RedisPubSubConnection> connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise();
final Promise<RedisPubSubConnection> connectionFuture = connectionManager.newPromise();
Future<RedisPubSubConnection> future = client.connectPubSubAsync();
future.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
@ -227,12 +217,12 @@ public class ClientConnectionsEntry {
freeSubscribeConnections.add(connection);
}
public boolean tryAcquireSubscribeConnection() {
return tryAcquire(freeSubscribeConnectionsCounter);
public void acquireSubscribeConnection(Runnable runnable) {
freeSubscribeConnectionsCounter.acquire(runnable);
}
public void releaseSubscribeConnection() {
freeSubscribeConnectionsCounter.incrementAndGet();
freeSubscribeConnectionsCounter.release();
}
public boolean freezeMaster(FreezeReason reason) {

@ -30,6 +30,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.redisson.pubsub.AsyncSemaphore;
public class IdleConnectionWatcher {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -38,10 +40,10 @@ public class IdleConnectionWatcher {
private final int minimumAmount;
private final int maximumAmount;
private final AtomicInteger freeConnectionsCounter;
private final AsyncSemaphore freeConnectionsCounter;
private final Collection<? extends RedisConnection> connections;
public Entry(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AtomicInteger freeConnectionsCounter) {
public Entry(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AsyncSemaphore freeConnectionsCounter) {
super();
this.minimumAmount = minimumAmount;
this.maximumAmount = maximumAmount;
@ -84,10 +86,10 @@ public class IdleConnectionWatcher {
}
private boolean validateAmount(Entry entry) {
return entry.maximumAmount - entry.freeConnectionsCounter.get() + entry.connections.size() > entry.minimumAmount;
return entry.maximumAmount - entry.freeConnectionsCounter.getCounter() + entry.connections.size() > entry.minimumAmount;
}
public void add(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AtomicInteger freeConnectionsCounter) {
public void add(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AsyncSemaphore freeConnectionsCounter) {
entries.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter));
}

@ -40,6 +40,15 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import org.redisson.pubsub.AsyncSemaphore;
/**
* Base connection pool class
*
* @author Nikita Koksharov
*
* @param <T> - connection type
*/
abstract class ConnectionPool<T extends RedisConnection> {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -74,7 +83,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
final int minimumIdleSize = getMinimumIdleSize(entry);
if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {
initPromise.setSuccess(null);
initPromise.trySuccess(null);
return;
}
@ -97,12 +106,18 @@ abstract class ConnectionPool<T extends RedisConnection> {
return;
}
Future<T> promise = createConnection(entry);
acquireConnection(entry, new Runnable() {
@Override
public void run() {
Promise<T> promise = connectionManager.newPromise();
createConnection(entry, promise);
promise.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (future.isSuccess()) {
T conn = future.getNow();
releaseConnection(entry, conn);
}
@ -119,7 +134,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
int value = initializedConnections.decrementAndGet();
if (value == 0) {
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
initPromise.setSuccess(null);
if (!initPromise.trySuccess(null)) {
throw new IllegalStateException();
}
} else if (value > 0 && !initPromise.isDone()) {
if (requests.incrementAndGet() <= minimumIdleSize) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
@ -128,6 +145,13 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
});
}
});
}
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) {
entry.acquireConnection(runnable);
}
protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry);
@ -137,28 +161,36 @@ abstract class ConnectionPool<T extends RedisConnection> {
public Future<T> get() {
for (int j = entries.size() - 1; j >= 0; j--) {
ClientConnectionsEntry entry = getEntry();
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
return connectTo(entry);
final ClientConnectionsEntry entry = getEntry();
if (!entry.isFreezed()
&& tryAcquireConnection(entry)) {
final Promise<T> result = connectionManager.newPromise();
acquireConnection(entry, new Runnable() {
@Override
public void run() {
connectTo(entry, result);
}
});
return result;
}
}
List<InetSocketAddress> zeroConnectionsAmount = new LinkedList<InetSocketAddress>();
List<InetSocketAddress> failedAttempts = new LinkedList<InetSocketAddress>();
List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();
for (ClientConnectionsEntry entry : entries) {
if (entry.isFreezed()) {
freezed.add(entry.getClient().getAddr());
} else {
zeroConnectionsAmount.add(entry.getClient().getAddr());
failedAttempts.add(entry.getClient().getAddr());
}
}
StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " exhausted! ");
StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
if (!freezed.isEmpty()) {
errorMsg.append(" Disconnected hosts: " + freezed);
}
if (!zeroConnectionsAmount.isEmpty()) {
errorMsg.append(" Hosts with fully busy connections: " + zeroConnectionsAmount);
if (!failedAttempts.isEmpty()) {
errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts);
}
RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());
@ -168,7 +200,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
public Future<T> get(ClientConnectionsEntry entry) {
if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed())
&& tryAcquireConnection(entry)) {
return connectTo(entry);
Promise<T> result = connectionManager.newPromise();
connectTo(entry, result);
return result;
}
RedisConnectionException exception = new RedisConnectionException(
@ -177,7 +211,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
return entry.getFailedAttempts() < config.getFailedAttempts() && entry.tryAcquireConnection();
return entry.getFailedAttempts() < config.getFailedAttempts();
}
protected T poll(ClientConnectionsEntry entry) {
@ -188,28 +222,31 @@ abstract class ConnectionPool<T extends RedisConnection> {
return (Future<T>) entry.connect();
}
private Future<T> connectTo(ClientConnectionsEntry entry) {
private void connectTo(ClientConnectionsEntry entry, Promise<T> promise) {
if (promise.isDone()) {
releaseConnection(entry);
return;
}
T conn = poll(entry);
if (conn != null) {
if (!conn.isActive()) {
return promiseFailure(entry, conn);
promiseFailure(entry, promise, conn);
return;
}
return promiseSuccessful(entry, conn);
connectedSuccessful(entry, promise, conn);
return;
}
return createConnection(entry);
createConnection(entry, promise);
}
private Future<T> createConnection(final ClientConnectionsEntry entry) {
final Promise<T> promise = connectionManager.newPromise();
private void createConnection(final ClientConnectionsEntry entry, final Promise promise) {
Future<T> connFuture = connect(entry);
connFuture.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
releaseConnection(entry);
promiseFailure(entry, promise, future.cause());
return;
}
@ -220,13 +257,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
return;
}
promiseSuccessful(entry, promise, conn);
connectedSuccessful(entry, promise, conn);
}
});
return promise;
}
private void promiseSuccessful(ClientConnectionsEntry entry, Promise<T> promise, T conn) {
private void connectedSuccessful(ClientConnectionsEntry entry, Promise<T> promise, T conn) {
entry.resetFailedAttempts();
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
@ -234,45 +270,31 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
}
private Future<T> promiseSuccessful(ClientConnectionsEntry entry, T conn) {
entry.resetFailedAttempts();
return (Future<T>) conn.getAcquireFuture();
}
private void promiseFailure(ClientConnectionsEntry entry, Promise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getFailedAttempts()) {
checkForReconnect(entry);
}
promise.tryFailure(cause);
}
private void promiseFailure(ClientConnectionsEntry entry, Promise<T> promise, T conn) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
checkForReconnect(entry);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
}
releaseConnection(entry);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
promise.tryFailure(cause);
}
private Future<T> promiseFailure(ClientConnectionsEntry entry, T conn) {
private void promiseFailure(ClientConnectionsEntry entry, Promise<T> promise, T conn) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
conn.closeAsync();
checkForReconnect(entry);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
} else {
conn.closeAsync();
}
releaseConnection(entry);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
return connectionManager.newFailedFuture(cause);
promise.tryFailure(cause);
}
private void checkForReconnect(ClientConnectionsEntry entry) {

@ -23,6 +23,12 @@ import org.redisson.connection.ClientConnectionsEntry;
import io.netty.util.concurrent.Future;
/**
* Connection pool for Publish / Subscribe
*
* @author Nikita Koksharov
*
*/
public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection> {
public PubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
@ -45,8 +51,8 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
}
@Override
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
return entry.tryAcquireSubscribeConnection();
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) {
entry.acquireSubscribeConnection(runnable);
}
@Override

@ -74,6 +74,10 @@ public class AsyncSemaphore {
}
}
public int getCounter() {
return counter;
}
public void release() {
Runnable runnable = null;

Loading…
Cancel
Save