refactoring

pull/5399/head
Nikita Koksharov 1 year ago
parent 187978f8d1
commit 1133778df0

@ -621,6 +621,7 @@ public class CommandBatchService extends CommandAsyncService {
}
}
@SuppressWarnings("MethodLength")
private <R> RFuture<R> executeRedisBasedQueue() {
CompletableFuture<R> resultPromise = new CompletableFuture<R>();
long responseTimeout;
@ -677,86 +678,129 @@ public class CommandBatchService extends CommandAsyncService {
resultPromise.completeExceptionally(ee);
return;
}
aggregatedCommands = map;
List<CompletableFuture<Void>> futures = new ArrayList<>(map.size());
for (Map.Entry<MasterSlaveEntry, Entry> entry : aggregatedCommands.entrySet()) {
boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;
CompletableFuture<List<Object>> execPromise = createPromise();
RedisExecutor<List<Object>, List<Object>> executor = new RedisQueuedBatchExecutor<>(isReadOnly, new NodeSource(entry.getKey()), codec,
RedisCommands.EXEC, new Object[] {}, execPromise,
false, connectionManager, objectBuilder, commands, connections,
options, index, executed, referenceType, false, aggregatedCommands);
executor.execute();
CompletionStage<Void> f = execPromise.thenCompose(r -> {
BatchCommandData<?, ?> lastCommand = entry.getValue().getCommands().peekLast();
result.put(entry.getKey(), r);
if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) {
return lastCommand.getPromise().thenApply(i -> null);
AtomicInteger slots = new AtomicInteger(nestedServices.size());
CompletableFuture<Void> nestedServicesFuture;
if (nestedServices.isEmpty()) {
nestedServicesFuture = CompletableFuture.completedFuture(null);
} else {
nestedServicesFuture = new CompletableFuture<>();
}
for (Map.Entry<CompletableFuture<?>, List<CommandBatchService>> entry : nestedServices.entrySet()) {
for (CommandBatchService service : entry.getValue()) {
service.executeAsync();
}
entry.getKey().whenComplete((res, e) -> {
if (e == null) {
if (slots.decrementAndGet() == 0) {
nestedServicesFuture.complete(null);
}
} else {
if (entry.getKey().isCancelled()) {
nestedServicesFuture.completeExceptionally(e);
} else {
nestedServicesFuture.completeExceptionally(e.getCause());
}
}
return CompletableFuture.completedFuture(null);
});
futures.add(f.toCompletableFuture());
}
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
future.whenComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
resultPromise.completeExceptionally(ex);
nestedServicesFuture.whenComplete((r1, exc2) -> {
if (exc2 != null) {
resultPromise.completeExceptionally(exc2);
return;
}
try {
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : result.entrySet()) {
Entry commandEntry = aggregatedCommands.get(entry.getKey());
Iterator<Object> resultIter = entry.getValue().iterator();
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {
if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
break;
}
aggregatedCommands = map;
CompletableFuture<Object> promise = (CompletableFuture<Object>) data.getPromise();
if (resultIter.hasNext()) {
promise.complete(resultIter.next());
} else {
// fix for https://github.com/redisson/redisson/issues/2212
promise.complete(null);
}
List<CompletableFuture<Void>> futures = new ArrayList<>(map.size());
for (Map.Entry<MasterSlaveEntry, Entry> entry : aggregatedCommands.entrySet()) {
boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;
CompletableFuture<List<Object>> execPromise = createPromise();
RedisExecutor<List<Object>, List<Object>> executor = new RedisQueuedBatchExecutor<>(isReadOnly, new NodeSource(entry.getKey()), codec,
RedisCommands.EXEC, new Object[] {}, execPromise,
false, connectionManager, objectBuilder, commands, connections,
options, index, executed, referenceType, false, aggregatedCommands);
executor.execute();
CompletionStage<Void> f = execPromise.thenCompose(r -> {
BatchCommandData<?, ?> lastCommand = entry.getValue().getCommands().peekLast();
result.put(entry.getKey(), r);
if (RedisCommands.WAIT.getName().equals(lastCommand.getCommand().getName())) {
return lastCommand.getPromise().thenApply(i -> null);
}
}
return CompletableFuture.completedFuture(null);
});
futures.add(f.toCompletableFuture());
}
List<BatchCommandData> entries = new ArrayList<>();
for (Entry e : aggregatedCommands.values()) {
entries.addAll(e.getCommands());
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
future.whenComplete((res, ex) -> {
executed.set(true);
if (ex != null) {
resultPromise.completeExceptionally(ex);
return;
}
Collections.sort(entries);
List<Object> responses = new ArrayList<>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (isWaitCommand(commandEntry)) {
if (commandEntry.getCommand().getName().equals(RedisCommands.WAIT.getName())) {
syncedSlaves += ((CompletableFuture<Integer>) commandEntry.getPromise()).getNow(0);
} else {
List<Integer> list = ((CompletableFuture<List<Integer>>) commandEntry.getPromise()).getNow(Arrays.asList(0, 0));
syncedSlaves += list.get(1);
try {
for (java.util.Map.Entry<MasterSlaveEntry, List<Object>> entry : result.entrySet()) {
Entry commandEntry = aggregatedCommands.get(entry.getKey());
Iterator<Object> resultIter = entry.getValue().iterator();
for (BatchCommandData<?, ?> data : commandEntry.getCommands()) {
if (data.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
break;
}
CompletableFuture<Object> promise = (CompletableFuture<Object>) data.getPromise();
if (resultIter.hasNext()) {
promise.complete(resultIter.next());
} else {
// fix for https://github.com/redisson/redisson/issues/2212
promise.complete(null);
}
}
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow(null);
if (objectBuilder != null) {
entryResult = objectBuilder.tryHandleReference(entryResult, referenceType);
}
List<BatchCommandData> entries = new ArrayList<>();
for (Entry e : aggregatedCommands.values()) {
entries.addAll(e.getCommands());
}
Collections.sort(entries);
List<Object> responses = new ArrayList<>(entries.size());
int syncedSlaves = 0;
for (BatchCommandData<?, ?> commandEntry : entries) {
if (isWaitCommand(commandEntry)) {
if (commandEntry.getCommand().getName().equals(RedisCommands.WAIT.getName())) {
syncedSlaves += ((CompletableFuture<Integer>) commandEntry.getPromise()).getNow(0);
} else {
List<Integer> list = ((CompletableFuture<List<Integer>>) commandEntry.getPromise()).getNow(Arrays.asList(0, 0));
syncedSlaves += list.get(1);
}
} else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName())
&& !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName())) {
Object entryResult = commandEntry.getPromise().getNow(null);
if (objectBuilder != null) {
entryResult = objectBuilder.tryHandleReference(entryResult, referenceType);
}
responses.add(entryResult);
}
}
if (!nestedServices.isEmpty()) {
for (CompletableFuture<?> f : nestedServices.keySet()) {
responses.add(f.getNow(null));
}
responses.add(entryResult);
}
BatchResult<Object> r = new BatchResult<>(responses, syncedSlaves);
resultPromise.complete((R) r);
} catch (Exception e) {
resultPromise.completeExceptionally(e);
}
BatchResult<Object> r = new BatchResult<>(responses, syncedSlaves);
resultPromise.complete((R) r);
} catch (Exception e) {
resultPromise.completeExceptionally(e);
}
});
});
});
});

@ -20,6 +20,7 @@ import java.util.NoSuchElementException;
import org.redisson.ScanResult;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisNodeNotFoundException;
/**
*
@ -38,6 +39,9 @@ public abstract class BaseIterator<V, E> implements Iterator<V> {
private boolean currentElementRemoved;
protected E value;
protected void reset() {
}
@Override
public boolean hasNext() {
if (lastIter == null || !lastIter.hasNext()) {
@ -52,7 +56,17 @@ public abstract class BaseIterator<V, E> implements Iterator<V> {
finished = false;
}
do {
ScanResult<E> res = iterator(client, nextIterPos);
ScanResult<E> res;
try {
res = iterator(client, nextIterPos);
} catch (RedisNodeNotFoundException e) {
if (client != null) {
client = null;
nextIterPos = 0;
}
reset();
res = iterator(client, nextIterPos);
}
client = res.getRedisClient();

Loading…
Cancel
Save