canceled command future checking

pull/297/head
Nikita 9 years ago
parent ea22f59060
commit 8c59eca279

@ -198,6 +198,10 @@ public class CommandBatchExecutorService extends CommandExecutorService {
}
public void execute(final Entry entry, final NodeSource source, final Promise<Void> mainPromise, final AtomicInteger slots, final int attempt) {
if (mainPromise.isCancelled()) {
return;
}
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return;
@ -220,7 +224,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
connectionManager.getShutdownLatch().release();
}
if (attemptPromise.isDone()) {
if (attemptPromise.isDone() || mainPromise.isCancelled()) {
return;
}
@ -243,7 +247,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isDone() || connFuture.isCancelled()) {
if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) {
return;
}
if (!connFuture.isSuccess()) {
@ -266,7 +270,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || future.isCancelled()) {
if (attemptPromise.isDone() || future.isCancelled() || mainPromise.isCancelled()) {
return;
}
@ -290,7 +294,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
@Override
public void operationComplete(Future<Void> future) throws Exception {
timeout.cancel();
if (future.isCancelled()) {
if (future.isCancelled() || mainPromise.isCancelled()) {
return;
}

@ -414,6 +414,10 @@ public class CommandExecutorService implements CommandExecutor {
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final int attempt) {
if (mainPromise.isCancelled()) {
return;
}
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return;
@ -436,7 +440,7 @@ public class CommandExecutorService implements CommandExecutor {
connectionManager.getShutdownLatch().release();
}
if (attemptPromise.isDone()) {
if (attemptPromise.isDone() || mainPromise.isCancelled()) {
return;
}
@ -459,7 +463,7 @@ public class CommandExecutorService implements CommandExecutor {
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isDone() || connFuture.isCancelled()) {
if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) {
return;
}
if (!connFuture.isSuccess()) {
@ -487,7 +491,7 @@ public class CommandExecutorService implements CommandExecutor {
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || future.isCancelled()) {
if (attemptPromise.isDone() || future.isCancelled() || mainPromise.isCancelled()) {
return;
}
if (!future.isSuccess()) {
@ -511,7 +515,7 @@ public class CommandExecutorService implements CommandExecutor {
@Override
public void operationComplete(Future<R> future) throws Exception {
timeout.cancel();
if (future.isCancelled()) {
if (future.isCancelled() || mainPromise.isCancelled()) {
return;
}

Loading…
Cancel
Save