From 853a0c2803c16df30e2d398b81fe297ca67a6637 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 20 Nov 2015 14:38:03 +0300 Subject: [PATCH] CommandBatchExecutorService timeout handling improvements --- .../redisson/CommandBatchExecutorService.java | 70 ++++++++++++++----- 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 3ac09296e..e68e85118 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -32,9 +32,11 @@ import org.redisson.client.RedisMovedException; import org.redisson.client.RedisTimeoutException; import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.RedisCommand; +import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.connection.ConnectionManager; import org.redisson.connection.NodeSource; @@ -207,7 +209,10 @@ public class CommandBatchExecutorService extends CommandExecutorService { } final Promise attemptPromise = connectionManager.newPromise(); - final AtomicReference ex = new AtomicReference(); + + final AtomicReference writeFutureRef = new AtomicReference(); + final AtomicReference exceptionRef = new AtomicReference(); + final AtomicReference timeoutRef = new AtomicReference(); final Future connectionFuture; if (entry.isReadOnlyMode()) { @@ -223,6 +228,17 @@ public class CommandBatchExecutorService extends CommandExecutorService { connectionManager.getShutdownLatch().release(); } + if ((writeFutureRef.get() == null || !writeFutureRef.get().isDone()) + && connectionFuture.isSuccess()) { + Timeout newTimeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); + timeoutRef.set(newTimeout); + return; + } + + if (writeFutureRef.get() != null && writeFutureRef.get().isSuccess()) { + return; + } + if (attemptPromise.isDone()) { return; } @@ -233,7 +249,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { } if (attempt == connectionManager.getConfig().getRetryAttempts()) { - attemptPromise.tryFailure(ex.get()); + attemptPromise.tryFailure(exceptionRef.get()); return; } if (!attemptPromise.cancel(false)) { @@ -245,8 +261,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { } }; - ex.set(new RedisTimeoutException("Batch command execution timeout")); - final AtomicReference timeoutRef = new AtomicReference(); + exceptionRef.set(new RedisTimeoutException("Batch command execution timeout")); Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); timeoutRef.set(timeout); @@ -256,35 +271,52 @@ public class CommandBatchExecutorService extends CommandExecutorService { if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) { return; } + if (!connFuture.isSuccess()) { - ex.set(convertException(connFuture)); - if (timeoutRef.get().cancel()) { - connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - } + exceptionRef.set(convertException(connFuture)); return; } RedisConnection connection = connFuture.getNow(); - List> list = new ArrayList>(entry.getCommands().size()); - for (CommandEntry c : entry.getCommands()) { - list.add(c.getCommand()); + + if (source.getRedirect() == Redirect.ASK) { + List> list = new ArrayList>(entry.getCommands().size() + 1); + Promise promise = connectionManager.newPromise(); + list.add(new CommandData(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); + for (CommandEntry c : entry.getCommands()) { + list.add(c.getCommand()); + } + ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); + writeFutureRef.set(future); + } else { + List> list = new ArrayList>(entry.getCommands().size()); + for (CommandEntry c : entry.getCommands()) { + list.add(c.getCommand()); + } + ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); + writeFutureRef.set(future); } - ChannelFuture writeFuture = connection.send(new CommandsData(attemptPromise, list)); - writeFuture.addListener(new ChannelFutureListener() { + writeFutureRef.get().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (attemptPromise.isDone() || future.isCancelled() || mainPromise.isCancelled()) { + if (attemptPromise.isDone() || mainPromise.isCancelled()) { return; } - if (!future.isSuccess()) { - ex.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause())); - if (timeoutRef.get().cancel()) { - connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); - } + exceptionRef.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause())); + } else { + timeoutRef.get().cancel(); + TimerTask timeoutTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + attemptPromise.tryFailure(exceptionRef.get()); + } + }; + Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS); + timeoutRef.set(timeout); } } });