From 7557715581689538474dcd402429f05f99988a6e Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 27 Aug 2018 17:07:57 +0300 Subject: [PATCH] Fixed - 30x spike in Commands to the failover shard #1567 --- .../redisson/command/CommandAsyncService.java | 2 +- .../redisson/command/CommandBatchService.java | 23 ++++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index b780f7c90..d66a7b74a 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -582,7 +582,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { details.setException(new RedisTimeoutException("Unable to get connection! " + "Node source: " + source + ", command: " + command + ", command params: " + LogHelper.toString(details.getParams()) - + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")); + + " after " + details.getAttempt() + " retry attempts")); } connectionManager.getShutdownLatch().release(); } else { diff --git a/redisson/src/main/java/org/redisson/command/CommandBatchService.java b/redisson/src/main/java/org/redisson/command/CommandBatchService.java index 31c6ebc46..232b4f933 100644 --- a/redisson/src/main/java/org/redisson/command/CommandBatchService.java +++ b/redisson/src/main/java/org/redisson/command/CommandBatchService.java @@ -55,6 +55,7 @@ import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource.Redirect; import org.redisson.misc.CountableListener; +import org.redisson.misc.LogHelper; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; import org.redisson.misc.RedissonPromise; @@ -671,6 +672,13 @@ public class CommandBatchService extends CommandAsyncService { attempts = connectionManager.getConfig().getRetryAttempts(); } + final long interval; + if (options.getRetryInterval() > 0) { + interval = options.getRetryInterval(); + } else { + interval = connectionManager.getConfig().getRetryInterval(); + } + final FutureListener mainPromiseListener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -692,21 +700,25 @@ public class CommandBatchService extends CommandAsyncService { } if (connectionFuture.cancel(false)) { + if (details.getException() == null) { + details.setException(new RedisTimeoutException("Unable to get connection! " + + "Node source: " + source + " after " + attempts + " retry attempts")); + } connectionManager.getShutdownLatch().release(); } else { if (connectionFuture.isSuccess()) { if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) { if (details.getAttempt() == attempts) { - if (details.getWriteFuture() == null || details.getWriteFuture().cancel(false)) { + if (details.getWriteFuture() != null && details.getWriteFuture().cancel(false)) { if (details.getException() == null) { - details.setException(new RedisTimeoutException("Unable to send batch after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")); + details.setException(new RedisTimeoutException("Unable to send batch after " + attempts + " retry attempts")); } attemptPromise.tryFailure(details.getException()); } return; } details.incAttempt(); - Timeout timeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + Timeout timeout = connectionManager.newTimeout(this, interval, TimeUnit.MILLISECONDS); details.setTimeout(timeout); return; } @@ -741,11 +753,6 @@ public class CommandBatchService extends CommandAsyncService { } }; - long interval = connectionManager.getConfig().getRetryInterval(); - if (options.getRetryInterval() > 0) { - interval = options.getRetryInterval(); - } - Timeout timeout = connectionManager.newTimeout(retryTimerTask, interval, TimeUnit.MILLISECONDS); details.setTimeout(timeout); mainPromise.addListener(mainPromiseListener);