Fixed - RLiveObjectService.delete() works asynchronously. #2935

pull/2954/head
Nikita Koksharov 5 years ago
parent 1c49817499
commit 4b2fad8e25

@ -634,30 +634,24 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return writeAsync((String) null, codec, command, keys);
}
Map<MasterSlaveEntry, List<String>> range2key = new HashMap<MasterSlaveEntry, List<String>>();
Map<MasterSlaveEntry, List<String>> range2key = new HashMap<>();
for (String key : keys) {
int slot = connectionManager.calcSlot(key);
MasterSlaveEntry entry = connectionManager.getEntry(slot);
List<String> list = range2key.get(entry);
if (list == null) {
list = new ArrayList<String>();
range2key.put(entry, list);
}
List<String> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
list.add(key);
}
RPromise<R> result = new RedissonPromise<>();
AtomicLong executed = new AtomicLong(keys.length);
AtomicReference<Throwable> failed = new AtomicReference<>();
AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = (t, u) -> {
if (u == null) {
for (T res : (List<T>) t.getResponses()) {
if (res != null) {
callback.onSlotResult(res);
}
}
BiConsumer<T, Throwable> listener = (res, ex) -> {
if (ex != null) {
failed.set(ex);
} else {
failed.set(u);
if (res != null) {
callback.onSlotResult(res);
}
}
if (executed.decrementAndGet() == 0) {
@ -671,7 +665,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
for (Entry<MasterSlaveEntry, List<String>> entry : range2key.entrySet()) {
// executes in batch due to CROSSLOT error
CommandBatchService executorService = new CommandBatchService(connectionManager);
CommandBatchService executorService;
if (this instanceof CommandBatchService) {
executorService = (CommandBatchService) this;
} else {
executorService = new CommandBatchService(connectionManager);
}
for (String key : entry.getValue()) {
RedisCommand<T> c = command;
RedisCommand<T> newCommand = callback.createCommand(key);
@ -679,14 +679,17 @@ public class CommandAsyncService implements CommandAsyncExecutor {
c = newCommand;
}
if (readOnly) {
executorService.readAsync(entry.getKey(), codec, c, key);
RFuture<T> f = executorService.readAsync(entry.getKey(), codec, c, key);
f.onComplete(listener);
} else {
executorService.writeAsync(entry.getKey(), codec, c, key);
RFuture<T> f = executorService.writeAsync(entry.getKey(), codec, c, key);
f.onComplete(listener);
}
}
RFuture<BatchResult<?>> future = executorService.executeAsync();
future.onComplete(listener);
if (!(this instanceof CommandBatchService)) {
executorService.executeAsync();
}
}
return result;

Loading…
Cancel
Save