diff --git a/src/main/java/org/redisson/CommandBatchExecutorService.java b/src/main/java/org/redisson/CommandBatchExecutorService.java index 7e38772aa..42db00d07 100644 --- a/src/main/java/org/redisson/CommandBatchExecutorService.java +++ b/src/main/java/org/redisson/CommandBatchExecutorService.java @@ -49,6 +49,7 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.internal.PlatformDependent; @@ -183,7 +184,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { entries.addAll(e.getCommands()); } Collections.sort(entries); - List result = new ArrayList(); + List result = new ArrayList(entries.size()); for (CommandEntry commandEntry : entries) { result.add(commandEntry.getCommand().getPromise().getNow()); } @@ -225,7 +226,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { final TimerTask retryTimerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (attemptPromise.isDone()) { + if (attemptPromise.isDone() || mainPromise.isDone()) { return; } @@ -287,14 +288,23 @@ public class CommandBatchExecutorService extends CommandExecutorService { writeFutureRef.set(future); } else { List> list = new ArrayList>(entry.getCommands().size()); + FutureListener listener = new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess() && !mainPromise.isDone()) { + mainPromise.setFailure(future.cause()); + } + } + }; + for (CommandEntry c : entry.getCommands()) { + c.getCommand().getPromise().addListener(listener); list.add(c.getCommand()); } ChannelFuture future = connection.send(new CommandsData(attemptPromise, list)); writeFutureRef.set(future); } - writeFutureRef.get().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -331,7 +341,7 @@ public class CommandBatchExecutorService extends CommandExecutorService { @Override public void operationComplete(Future future) throws Exception { timeoutRef.get().cancel(); - if (future.isCancelled()) { + if (future.isCancelled() || mainPromise.isDone()) { return; } diff --git a/src/main/java/org/redisson/core/RBatch.java b/src/main/java/org/redisson/core/RBatch.java index 8a0c319b7..7a4272517 100644 --- a/src/main/java/org/redisson/core/RBatch.java +++ b/src/main/java/org/redisson/core/RBatch.java @@ -17,6 +17,7 @@ package org.redisson.core; import java.util.List; +import org.redisson.client.RedisException; import org.redisson.client.codec.Codec; import io.netty.util.concurrent.Future; @@ -171,10 +172,10 @@ public interface RBatch { /** * Executes all operations accumulated during async methods invocations. * - * In cluster configurations operations grouped by slot ids - * so may be executed on different servers. Thus command execution order could be changed + * If cluster configuration used then operations are grouped by slot ids + * and may be executed on different servers. Thus command execution order could be changed * - * @return + * @return List with result object for each command */ List execute(); @@ -184,7 +185,7 @@ public interface RBatch { * In cluster configurations operations grouped by slot ids * so may be executed on different servers. Thus command execution order could be changed * - * @return + * @return List with result object for each command */ Future> executeAsync(); diff --git a/src/test/java/org/redisson/RedissonBatchTest.java b/src/test/java/org/redisson/RedissonBatchTest.java index 2413caf90..413cf68b2 100644 --- a/src/test/java/org/redisson/RedissonBatchTest.java +++ b/src/test/java/org/redisson/RedissonBatchTest.java @@ -6,9 +6,12 @@ import java.util.Map; import org.junit.Assert; import org.junit.Test; +import org.redisson.client.RedisException; import org.redisson.client.codec.StringCodec; +import org.redisson.core.RScript; import org.redisson.core.RBatch; import org.redisson.core.RListAsync; +import org.redisson.core.RScript.Mode; import io.netty.util.concurrent.Future; @@ -52,6 +55,14 @@ public class RedissonBatchTest extends BaseTest { Assert.assertEquals(210*5, res.size()); } + @Test(expected=RedisException.class) + public void testExceptionHandling() { + RBatch batch = redisson.createBatch(); + batch.getMap("test").putAsync("1", "2"); + batch.getScript().evalAsync(Mode.READ_WRITE, "wrong_code", RScript.ReturnType.VALUE); + batch.execute(); + } + @Test(expected=IllegalStateException.class) public void testTwice() { RBatch batch = redisson.createBatch();