|
|
|
@ -275,7 +275,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
throw new IllegalStateException("Batch already executed!");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (commands.isEmpty()) {
|
|
|
|
|
if (commands.isEmpty() && nestedServices.isEmpty()) {
|
|
|
|
|
executed.set(true);
|
|
|
|
|
BatchResult<Object> result = new BatchResult<>(Collections.emptyList(), 0);
|
|
|
|
|
return new CompletableFutureWrapper<>(result);
|
|
|
|
@ -324,38 +324,46 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
|
|
|
|
|
for (Entry e : res.values()) {
|
|
|
|
|
entries.addAll(e.getCommands());
|
|
|
|
|
}
|
|
|
|
|
Collections.sort(entries);
|
|
|
|
|
List<Object> responses = new ArrayList<Object>(entries.size());
|
|
|
|
|
List<Object> responses = new ArrayList<>();
|
|
|
|
|
int syncedSlaves = 0;
|
|
|
|
|
for (BatchCommandData<?, ?> commandEntry : entries) {
|
|
|
|
|
if (isWaitCommand(commandEntry)) {
|
|
|
|
|
if (commandEntry.getCommand().getName().equals(RedisCommands.WAIT.getName())) {
|
|
|
|
|
syncedSlaves += ((CompletableFuture<Integer>) commandEntry.getPromise()).getNow(0);
|
|
|
|
|
} else {
|
|
|
|
|
List<Integer> list = ((CompletableFuture<List<Integer>>) commandEntry.getPromise()).getNow(Arrays.asList(0, 0));
|
|
|
|
|
syncedSlaves += list.get(1);
|
|
|
|
|
}
|
|
|
|
|
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
|
|
|
|
|
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())
|
|
|
|
|
&& !this.options.isSkipResult()) {
|
|
|
|
|
|
|
|
|
|
if (commandEntry.getPromise().isCancelled()) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
if (!res.isEmpty()) {
|
|
|
|
|
List<BatchCommandData> entries = new ArrayList<BatchCommandData>();
|
|
|
|
|
for (Entry e : res.values()) {
|
|
|
|
|
entries.addAll(e.getCommands());
|
|
|
|
|
}
|
|
|
|
|
Collections.sort(entries);
|
|
|
|
|
for (BatchCommandData<?, ?> commandEntry : entries) {
|
|
|
|
|
if (isWaitCommand(commandEntry)) {
|
|
|
|
|
if (commandEntry.getCommand().getName().equals(RedisCommands.WAIT.getName())) {
|
|
|
|
|
syncedSlaves += ((CompletableFuture<Integer>) commandEntry.getPromise()).getNow(0);
|
|
|
|
|
} else {
|
|
|
|
|
List<Integer> list = ((CompletableFuture<List<Integer>>) commandEntry.getPromise()).getNow(Arrays.asList(0, 0));
|
|
|
|
|
syncedSlaves += list.get(1);
|
|
|
|
|
}
|
|
|
|
|
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
|
|
|
|
|
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())
|
|
|
|
|
&& !this.options.isSkipResult()) {
|
|
|
|
|
|
|
|
|
|
Object entryResult = commandEntry.getPromise().getNow(null);
|
|
|
|
|
try {
|
|
|
|
|
if (objectBuilder != null) {
|
|
|
|
|
entryResult = objectBuilder.tryHandleReference(entryResult, referenceType);
|
|
|
|
|
if (commandEntry.getPromise().isCancelled()) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
} catch (ReflectiveOperationException exc) {
|
|
|
|
|
log.error("Unable to handle reference from {}", entryResult, exc);
|
|
|
|
|
|
|
|
|
|
Object entryResult = commandEntry.getPromise().getNow(null);
|
|
|
|
|
try {
|
|
|
|
|
if (objectBuilder != null) {
|
|
|
|
|
entryResult = objectBuilder.tryHandleReference(entryResult, referenceType);
|
|
|
|
|
}
|
|
|
|
|
} catch (ReflectiveOperationException exc) {
|
|
|
|
|
log.error("Unable to handle reference from {}", entryResult, exc);
|
|
|
|
|
}
|
|
|
|
|
responses.add(entryResult);
|
|
|
|
|
}
|
|
|
|
|
responses.add(entryResult);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (!nestedServices.isEmpty()) {
|
|
|
|
|
for (CompletableFuture<?> f : nestedServices.keySet()) {
|
|
|
|
|
responses.add(f.getNow(null));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|