RBatch.execute and RBatch.executeAsync errors handling fixed. #293

pull/297/head
Nikita 9 years ago
parent 5a236826e7
commit e26beab46f

@ -49,6 +49,7 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
@ -183,7 +184,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> result = new ArrayList<Object>();
List<Object> result = new ArrayList<Object>(entries.size());
for (CommandEntry commandEntry : entries) {
result.add(commandEntry.getCommand().getPromise().getNow());
}
@ -225,7 +226,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) {
if (attemptPromise.isDone() || mainPromise.isDone()) {
return;
}
@ -287,14 +288,23 @@ public class CommandBatchExecutorService extends CommandExecutorService {
writeFutureRef.set(future);
} else {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
FutureListener<Object> listener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess() && !mainPromise.isDone()) {
mainPromise.setFailure(future.cause());
}
}
};
for (CommandEntry c : entry.getCommands()) {
c.getCommand().getPromise().addListener(listener);
list.add(c.getCommand());
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
writeFutureRef.set(future);
}
writeFutureRef.get().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -331,7 +341,7 @@ public class CommandBatchExecutorService extends CommandExecutorService {
@Override
public void operationComplete(Future<Void> future) throws Exception {
timeoutRef.get().cancel();
if (future.isCancelled()) {
if (future.isCancelled() || mainPromise.isDone()) {
return;
}

@ -17,6 +17,7 @@ package org.redisson.core;
import java.util.List;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import io.netty.util.concurrent.Future;
@ -171,10 +172,10 @@ public interface RBatch {
/**
* Executes all operations accumulated during async methods invocations.
*
* In cluster configurations operations grouped by slot ids
* so may be executed on different servers. Thus command execution order could be changed
* If cluster configuration used then operations are grouped by slot ids
* and may be executed on different servers. Thus command execution order could be changed
*
* @return
* @return List with result object for each command
*/
List<?> execute();
@ -184,7 +185,7 @@ public interface RBatch {
* In cluster configurations operations grouped by slot ids
* so may be executed on different servers. Thus command execution order could be changed
*
* @return
* @return List with result object for each command
*/
Future<List<?>> executeAsync();

@ -6,9 +6,12 @@ import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.core.RScript;
import org.redisson.core.RBatch;
import org.redisson.core.RListAsync;
import org.redisson.core.RScript.Mode;
import io.netty.util.concurrent.Future;
@ -52,6 +55,14 @@ public class RedissonBatchTest extends BaseTest {
Assert.assertEquals(210*5, res.size());
}
@Test(expected=RedisException.class)
public void testExceptionHandling() {
RBatch batch = redisson.createBatch();
batch.getMap("test").putAsync("1", "2");
batch.getScript().evalAsync(Mode.READ_WRITE, "wrong_code", RScript.ReturnType.VALUE);
batch.execute();
}
@Test(expected=IllegalStateException.class)
public void testTwice() {
RBatch batch = redisson.createBatch();

Loading…
Cancel
Save