|
|
|
@ -54,31 +54,30 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
|
|
|
|
|
public Future<Void> add(final ClientConnectionsEntry entry) {
|
|
|
|
|
final Promise<Void> promise = connectionManager.newPromise();
|
|
|
|
|
initConnections(entry, new Runnable() {
|
|
|
|
|
promise.addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
entries.add(entry);
|
|
|
|
|
promise.setSuccess(null);
|
|
|
|
|
}
|
|
|
|
|
}, true);
|
|
|
|
|
});
|
|
|
|
|
initConnections(entry, promise, true);
|
|
|
|
|
return promise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void initConnections(final ClientConnectionsEntry entry, final Runnable runnable, boolean checkFreezed) {
|
|
|
|
|
private void initConnections(final ClientConnectionsEntry entry, final Promise<Void> initPromise, boolean checkFreezed) {
|
|
|
|
|
int minimumIdleSize = getMinimumIdleSize(entry);
|
|
|
|
|
|
|
|
|
|
if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {
|
|
|
|
|
runnable.run();
|
|
|
|
|
initPromise.setSuccess(null);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
final AtomicInteger completedConnections = new AtomicInteger(minimumIdleSize);
|
|
|
|
|
final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
|
|
|
|
|
for (int i = 0; i < minimumIdleSize; i++) {
|
|
|
|
|
if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) {
|
|
|
|
|
if (completedConnections.decrementAndGet() == 0) {
|
|
|
|
|
runnable.run();
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
Throwable cause = new RedisConnectionException("Can't init enough connections amount! from " + entry.getClient().getAddr());
|
|
|
|
|
initPromise.tryFailure(cause);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Promise<T> promise = connectionManager.newPromise();
|
|
|
|
@ -91,9 +90,14 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
releaseConnection(entry, conn);
|
|
|
|
|
}
|
|
|
|
|
releaseConnection(entry);
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
Throwable cause = new RedisConnectionException("Can't init enough connections amount! from " + entry.getClient().getAddr());
|
|
|
|
|
initPromise.tryFailure(cause);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (completedConnections.decrementAndGet() == 0) {
|
|
|
|
|
runnable.run();
|
|
|
|
|
if (initializedConnections.decrementAndGet() == 0) {
|
|
|
|
|
initPromise.setSuccess(null);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
@ -272,9 +276,11 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
|
|
|
|
|
if (future.isSuccess() && "PONG".equals(future.getNow())) {
|
|
|
|
|
entry.resetFailedAttempts();
|
|
|
|
|
initConnections(entry, new Runnable() {
|
|
|
|
|
Promise<Void> promise = connectionManager.newPromise();
|
|
|
|
|
promise.addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
public void operationComplete(Future<Void> future)
|
|
|
|
|
throws Exception {
|
|
|
|
|
if (entry.getNodeType() == NodeType.SLAVE) {
|
|
|
|
|
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
|
|
|
|
|
} else {
|
|
|
|
@ -286,8 +292,8 @@ public class ConnectionPool<T extends RedisConnection> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}, false);
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
initConnections(entry, promise, false);
|
|
|
|
|
} else {
|
|
|
|
|
scheduleCheck(entry);
|
|
|
|
|
}
|
|
|
|
|