From 8c59eca2791b32c0cf7070e599ba9061bef84009 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 18 Nov 2015 16:48:46 +0300 Subject: [PATCH] canceled command future checking --- .../org/redisson/CommandBatchExecutorService.java | 12 ++++++++---- .../java/org/redisson/CommandExecutorService.java | 12 ++++++++---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 73186d90c..8a07aec7c 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -198,6 +198,10 @@ public class CommandBatchExecutorService extends CommandExecutorService { } public void execute(final Entry entry, final NodeSource source, final Promise mainPromise, final AtomicInteger slots, final int attempt) { + if (mainPromise.isCancelled()) { + return; + } + if (!connectionManager.getShutdownLatch().acquire()) { mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); return; @@ -220,7 +224,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { connectionManager.getShutdownLatch().release(); } - if (attemptPromise.isDone()) { + if (attemptPromise.isDone() || mainPromise.isCancelled()) { return; } @@ -243,7 +247,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - if (attemptPromise.isDone() || connFuture.isCancelled()) { + if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) { return; } if (!connFuture.isSuccess()) { @@ -266,7 +270,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { writeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (attemptPromise.isDone() || future.isCancelled()) { + if (attemptPromise.isDone() || future.isCancelled() || mainPromise.isCancelled()) { return; } @@ -290,7 +294,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { @Override public void operationComplete(Future future) throws Exception { timeout.cancel(); - if (future.isCancelled()) { + if (future.isCancelled() || mainPromise.isCancelled()) { return; } diff --git a/src/main/java/org/redisson/CommandExecutorService.java b/src/main/java/org/redisson/CommandExecutorService.java index 58757323b..2a895e2b0 100644 --- a/src/main/java/org/redisson/CommandExecutorService.java +++ b/src/main/java/org/redisson/CommandExecutorService.java @@ -414,6 +414,10 @@ public class CommandExecutorService implements CommandExecutor { protected void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder messageDecoder, final Codec codec, final RedisCommand command, final Object[] params, final Promise mainPromise, final int attempt) { + if (mainPromise.isCancelled()) { + return; + } + if (!connectionManager.getShutdownLatch().acquire()) { mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); return; @@ -436,7 +440,7 @@ public class CommandExecutorService implements CommandExecutor { connectionManager.getShutdownLatch().release(); } - if (attemptPromise.isDone()) { + if (attemptPromise.isDone() || mainPromise.isCancelled()) { return; } @@ -459,7 +463,7 @@ public class CommandExecutorService implements CommandExecutor { connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future connFuture) throws Exception { - if (attemptPromise.isDone() || connFuture.isCancelled()) { + if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) { return; } if (!connFuture.isSuccess()) { @@ -487,7 +491,7 @@ public class CommandExecutorService implements CommandExecutor { writeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (attemptPromise.isDone() || future.isCancelled()) { + if (attemptPromise.isDone() || future.isCancelled() || mainPromise.isCancelled()) { return; } if (!future.isSuccess()) { @@ -511,7 +515,7 @@ public class CommandExecutorService implements CommandExecutor { @Override public void operationComplete(Future future) throws Exception { timeout.cancel(); - if (future.isCancelled()) { + if (future.isCancelled() || mainPromise.isCancelled()) { return; }