diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 04e36f219..7e38772aa 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -225,23 +225,19 @@ public class CommandBatchExecutorService extends CommandExecutorService { final TimerTask retryTimerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (connectionFuture.cancel(false)) { - connectionManager.getShutdownLatch().release(); - } - - if ((writeFutureRef.get() == null || !writeFutureRef.get().isDone()) - && connectionFuture.isSuccess()) { - Timeout newTimeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - timeoutRef.set(newTimeout); - return; - } - - if (writeFutureRef.get() != null && writeFutureRef.get().isSuccess()) { + if (attemptPromise.isDone()) { return; } - if (attemptPromise.isDone()) { - return; + if (connectionFuture.cancel(false)) { + connectionManager.getShutdownLatch().release(); + } else { + if (connectionFuture.isSuccess()) { + ChannelFuture writeFuture = writeFutureRef.get(); + if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) { + return; + } + } } if (mainPromise.isCancelled()) { @@ -269,7 +265,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) { + if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) { return; } @@ -302,18 +298,19 @@ public class CommandBatchExecutorService extends CommandExecutorService { writeFutureRef.get().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (attemptPromise.isDone() || mainPromise.isCancelled()) { + if (attemptPromise.isDone() || future.isCancelled()) { return; } + if (!future.isSuccess()) { - exceptionRef.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause())); + exceptionRef.set(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause())); } else { timeoutRef.get().cancel(); TimerTask timeoutTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { attemptPromise.tryFailure( - new RedisTimeoutException("Redis server response timeout during batch command execution. Channel: " + connection.getChannel())); + new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel())); } }; Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); @@ -334,7 +331,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { @Override public void operationComplete(Future future) throws Exception { timeoutRef.get().cancel(); - if (future.isCancelled() || mainPromise.isCancelled()) { + if (future.isCancelled()) { return; } diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 5d06afd83..bbc8a5caf 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -439,24 +439,20 @@ public class CommandExecutorService implements CommandExecutor { final TimerTask retryTimerTask = new TimerTask() { @Override - public void run(Timeout timeout) throws Exception { - if (connectionFuture.cancel(false)) { - connectionManager.getShutdownLatch().release(); - } - - if ((writeFutureRef.get() == null || !writeFutureRef.get().isDone()) - && connectionFuture.isSuccess()) { - Timeout newTimeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - timeoutRef.set(newTimeout); - return; - } - - if (writeFutureRef.get() != null && writeFutureRef.get().isSuccess()) { + public void run(Timeout t) throws Exception { + if (attemptPromise.isDone()) { return; } - if (attemptPromise.isDone()) { - return; + if (connectionFuture.cancel(false)) { + connectionManager.getShutdownLatch().release(); + } else { + if (connectionFuture.isSuccess()) { + ChannelFuture writeFuture = writeFutureRef.get(); + if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) { + return; + } + } } if (mainPromise.isCancelled()) { @@ -484,7 +480,7 @@ public class CommandExecutorService implements CommandExecutor { connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled() || timeoutRef.get().isExpired()) { + if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) { return; } @@ -512,9 +508,10 @@ public class CommandExecutorService implements CommandExecutor { writeFutureRef.get().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (attemptPromise.isDone() || mainPromise.isCancelled()) { + if (attemptPromise.isDone() || future.isCancelled()) { return; } + if (!future.isSuccess()) { exceptionRef.set(new WriteRedisConnectionException( "Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause())); @@ -551,7 +548,7 @@ public class CommandExecutorService implements CommandExecutor { @Override public void operationComplete(Future future) throws Exception { timeoutRef.get().cancel(); - if (future.isCancelled() || mainPromise.isCancelled()) { + if (future.isCancelled()) { return; }