|
|
|
@ -109,7 +109,7 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
private final AtomicInteger index = new AtomicInteger();
|
|
|
|
|
|
|
|
|
|
private final ConcurrentMap<NodeSource, Entry> commands = new ConcurrentHashMap<>();
|
|
|
|
|
private Map<MasterSlaveEntry, Entry> aggregatedCommands;
|
|
|
|
|
private Map<MasterSlaveEntry, Entry> aggregatedCommands = Collections.emptyMap();
|
|
|
|
|
private final ConcurrentMap<MasterSlaveEntry, ConnectionEntry> connections = new ConcurrentHashMap<>();
|
|
|
|
|
|
|
|
|
|
private final BatchOptions options;
|
|
|
|
@ -379,18 +379,27 @@ public class CommandBatchService extends CommandAsyncService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void resolveCommands(AtomicInteger attempt, CompletableFuture<Map<MasterSlaveEntry, Entry>> future) {
|
|
|
|
|
long retryInterval = this.options.getRetryInterval();
|
|
|
|
|
if (retryInterval == 0) {
|
|
|
|
|
retryInterval = connectionManager.getServiceManager().getConfig().getRetryInterval();
|
|
|
|
|
}
|
|
|
|
|
long retryAttempts = this.options.getRetryInterval();
|
|
|
|
|
if (retryAttempts == 0) {
|
|
|
|
|
retryAttempts = connectionManager.getServiceManager().getConfig().getRetryAttempts();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Map<MasterSlaveEntry, Entry> result = new HashMap<>();
|
|
|
|
|
for (Map.Entry<NodeSource, Entry> e : commands.entrySet()) {
|
|
|
|
|
MasterSlaveEntry entry = getEntry(e.getKey());
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
if (attempt.incrementAndGet() == this.options.getRetryAttempts() + 1) {
|
|
|
|
|
if (attempt.incrementAndGet() == retryAttempts + 1) {
|
|
|
|
|
future.completeExceptionally(connectionManager.getServiceManager().createNodeNotFoundException(e.getKey()));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
connectionManager.getServiceManager().newTimeout(task -> {
|
|
|
|
|
resolveCommands(attempt, future);
|
|
|
|
|
}, this.options.getRetryInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
|
}, retryInterval, TimeUnit.MILLISECONDS);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
Entry ee = result.computeIfAbsent(entry, k -> new Entry());
|
|
|
|
|