From ec53659eecb2a2b0e8ebfce57ac622e08baa327c Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 11 Dec 2015 18:05:59 +0300 Subject: [PATCH] Checking enough connections amount according settings during Redisson start. --- .../org/redisson/connection/SingleEntry.java | 4 ++ .../org/redisson/misc/ConnectionPool.java | 40 +++++++++++-------- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/redisson/connection/SingleEntry.java b/src/main/java/org/redisson/connection/SingleEntry.java index ce5097ee6..3f24627bb 100644 --- a/src/main/java/org/redisson/connection/SingleEntry.java +++ b/src/main/java/org/redisson/connection/SingleEntry.java @@ -60,6 +60,10 @@ public class SingleEntry extends MasterSlaveEntry { AtomicInteger counter = new AtomicInteger(2); @Override public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + res.setFailure(future.cause()); + return; + } if (counter.decrementAndGet() == 0) { res.setSuccess(null); } diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index 4b6af8d49..8a0b937b6 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -54,31 +54,30 @@ public class ConnectionPool { public Future add(final ClientConnectionsEntry entry) { final Promise promise = connectionManager.newPromise(); - initConnections(entry, new Runnable() { + promise.addListener(new FutureListener() { @Override - public void run() { + public void operationComplete(Future future) throws Exception { entries.add(entry); - promise.setSuccess(null); } - }, true); + }); + initConnections(entry, promise, true); return promise; } - private void initConnections(final ClientConnectionsEntry entry, final Runnable runnable, boolean checkFreezed) { + private void initConnections(final ClientConnectionsEntry entry, final Promise initPromise, boolean checkFreezed) { int minimumIdleSize = getMinimumIdleSize(entry); if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) { - runnable.run(); + initPromise.setSuccess(null); return; } - final AtomicInteger completedConnections = new AtomicInteger(minimumIdleSize); + final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize); for (int i = 0; i < minimumIdleSize; i++) { if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) { - if (completedConnections.decrementAndGet() == 0) { - runnable.run(); - } - continue; + Throwable cause = new RedisConnectionException("Can't init enough connections amount! from " + entry.getClient().getAddr()); + initPromise.tryFailure(cause); + return; } Promise promise = connectionManager.newPromise(); @@ -91,9 +90,14 @@ public class ConnectionPool { releaseConnection(entry, conn); } releaseConnection(entry); + if (!future.isSuccess()) { + Throwable cause = new RedisConnectionException("Can't init enough connections amount! from " + entry.getClient().getAddr()); + initPromise.tryFailure(cause); + return; + } - if (completedConnections.decrementAndGet() == 0) { - runnable.run(); + if (initializedConnections.decrementAndGet() == 0) { + initPromise.setSuccess(null); } } }); @@ -272,9 +276,11 @@ public class ConnectionPool { if (future.isSuccess() && "PONG".equals(future.getNow())) { entry.resetFailedAttempts(); - initConnections(entry, new Runnable() { + Promise promise = connectionManager.newPromise(); + promise.addListener(new FutureListener() { @Override - public void run() { + public void operationComplete(Future future) + throws Exception { if (entry.getNodeType() == NodeType.SLAVE) { masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); } else { @@ -286,8 +292,8 @@ public class ConnectionPool { } } } - }, false); - + }); + initConnections(entry, promise, false); } else { scheduleCheck(entry); }