Minimal connections amount initialization fixed. #375

pull/395/head
Nikita 9 years ago
parent 01be3348b5
commit f843275b7f

@ -79,7 +79,16 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize); final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
for (int i = 0; i < minimumIdleSize; i++) { int startAmount = Math.min(50, minimumIdleSize);
final AtomicInteger requests = new AtomicInteger(startAmount);
for (int i = 0; i < startAmount; i++) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
}
}
private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final Promise<Void> initPromise,
final int minimumIdleSize, final AtomicInteger initializedConnections) {
if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) { if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) {
Throwable cause = new RedisConnectionException( Throwable cause = new RedisConnectionException(
"Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: " "Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: "
@ -88,7 +97,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
return; return;
} }
Future<T> promise = connectTo(entry); Future<T> promise = createConnection(entry);
promise.addListener(new FutureListener<T>() { promise.addListener(new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {
@ -96,7 +105,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
T conn = future.getNow(); T conn = future.getNow();
releaseConnection(entry, conn); releaseConnection(entry, conn);
} }
releaseConnection(entry); releaseConnection(entry);
if (!future.isSuccess()) { if (!future.isSuccess()) {
Throwable cause = new RedisConnectionException( Throwable cause = new RedisConnectionException(
"Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: " "Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: "
@ -105,12 +116,17 @@ abstract class ConnectionPool<T extends RedisConnection> {
return; return;
} }
if (initializedConnections.decrementAndGet() == 0) { int value = initializedConnections.decrementAndGet();
if (value == 0) {
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
initPromise.setSuccess(null); initPromise.setSuccess(null);
} else if (value > 0 && !initPromise.isDone()) {
if (requests.incrementAndGet() <= minimumIdleSize) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
} }
} }
});
} }
});
} }
protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry); protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry);
@ -137,14 +153,13 @@ abstract class ConnectionPool<T extends RedisConnection> {
} }
} }
StringBuilder errorMsg = new StringBuilder("Connection pool exhausted! All connections are busy. "); StringBuilder errorMsg = new StringBuilder("Connection pool exhausted! All connections are busy. Try to increase connection pool size.");
if (!freezed.isEmpty()) { // if (!freezed.isEmpty()) {
errorMsg.append(" disconnected hosts: " + freezed); // errorMsg.append(" Disconnected hosts: " + freezed);
} // }
if (!zeroConnectionsAmount.isEmpty()) { if (!zeroConnectionsAmount.isEmpty()) {
errorMsg.append(" hosts with fully busy connections: " + zeroConnectionsAmount); errorMsg.append(" Hosts with fully busy connections: " + zeroConnectionsAmount);
} }
errorMsg.append(" Try to increase connection pool size.");
RedisConnectionException exception = new RedisConnectionException(errorMsg.toString()); RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());
return connectionManager.newFailedFuture(exception); return connectionManager.newFailedFuture(exception);
@ -183,6 +198,10 @@ abstract class ConnectionPool<T extends RedisConnection> {
return promiseSuccessful(entry, conn); return promiseSuccessful(entry, conn);
} }
return createConnection(entry);
}
private Future<T> createConnection(final ClientConnectionsEntry entry) {
final Promise<T> promise = connectionManager.newPromise(); final Promise<T> promise = connectionManager.newPromise();
Future<T> connFuture = connect(entry); Future<T> connFuture = connect(entry);
connFuture.addListener(new FutureListener<T>() { connFuture.addListener(new FutureListener<T>() {

Loading…
Cancel
Save