From b0d9803593f619bd14531c6a88a58e8dce18e45e Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 18 Dec 2015 13:35:07 +0300 Subject: [PATCH] Object allocation optimization. #338 --- .../cluster/ClusterConnectionManager.java | 6 +-- .../redisson/command/CommandAsyncService.java | 4 +- .../connection/ConnectionManager.java | 2 + .../MasterSlaveConnectionManager.java | 4 +- .../org/redisson/misc/ConnectionPool.java | 42 ++++++++++++------- 5 files changed, 38 insertions(+), 20 deletions(-) diff --git a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 5a8ac56f9..e2aec4881 100644 --- a/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -117,19 +117,19 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private Collection> addMasterEntry(final ClusterPartition partition, ClusterServersConfig cfg) { if (partition.isMasterFail()) { log.warn("add master: {} for slot ranges: {} failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getSlotRanges()); - Future f = newSucceededFuture(); + Future f = newSucceededFuture(null); return Collections.singletonList(f); } RedisConnection connection = connect(cfg, partition.getMasterAddress()); if (connection == null) { - Future f = newSucceededFuture(); + Future f = newSucceededFuture(null); return Collections.singletonList(f); } Map params = connection.sync(RedisCommands.CLUSTER_INFO); if ("fail".equals(params.get("cluster_state"))) { log.warn("add master: {} for slot ranges: {} failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getSlotRanges()); - Future f = newSucceededFuture(); + Future f = newSucceededFuture(null); return Collections.singletonList(f); } diff --git a/src/main/java/org/redisson/command/CommandAsyncService.java b/src/main/java/org/redisson/command/CommandAsyncService.java index bdf28b9d2..4f7c6875f 100644 --- a/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/src/main/java/org/redisson/command/CommandAsyncService.java @@ -363,6 +363,9 @@ public class CommandAsyncService implements CommandAsyncExecutor { } if (attempt == connectionManager.getConfig().getRetryAttempts()) { + if (exceptionRef.get() == null) { + exceptionRef.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(params))); + } attemptPromise.tryFailure(exceptionRef.get()); return; } @@ -375,7 +378,6 @@ public class CommandAsyncService implements CommandAsyncExecutor { } }; - exceptionRef.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(params))); Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); timeoutRef.set(timeout); diff --git a/src/main/java/org/redisson/connection/ConnectionManager.java b/src/main/java/org/redisson/connection/ConnectionManager.java index c83e1f358..b1745116f 100644 --- a/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/src/main/java/org/redisson/connection/ConnectionManager.java @@ -45,6 +45,8 @@ import io.netty.util.concurrent.Promise; */ public interface ConnectionManager { + Future newSucceededFuture(R value); + ConnectionEventsHub getConnectionEventsHub(); boolean isShutdown(); diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 684522dfd..f93912b10 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -673,8 +673,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return group.next().newPromise(); } - public Future newSucceededFuture() { - return group.next().newSucceededFuture(null); + public Future newSucceededFuture(R value) { + return group.next().newSucceededFuture(value); } @Override diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index dfe933b1c..eaa7aff38 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -82,8 +82,7 @@ public class ConnectionPool { return; } - Promise promise = connectionManager.newPromise(); - connect(entry, promise); + Future promise = connectTo(entry); promise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -118,9 +117,7 @@ public class ConnectionPool { for (int j = entries.size() - 1; j >= 0; j--) { ClientConnectionsEntry entry = getEntry(); if (!entry.isFreezed() && tryAcquireConnection(entry)) { - Promise promise = connectionManager.newPromise(); - connect(entry, promise); - return promise; + return connectTo(entry); } } @@ -132,9 +129,7 @@ public class ConnectionPool { public Future get(ClientConnectionsEntry entry) { if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed()) && tryAcquireConnection(entry)) { - Promise promise = connectionManager.newPromise(); - connect(entry, promise); - return promise; + return connectTo(entry); } RedisConnectionException exception = new RedisConnectionException( @@ -154,18 +149,17 @@ public class ConnectionPool { return (Future) entry.connect(config); } - private void connect(final ClientConnectionsEntry entry, final Promise promise) { + private Future connectTo(final ClientConnectionsEntry entry) { T conn = poll(entry); if (conn != null) { if (!conn.isActive()) { - promiseFailure(entry, promise, conn); - return; + return promiseFailure(entry, conn); } - promiseSuccessful(entry, promise, conn); - return; + return promiseSuccessful(entry, conn); } + final Promise promise = connectionManager.newPromise(); Future connFuture = connect(entry); connFuture.addListener(new FutureListener() { @Override @@ -186,9 +180,10 @@ public class ConnectionPool { promiseSuccessful(entry, promise, conn); } }); + return promise; } - private void promiseSuccessful(final ClientConnectionsEntry entry, final Promise promise, T conn) { + private void promiseSuccessful(ClientConnectionsEntry entry, Promise promise, T conn) { entry.resetFailedAttempts(); if (!promise.trySuccess(conn)) { releaseConnection(entry, conn); @@ -196,6 +191,11 @@ public class ConnectionPool { } } + private Future promiseSuccessful(ClientConnectionsEntry entry, T conn) { + entry.resetFailedAttempts(); + return connectionManager.newSucceededFuture(conn); + } + private void promiseFailure(ClientConnectionsEntry entry, Promise promise, Throwable cause) { if (entry.incFailedAttempts() == config.getFailedAttempts()) { checkForReconnect(entry); @@ -218,6 +218,20 @@ public class ConnectionPool { promise.tryFailure(cause); } + private Future promiseFailure(ClientConnectionsEntry entry, T conn) { + int attempts = entry.incFailedAttempts(); + if (attempts == config.getFailedAttempts()) { + checkForReconnect(entry); + } else if (attempts < config.getFailedAttempts()) { + releaseConnection(entry, conn); + } + + releaseConnection(entry); + + RedisConnectionException cause = new RedisConnectionException(conn + " is not active!"); + return connectionManager.newFailedFuture(cause); + } + private void checkForReconnect(ClientConnectionsEntry entry) { if (entry.getNodeType() == NodeType.SLAVE) { connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),