RBatch.executeSkipResult method added #415

pull/653/head
Nikita 8 years ago
parent a43e599b1f
commit 1819d3dd19

@ -226,6 +226,16 @@ public class RedissonBatch implements RBatch {
return executorService.execute();
}
@Override
public void executeSkipResult() {
executorService.executeSkipResult();
}
@Override
public RFuture<Void> executeSkipResultAsync() {
return executorService.executeSkipResultAsync();
}
@Override
public RFuture<List<?>> executeAsync() {
return executorService.executeAsync();

@ -398,4 +398,29 @@ public interface RBatch {
*/
RFuture<List<?>> executeAsync();
/**
* Executes all operations accumulated during async methods invocations,
* but skip command replies
*
* If cluster configuration used then operations are grouped by slot ids
* and may be executed on different servers. Thus command execution order could be changed
*
* @return List with result object for each command
* @throws RedisException in case of any error
*
*/
void executeSkipResult();
/**
* Executes all operations accumulated during async methods invocations asynchronously,
* but skip command replies
*
* If cluster configuration used then operations are grouped by slot ids
* and may be executed on different servers. Thus command execution order could be changed
*
* @return List with result object for each command
* @throws RedisException in case of any error
*
*/
RFuture<Void> executeSkipResultAsync();
}

@ -199,7 +199,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
}
}
if (i == commandBatch.getCommands().size()) {
if (commandBatch.isNoResult() || i == commandBatch.getCommands().size()) {
RPromise<Void> promise = commandBatch.getPromise();
if (error != null) {
if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) {

@ -29,17 +29,27 @@ public class CommandsData implements QueueCommand {
private final List<CommandData<?, ?>> commands;
private final RPromise<Void> promise;
private final boolean noResult;
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands) {
this(promise, commands, false);
}
public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean noResult) {
super();
this.promise = promise;
this.commands = commands;
this.noResult = noResult;
}
public RPromise<Void> getPromise() {
return promise;
}
public boolean isNoResult() {
return noResult;
}
public List<CommandData<?, ?>> getCommands() {
return commands;
}

@ -85,6 +85,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> SETBIT = new RedisStrictCommand<Boolean>("SETBIT", new BitSetReplayConvertor());
RedisStrictCommand<Void> BITOP = new RedisStrictCommand<Void>("BITOP", new VoidReplayConvertor());
RedisStrictCommand<Void> CLIENT_REPLY = new RedisStrictCommand<Void>("CLIENT", "REPLY", new VoidReplayConvertor());
RedisStrictCommand<Void> ASKING = new RedisStrictCommand<Void>("ASKING", new VoidReplayConvertor());
RedisStrictCommand<Void> READONLY = new RedisStrictCommand<Void>("READONLY", new VoidReplayConvertor());

@ -16,14 +16,15 @@
package org.redisson.command;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedissonReference;
import org.redisson.api.RFuture;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
@ -44,6 +45,7 @@ import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@ -52,18 +54,16 @@ import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
import org.redisson.RedissonReference;
import org.redisson.misc.RedissonObjectFactory;
public class CommandBatchService extends CommandReactiveService {
public static class Entry {
Collection<BatchCommandData<?, ?>> commands = new ConcurrentLinkedQueue<BatchCommandData<?,?>>();
Deque<BatchCommandData<?, ?>> commands = new LinkedBlockingDeque<BatchCommandData<?,?>>();
volatile boolean readOnlyMode = true;
public Collection<BatchCommandData<?, ?>> getCommands() {
public Deque<BatchCommandData<?, ?>> getCommands() {
return commands;
}
@ -116,7 +116,9 @@ public class CommandBatchService extends CommandReactiveService {
RedissonReference reference = redisson != null
? RedissonObjectFactory.toReference(redisson, params[i])
: RedissonObjectFactory.toReference(redissonReactive, params[i]);
params[i] = reference == null ? params[i] : reference;
if (reference != null) {
params[i] = reference;
}
}
}
BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codec, command, params, index.incrementAndGet());
@ -128,6 +130,10 @@ public class CommandBatchService extends CommandReactiveService {
}
public RFuture<Void> executeAsyncVoid() {
return executeAsyncVoid(false);
}
private RFuture<Void> executeAsyncVoid(boolean noResult) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
@ -135,6 +141,18 @@ public class CommandBatchService extends CommandReactiveService {
if (commands.isEmpty()) {
return connectionManager.newSucceededFuture(null);
}
if (noResult) {
for (Entry entry : commands.values()) {
RPromise<Object> s = connectionManager.newPromise();
BatchCommandData commandData = new BatchCommandData(s, null, RedisCommands.CLIENT_REPLY, new Object[] { "OFF" }, index.incrementAndGet());
entry.getCommands().addFirst(commandData);
RPromise<Object> s1 = connectionManager.newPromise();
BatchCommandData commandData1 = new BatchCommandData(s1, null, RedisCommands.CLIENT_REPLY, new Object[] { "ON" }, index.incrementAndGet());
entry.getCommands().add(commandData1);
}
}
executed = true;
RPromise<Void> voidPromise = connectionManager.newPromise();
@ -147,11 +165,19 @@ public class CommandBatchService extends CommandReactiveService {
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0);
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, true);
}
return voidPromise;
}
public void executeSkipResult() {
get(executeSkipResultAsync());
}
public RFuture<Void> executeSkipResultAsync() {
return executeAsyncVoid(true);
}
public RFuture<List<?>> executeAsync() {
if (executed) {
throw new IllegalStateException("Batch already executed!");
@ -173,7 +199,7 @@ public class CommandBatchService extends CommandReactiveService {
return;
}
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
List<BatchCommandData<?, ?>> entries = new ArrayList<BatchCommandData<?, ?>>();
for (Entry e : commands.values()) {
entries.addAll(e.getCommands());
}
@ -196,12 +222,12 @@ public class CommandBatchService extends CommandReactiveService {
AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0);
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0, false);
}
return promise;
}
public void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots, final int attempt) {
private void execute(final Entry entry, final NodeSource source, final RPromise<Void> mainPromise, final AtomicInteger slots, final int attempt, final boolean noResult) {
if (mainPromise.isCancelled()) {
return;
}
@ -257,23 +283,19 @@ public class CommandBatchService extends CommandReactiveService {
}
int count = attempt + 1;
execute(entry, source, mainPromise, slots, count);
execute(entry, source, mainPromise, slots, count, noResult);
}
};
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
if (connectionFuture.isDone()) {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture);
} else {
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture);
}
});
}
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(entry, source, mainPromise, attemptPromise, details, connectionFuture, noResult);
}
});
attemptPromise.addListener(new FutureListener<Void>() {
@Override
@ -286,18 +308,18 @@ public class CommandBatchService extends CommandReactiveService {
if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
entry.clearErrors();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt);
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.MOVED), mainPromise, slots, attempt, noResult);
return;
}
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause();
entry.clearErrors();
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt);
execute(entry, new NodeSource(ex.getSlot(), ex.getAddr(), Redirect.ASK), mainPromise, slots, attempt, noResult);
return;
}
if (future.cause() instanceof RedisLoadingException) {
entry.clearErrors();
execute(entry, source, mainPromise, slots, attempt);
execute(entry, source, mainPromise, slots, attempt, noResult);
return;
}
if (future.cause() instanceof RedisTryAgainException) {
@ -305,7 +327,7 @@ public class CommandBatchService extends CommandReactiveService {
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
execute(entry, source, mainPromise, slots, attempt);
execute(entry, source, mainPromise, slots, attempt, noResult);
}
}, 1, TimeUnit.SECONDS);
return;
@ -324,7 +346,7 @@ public class CommandBatchService extends CommandReactiveService {
}
private void checkWriteFuture(final RPromise<Void> attemptPromise, AsyncDetails details,
final RedisConnection connection, ChannelFuture future) {
final RedisConnection connection, ChannelFuture future, boolean noResult) {
if (attemptPromise.isDone() || future.isCancelled()) {
return;
}
@ -347,7 +369,7 @@ public class CommandBatchService extends CommandReactiveService {
private void checkConnectionFuture(final Entry entry, final NodeSource source,
final RPromise<Void> mainPromise, final RPromise<Void> attemptPromise, final AsyncDetails details,
RFuture<RedisConnection> connFuture) {
RFuture<RedisConnection> connFuture, boolean noResult) {
if (attemptPromise.isDone() || mainPromise.isCancelled() || connFuture.isCancelled()) {
return;
}
@ -372,19 +394,16 @@ public class CommandBatchService extends CommandReactiveService {
}
list.add(c);
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list, noResult));
details.setWriteFuture(future);
if (details.getWriteFuture().isDone()) {
checkWriteFuture(attemptPromise, details, connection, details.getWriteFuture());
} else {
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(attemptPromise, details, connection, future);
}
});
}
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(attemptPromise, details, connection, future, noResult);
}
});
releaseConnection(source, connFuture, entry.isReadOnlyMode(), attemptPromise, details);
}

@ -9,6 +9,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.assertj.core.api.Assertions.*;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RBatch;
@ -39,6 +40,20 @@ public class RedissonBatchTest extends BaseTest {
System.out.println(t);
}
@Test
public void testSkipResult() {
RBatch batch = redisson.createBatch();
batch.getBucket("A1").setAsync("001");
batch.getBucket("A2").setAsync("001");
batch.getBucket("A3").setAsync("001");
batch.getKeys().deleteAsync("A1");
batch.getKeys().deleteAsync("A2");
batch.executeSkipResult();
assertThat(redisson.getBucket("A1").isExists()).isFalse();
assertThat(redisson.getBucket("A3").isExists()).isTrue();
}
@Test
public void testBatchNPE() {
RBatch batch = redisson.createBatch();

Loading…
Cancel
Save