|
|
|
@ -154,9 +154,6 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
@Override
|
|
|
|
|
public <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,
|
|
|
|
|
Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, int attempt, boolean ignoreRedirect) {
|
|
|
|
|
if (executed.get()) {
|
|
|
|
|
throw new IllegalStateException("Batch already has been executed!");
|
|
|
|
|
}
|
|
|
|
|
if (nodeSource.getEntry() != null) {
|
|
|
|
|
Entry entry = commands.get(nodeSource.getEntry());
|
|
|
|
|
if (entry == null) {
|
|
|
|
@ -471,11 +468,10 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
executed.set(true);
|
|
|
|
|
|
|
|
|
|
mainPromise.addListener(new FutureListener<Map<MasterSlaveEntry, List<Object>>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Map<MasterSlaveEntry, List<Object>>> future) throws Exception {
|
|
|
|
|
executed.set(true);
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
resultPromise.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
@ -525,8 +521,6 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
return resultPromise;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
executed.set(true);
|
|
|
|
|
|
|
|
|
|
if (this.options.getExecutionMode() != ExecutionMode.IN_MEMORY) {
|
|
|
|
|
for (Entry entry : commands.values()) {
|
|
|
|
|
BatchCommandData<?, ?> multiCommand = new BatchCommandData(RedisCommands.MULTI, new Object[] {}, index.incrementAndGet());
|
|
|
|
@ -560,6 +554,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
// commands = null;
|
|
|
|
|
executed.set(true);
|
|
|
|
|
nestedServices.clear();
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
@ -569,6 +564,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
voidPromise.addListener(new FutureListener<Void>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<Void> future) throws Exception {
|
|
|
|
|
executed.set(true);
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
promise.tryFailure(future.cause());
|
|
|
|
|
commands = null;
|
|
|
|
|