timeout improvements. #262

pull/297/head
Nikita 9 years ago
parent f3a3a4a82d
commit 5a236826e7

@ -225,23 +225,19 @@ public class CommandBatchExecutorService extends CommandExecutorService {
final TimerTask retryTimerTask = new TimerTask() { final TimerTask retryTimerTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
if (connectionFuture.cancel(false)) { if (attemptPromise.isDone()) {
connectionManager.getShutdownLatch().release();
}
if ((writeFutureRef.get() == null || !writeFutureRef.get().isDone())
&& connectionFuture.isSuccess()) {
Timeout newTimeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
timeoutRef.set(newTimeout);
return; return;
} }
if (writeFutureRef.get() != null && writeFutureRef.get().isSuccess()) { if (connectionFuture.cancel(false)) {
connectionManager.getShutdownLatch().release();
} else {
if (connectionFuture.isSuccess()) {
ChannelFuture writeFuture = writeFutureRef.get();
if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) {
return; return;
} }
}
if (attemptPromise.isDone()) {
return;
} }
if (mainPromise.isCancelled()) { if (mainPromise.isCancelled()) {
@ -269,7 +265,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception { public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) { if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) {
return; return;
} }
@ -302,18 +298,19 @@ public class CommandBatchExecutorService extends CommandExecutorService {
writeFutureRef.get().addListener(new ChannelFutureListener() { writeFutureRef.get().addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || mainPromise.isCancelled()) { if (attemptPromise.isDone() || future.isCancelled()) {
return; return;
} }
if (!future.isSuccess()) { if (!future.isSuccess()) {
exceptionRef.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause())); exceptionRef.set(new WriteRedisConnectionException("Can't write command batch to channel: " + future.channel(), future.cause()));
} else { } else {
timeoutRef.get().cancel(); timeoutRef.get().cancel();
TimerTask timeoutTask = new TimerTask() { TimerTask timeoutTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure( attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout during batch command execution. Channel: " + connection.getChannel())); new RedisTimeoutException("Redis server response timeout during command batch execution. Channel: " + connection.getChannel()));
} }
}; };
Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
@ -334,7 +331,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
timeoutRef.get().cancel(); timeoutRef.get().cancel();
if (future.isCancelled() || mainPromise.isCancelled()) { if (future.isCancelled()) {
return; return;
} }

@ -439,24 +439,20 @@ public class CommandExecutorService implements CommandExecutor {
final TimerTask retryTimerTask = new TimerTask() { final TimerTask retryTimerTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout t) throws Exception {
if (connectionFuture.cancel(false)) { if (attemptPromise.isDone()) {
connectionManager.getShutdownLatch().release();
}
if ((writeFutureRef.get() == null || !writeFutureRef.get().isDone())
&& connectionFuture.isSuccess()) {
Timeout newTimeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
timeoutRef.set(newTimeout);
return; return;
} }
if (writeFutureRef.get() != null && writeFutureRef.get().isSuccess()) { if (connectionFuture.cancel(false)) {
connectionManager.getShutdownLatch().release();
} else {
if (connectionFuture.isSuccess()) {
ChannelFuture writeFuture = writeFutureRef.get();
if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) {
return; return;
} }
}
if (attemptPromise.isDone()) {
return;
} }
if (mainPromise.isCancelled()) { if (mainPromise.isCancelled()) {
@ -484,7 +480,7 @@ public class CommandExecutorService implements CommandExecutor {
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception { public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled() || timeoutRef.get().isExpired()) { if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) {
return; return;
} }
@ -512,9 +508,10 @@ public class CommandExecutorService implements CommandExecutor {
writeFutureRef.get().addListener(new ChannelFutureListener() { writeFutureRef.get().addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || mainPromise.isCancelled()) { if (attemptPromise.isDone() || future.isCancelled()) {
return; return;
} }
if (!future.isSuccess()) { if (!future.isSuccess()) {
exceptionRef.set(new WriteRedisConnectionException( exceptionRef.set(new WriteRedisConnectionException(
"Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause())); "Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause()));
@ -551,7 +548,7 @@ public class CommandExecutorService implements CommandExecutor {
@Override @Override
public void operationComplete(Future<R> future) throws Exception { public void operationComplete(Future<R> future) throws Exception {
timeoutRef.get().cancel(); timeoutRef.get().cancel();
if (future.isCancelled() || mainPromise.isCancelled()) { if (future.isCancelled()) {
return; return;
} }

Loading…
Cancel
Save