diff --git a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index e0d869a23..5630f4760 100644 --- a/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/redisson/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -27,12 +27,12 @@ import org.redisson.client.RedisConnection; import org.redisson.client.RedisPubSubConnection; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.RPromise; +import org.redisson.pubsub.AsyncSemaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; public class ClientConnectionsEntry { @@ -40,10 +40,10 @@ public class ClientConnectionsEntry { private final Queue allSubscribeConnections = new ConcurrentLinkedQueue(); private final Queue freeSubscribeConnections = new ConcurrentLinkedQueue(); - private final AtomicInteger freeSubscribeConnectionsCounter = new AtomicInteger(); + private final AsyncSemaphore freeSubscribeConnectionsCounter; private final Queue freeConnections = new ConcurrentLinkedQueue(); - private final AtomicInteger freeConnectionsCounter = new AtomicInteger(); + private final AsyncSemaphore freeConnectionsCounter; public enum FreezeReason {MANAGER, RECONNECT, SYSTEM} @@ -59,10 +59,10 @@ public class ClientConnectionsEntry { public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize, ConnectionManager connectionManager, NodeType serverMode) { this.client = client; - this.freeConnectionsCounter.set(poolMaxSize); + this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize); this.connectionManager = connectionManager; this.nodeType = serverMode; - this.freeSubscribeConnectionsCounter.set(subscribePoolMaxSize); + this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize); if (subscribePoolMaxSize > 0) { connectionManager.getConnectionWatcher().add(subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter); @@ -107,27 +107,15 @@ public class ClientConnectionsEntry { } public int getFreeAmount() { - return freeConnectionsCounter.get(); + return freeConnectionsCounter.getCounter(); } - private boolean tryAcquire(AtomicInteger counter) { - while (true) { - int value = counter.get(); - if (value == 0) { - return false; - } - if (counter.compareAndSet(value, value - 1)) { - return true; - } - } - } - - public boolean tryAcquireConnection() { - return tryAcquire(freeConnectionsCounter); + public void acquireConnection(Runnable runnable) { + freeConnectionsCounter.acquire(runnable); } public void releaseConnection() { - freeConnectionsCounter.incrementAndGet(); + freeConnectionsCounter.release(); } public RedisConnection pollConnection() { @@ -228,12 +216,12 @@ public class ClientConnectionsEntry { freeSubscribeConnections.add(connection); } - public boolean tryAcquireSubscribeConnection() { - return tryAcquire(freeSubscribeConnectionsCounter); + public void acquireSubscribeConnection(Runnable runnable) { + freeSubscribeConnectionsCounter.acquire(runnable); } public void releaseSubscribeConnection() { - freeSubscribeConnectionsCounter.incrementAndGet(); + freeSubscribeConnectionsCounter.release(); } public boolean freezeMaster(FreezeReason reason) { diff --git a/redisson/src/main/java/org/redisson/connection/IdleConnectionWatcher.java b/redisson/src/main/java/org/redisson/connection/IdleConnectionWatcher.java index c96852a2e..a9dab67c4 100644 --- a/redisson/src/main/java/org/redisson/connection/IdleConnectionWatcher.java +++ b/redisson/src/main/java/org/redisson/connection/IdleConnectionWatcher.java @@ -19,10 +19,10 @@ import java.util.Collection; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.redisson.client.RedisConnection; import org.redisson.config.MasterSlaveServersConfig; +import org.redisson.pubsub.AsyncSemaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,10 +38,10 @@ public class IdleConnectionWatcher { private final int minimumAmount; private final int maximumAmount; - private final AtomicInteger freeConnectionsCounter; + private final AsyncSemaphore freeConnectionsCounter; private final Collection connections; - public Entry(int minimumAmount, int maximumAmount, Collection connections, AtomicInteger freeConnectionsCounter) { + public Entry(int minimumAmount, int maximumAmount, Collection connections, AsyncSemaphore freeConnectionsCounter) { super(); this.minimumAmount = minimumAmount; this.maximumAmount = maximumAmount; @@ -84,10 +84,10 @@ public class IdleConnectionWatcher { } private boolean validateAmount(Entry entry) { - return entry.maximumAmount - entry.freeConnectionsCounter.get() + entry.connections.size() > entry.minimumAmount; + return entry.maximumAmount - entry.freeConnectionsCounter.getCounter() + entry.connections.size() > entry.minimumAmount; } - public void add(int minimumAmount, int maximumAmount, Collection connections, AtomicInteger freeConnectionsCounter) { + public void add(int minimumAmount, int maximumAmount, Collection connections, AsyncSemaphore freeConnectionsCounter) { entries.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter)); } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 6ac025101..92122e56d 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -104,39 +104,51 @@ abstract class ConnectionPool { initPromise.tryFailure(cause); return; } - - RFuture promise = createConnection(entry); - promise.addListener(new FutureListener() { + + acquireConnection(entry, new Runnable() { + @Override - public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - T conn = future.getNow(); - releaseConnection(entry, conn); - } + public void run() { + RFuture promise = createConnection(entry, null); + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + T conn = future.getNow(); - releaseConnection(entry); + releaseConnection(entry, conn); + } - if (!future.isSuccess()) { - Throwable cause = new RedisConnectionException( - "Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: " - + entry.getClient().getAddr(), future.cause()); - initPromise.tryFailure(cause); - return; - } + releaseConnection(entry); - int value = initializedConnections.decrementAndGet(); - if (value == 0) { - log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); - if (!initPromise.trySuccess(null)) { - throw new IllegalStateException(); - } - } else if (value > 0 && !initPromise.isDone()) { - if (requests.incrementAndGet() <= minimumIdleSize) { - createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections); + if (!future.isSuccess()) { + Throwable cause = new RedisConnectionException( + "Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: " + + entry.getClient().getAddr(), future.cause()); + initPromise.tryFailure(cause); + return; + } + + int value = initializedConnections.decrementAndGet(); + if (value == 0) { + log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); + if (!initPromise.trySuccess(null)) { + throw new IllegalStateException(); + } + } else if (value > 0 && !initPromise.isDone()) { + if (requests.incrementAndGet() <= minimumIdleSize) { + createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections); + } + } } - } + }); } }); + + } + + protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) { + entry.acquireConnection(runnable); } protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry); @@ -148,27 +160,35 @@ abstract class ConnectionPool { public RFuture get() { for (int j = entries.size() - 1; j >= 0; j--) { ClientConnectionsEntry entry = getEntry(); - if (!entry.isFreezed() && tryAcquireConnection(entry)) { - return connectTo(entry); + if (!entry.isFreezed() + && tryAcquireConnection(entry)) { + final RPromise result = connectionManager.newPromise(); + acquireConnection(entry, new Runnable() { + @Override + public void run() { + connectTo(entry, result); + } + }); + return result; } } - - List zeroConnectionsAmount = new LinkedList(); + + List failedAttempts = new LinkedList(); List freezed = new LinkedList(); for (ClientConnectionsEntry entry : entries) { if (entry.isFreezed()) { freezed.add(entry.getClient().getAddr()); } else { - zeroConnectionsAmount.add(entry.getClient().getAddr()); + failedAttempts.add(entry.getClient().getAddr()); } } - StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " exhausted! "); + StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. "); if (!freezed.isEmpty()) { errorMsg.append(" Disconnected hosts: " + freezed); } - if (!zeroConnectionsAmount.isEmpty()) { - errorMsg.append(" Hosts with fully busy connections: " + zeroConnectionsAmount); + if (!failedAttempts.isEmpty()) { + errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts); } RedisConnectionException exception = new RedisConnectionException(errorMsg.toString()); @@ -178,7 +198,9 @@ abstract class ConnectionPool { public RFuture get(ClientConnectionsEntry entry) { if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed()) && tryAcquireConnection(entry)) { - return connectTo(entry); + RPromise result = connectionManager.newPromise(); + connectTo(entry, result); + return result; } RedisConnectionException exception = new RedisConnectionException( @@ -187,7 +209,7 @@ abstract class ConnectionPool { } protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { - return entry.getFailedAttempts() < config.getFailedAttempts() && entry.tryAcquireConnection(); + return entry.getFailedAttempts() < config.getFailedAttempts(); } protected T poll(ClientConnectionsEntry entry) { @@ -198,21 +220,29 @@ abstract class ConnectionPool { return (RFuture) entry.connect(); } - private RFuture connectTo(ClientConnectionsEntry entry) { + private void connectTo(ClientConnectionsEntry entry, RPromise promise) { T conn = poll(entry); if (conn != null) { if (!conn.isActive()) { - return promiseFailure(entry, conn); + promiseFailure(entry, promise, conn); + return; } - return promiseSuccessful(entry, conn); + entry.resetFailedAttempts(); + promise.trySuccess(conn); + return; } - return createConnection(entry); + createConnection(entry, promise); } - private RFuture createConnection(final ClientConnectionsEntry entry) { - final RPromise promise = connectionManager.newPromise(); + private RFuture createConnection(final ClientConnectionsEntry entry, RPromise ppromise) { + final RPromise promise; + if (ppromise != null) { + promise = ppromise; + } else { + promise = connectionManager.newPromise(); + } RFuture connFuture = connect(entry); connFuture.addListener(new FutureListener() { @Override @@ -242,11 +272,6 @@ abstract class ConnectionPool { } } - private RFuture promiseSuccessful(ClientConnectionsEntry entry, T conn) { - entry.resetFailedAttempts(); - return (RFuture) conn.getAcquireFuture(); - } - private void promiseFailure(ClientConnectionsEntry entry, RPromise promise, Throwable cause) { if (entry.incFailedAttempts() == config.getFailedAttempts()) { checkForReconnect(entry); @@ -274,23 +299,6 @@ abstract class ConnectionPool { promise.tryFailure(cause); } - private RFuture promiseFailure(ClientConnectionsEntry entry, T conn) { - int attempts = entry.incFailedAttempts(); - if (attempts == config.getFailedAttempts()) { - conn.closeAsync(); - checkForReconnect(entry); - } else if (attempts < config.getFailedAttempts()) { - releaseConnection(entry, conn); - } else { - conn.closeAsync(); - } - - releaseConnection(entry); - - RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); - return connectionManager.newFailedFuture(cause); - } - private void checkForReconnect(ClientConnectionsEntry entry) { if (entry.getNodeType() == NodeType.SLAVE) { masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(), diff --git a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java index 82ede0d4c..b3bef548c 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/PubSubConnectionPool.java @@ -50,10 +50,10 @@ public class PubSubConnectionPool extends ConnectionPool } @Override - protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { - return entry.tryAcquireSubscribeConnection(); + protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) { + entry.acquireSubscribeConnection(runnable); } - + @Override protected void releaseConnection(ClientConnectionsEntry entry) { entry.releaseSubscribeConnection(); diff --git a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java index 49f3d29b2..74ebcd055 100644 --- a/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java +++ b/redisson/src/main/java/org/redisson/pubsub/AsyncSemaphore.java @@ -74,6 +74,10 @@ public class AsyncSemaphore { } } + public int getCounter() { + return counter; + } + public void release() { Runnable runnable = null;