Object allocation optimization. #338

pull/365/head
Nikita 9 years ago
parent 9468dd6fc0
commit b0d9803593

@ -117,19 +117,19 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private Collection<Future<Void>> addMasterEntry(final ClusterPartition partition, ClusterServersConfig cfg) { private Collection<Future<Void>> addMasterEntry(final ClusterPartition partition, ClusterServersConfig cfg) {
if (partition.isMasterFail()) { if (partition.isMasterFail()) {
log.warn("add master: {} for slot ranges: {} failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getSlotRanges()); log.warn("add master: {} for slot ranges: {} failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getSlotRanges());
Future<Void> f = newSucceededFuture(); Future<Void> f = newSucceededFuture(null);
return Collections.singletonList(f); return Collections.singletonList(f);
} }
RedisConnection connection = connect(cfg, partition.getMasterAddress()); RedisConnection connection = connect(cfg, partition.getMasterAddress());
if (connection == null) { if (connection == null) {
Future<Void> f = newSucceededFuture(); Future<Void> f = newSucceededFuture(null);
return Collections.singletonList(f); return Collections.singletonList(f);
} }
Map<String, String> params = connection.sync(RedisCommands.CLUSTER_INFO); Map<String, String> params = connection.sync(RedisCommands.CLUSTER_INFO);
if ("fail".equals(params.get("cluster_state"))) { if ("fail".equals(params.get("cluster_state"))) {
log.warn("add master: {} for slot ranges: {} failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getSlotRanges()); log.warn("add master: {} for slot ranges: {} failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getSlotRanges());
Future<Void> f = newSucceededFuture(); Future<Void> f = newSucceededFuture(null);
return Collections.singletonList(f); return Collections.singletonList(f);
} }

@ -363,6 +363,9 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
if (attempt == connectionManager.getConfig().getRetryAttempts()) { if (attempt == connectionManager.getConfig().getRetryAttempts()) {
if (exceptionRef.get() == null) {
exceptionRef.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(params)));
}
attemptPromise.tryFailure(exceptionRef.get()); attemptPromise.tryFailure(exceptionRef.get());
return; return;
} }
@ -375,7 +378,6 @@ public class CommandAsyncService implements CommandAsyncExecutor {
} }
}; };
exceptionRef.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(params)));
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
timeoutRef.set(timeout); timeoutRef.set(timeout);

@ -45,6 +45,8 @@ import io.netty.util.concurrent.Promise;
*/ */
public interface ConnectionManager { public interface ConnectionManager {
<R> Future<R> newSucceededFuture(R value);
ConnectionEventsHub getConnectionEventsHub(); ConnectionEventsHub getConnectionEventsHub();
boolean isShutdown(); boolean isShutdown();

@ -673,8 +673,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return group.next().newPromise(); return group.next().newPromise();
} }
public <R> Future<R> newSucceededFuture() { public <R> Future<R> newSucceededFuture(R value) {
return group.next().newSucceededFuture(null); return group.next().newSucceededFuture(value);
} }
@Override @Override

@ -82,8 +82,7 @@ public class ConnectionPool<T extends RedisConnection> {
return; return;
} }
Promise<T> promise = connectionManager.newPromise(); Future<T> promise = connectTo(entry);
connect(entry, promise);
promise.addListener(new FutureListener<T>() { promise.addListener(new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {
@ -118,9 +117,7 @@ public class ConnectionPool<T extends RedisConnection> {
for (int j = entries.size() - 1; j >= 0; j--) { for (int j = entries.size() - 1; j >= 0; j--) {
ClientConnectionsEntry entry = getEntry(); ClientConnectionsEntry entry = getEntry();
if (!entry.isFreezed() && tryAcquireConnection(entry)) { if (!entry.isFreezed() && tryAcquireConnection(entry)) {
Promise<T> promise = connectionManager.newPromise(); return connectTo(entry);
connect(entry, promise);
return promise;
} }
} }
@ -132,9 +129,7 @@ public class ConnectionPool<T extends RedisConnection> {
public Future<T> get(ClientConnectionsEntry entry) { public Future<T> get(ClientConnectionsEntry entry) {
if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed()) if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed())
&& tryAcquireConnection(entry)) { && tryAcquireConnection(entry)) {
Promise<T> promise = connectionManager.newPromise(); return connectTo(entry);
connect(entry, promise);
return promise;
} }
RedisConnectionException exception = new RedisConnectionException( RedisConnectionException exception = new RedisConnectionException(
@ -154,18 +149,17 @@ public class ConnectionPool<T extends RedisConnection> {
return (Future<T>) entry.connect(config); return (Future<T>) entry.connect(config);
} }
private void connect(final ClientConnectionsEntry entry, final Promise<T> promise) { private Future<T> connectTo(final ClientConnectionsEntry entry) {
T conn = poll(entry); T conn = poll(entry);
if (conn != null) { if (conn != null) {
if (!conn.isActive()) { if (!conn.isActive()) {
promiseFailure(entry, promise, conn); return promiseFailure(entry, conn);
return;
} }
promiseSuccessful(entry, promise, conn); return promiseSuccessful(entry, conn);
return;
} }
final Promise<T> promise = connectionManager.newPromise();
Future<T> connFuture = connect(entry); Future<T> connFuture = connect(entry);
connFuture.addListener(new FutureListener<T>() { connFuture.addListener(new FutureListener<T>() {
@Override @Override
@ -186,9 +180,10 @@ public class ConnectionPool<T extends RedisConnection> {
promiseSuccessful(entry, promise, conn); promiseSuccessful(entry, promise, conn);
} }
}); });
return promise;
} }
private void promiseSuccessful(final ClientConnectionsEntry entry, final Promise<T> promise, T conn) { private void promiseSuccessful(ClientConnectionsEntry entry, Promise<T> promise, T conn) {
entry.resetFailedAttempts(); entry.resetFailedAttempts();
if (!promise.trySuccess(conn)) { if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn); releaseConnection(entry, conn);
@ -196,6 +191,11 @@ public class ConnectionPool<T extends RedisConnection> {
} }
} }
private Future<T> promiseSuccessful(ClientConnectionsEntry entry, T conn) {
entry.resetFailedAttempts();
return connectionManager.newSucceededFuture(conn);
}
private void promiseFailure(ClientConnectionsEntry entry, Promise<T> promise, Throwable cause) { private void promiseFailure(ClientConnectionsEntry entry, Promise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getFailedAttempts()) { if (entry.incFailedAttempts() == config.getFailedAttempts()) {
checkForReconnect(entry); checkForReconnect(entry);
@ -218,6 +218,20 @@ public class ConnectionPool<T extends RedisConnection> {
promise.tryFailure(cause); promise.tryFailure(cause);
} }
private Future<T> promiseFailure(ClientConnectionsEntry entry, T conn) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
checkForReconnect(entry);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
}
releaseConnection(entry);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
return connectionManager.newFailedFuture(cause);
}
private void checkForReconnect(ClientConnectionsEntry entry) { private void checkForReconnect(ClientConnectionsEntry entry) {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),

Loading…
Cancel
Save