Fixed - 30x spike in Commands to the failover shard #1567

pull/1613/head
Nikita 7 years ago
parent bb19fd7997
commit 7557715581

@ -582,7 +582,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.setException(new RedisTimeoutException("Unable to get connection! "
+ "Node source: " + source
+ ", command: " + command + ", command params: " + LogHelper.toString(details.getParams())
+ " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
+ " after " + details.getAttempt() + " retry attempts"));
}
connectionManager.getShutdownLatch().release();
} else {

@ -55,6 +55,7 @@ import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.CountableListener;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
@ -671,6 +672,13 @@ public class CommandBatchService extends CommandAsyncService {
attempts = connectionManager.getConfig().getRetryAttempts();
}
final long interval;
if (options.getRetryInterval() > 0) {
interval = options.getRetryInterval();
} else {
interval = connectionManager.getConfig().getRetryInterval();
}
final FutureListener<Void> mainPromiseListener = new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
@ -692,21 +700,25 @@ public class CommandBatchService extends CommandAsyncService {
}
if (connectionFuture.cancel(false)) {
if (details.getException() == null) {
details.setException(new RedisTimeoutException("Unable to get connection! "
+ "Node source: " + source + " after " + attempts + " retry attempts"));
}
connectionManager.getShutdownLatch().release();
} else {
if (connectionFuture.isSuccess()) {
if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) {
if (details.getAttempt() == attempts) {
if (details.getWriteFuture() == null || details.getWriteFuture().cancel(false)) {
if (details.getWriteFuture() != null && details.getWriteFuture().cancel(false)) {
if (details.getException() == null) {
details.setException(new RedisTimeoutException("Unable to send batch after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
details.setException(new RedisTimeoutException("Unable to send batch after " + attempts + " retry attempts"));
}
attemptPromise.tryFailure(details.getException());
}
return;
}
details.incAttempt();
Timeout timeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
Timeout timeout = connectionManager.newTimeout(this, interval, TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
return;
}
@ -741,11 +753,6 @@ public class CommandBatchService extends CommandAsyncService {
}
};
long interval = connectionManager.getConfig().getRetryInterval();
if (options.getRetryInterval() > 0) {
interval = options.getRetryInterval();
}
Timeout timeout = connectionManager.newTimeout(retryTimerTask, interval, TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
mainPromise.addListener(mainPromiseListener);

Loading…
Cancel
Save