diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 67ccea014..86ea95685 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -634,30 +634,24 @@ public class CommandAsyncService implements CommandAsyncExecutor { return writeAsync((String) null, codec, command, keys); } - Map> range2key = new HashMap>(); + Map> range2key = new HashMap<>(); for (String key : keys) { int slot = connectionManager.calcSlot(key); MasterSlaveEntry entry = connectionManager.getEntry(slot); - List list = range2key.get(entry); - if (list == null) { - list = new ArrayList(); - range2key.put(entry, list); - } + List list = range2key.computeIfAbsent(entry, k -> new ArrayList<>()); list.add(key); } RPromise result = new RedissonPromise<>(); + AtomicLong executed = new AtomicLong(keys.length); AtomicReference failed = new AtomicReference<>(); - AtomicLong executed = new AtomicLong(range2key.size()); - BiConsumer, Throwable> listener = (t, u) -> { - if (u == null) { - for (T res : (List) t.getResponses()) { - if (res != null) { - callback.onSlotResult(res); - } - } + BiConsumer listener = (res, ex) -> { + if (ex != null) { + failed.set(ex); } else { - failed.set(u); + if (res != null) { + callback.onSlotResult(res); + } } if (executed.decrementAndGet() == 0) { @@ -671,7 +665,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { for (Entry> entry : range2key.entrySet()) { // executes in batch due to CROSSLOT error - CommandBatchService executorService = new CommandBatchService(connectionManager); + CommandBatchService executorService; + if (this instanceof CommandBatchService) { + executorService = (CommandBatchService) this; + } else { + executorService = new CommandBatchService(connectionManager); + } + for (String key : entry.getValue()) { RedisCommand c = command; RedisCommand newCommand = callback.createCommand(key); @@ -679,14 +679,17 @@ public class CommandAsyncService implements CommandAsyncExecutor { c = newCommand; } if (readOnly) { - executorService.readAsync(entry.getKey(), codec, c, key); + RFuture f = executorService.readAsync(entry.getKey(), codec, c, key); + f.onComplete(listener); } else { - executorService.writeAsync(entry.getKey(), codec, c, key); + RFuture f = executorService.writeAsync(entry.getKey(), codec, c, key); + f.onComplete(listener); } } - RFuture> future = executorService.executeAsync(); - future.onComplete(listener); + if (!(this instanceof CommandBatchService)) { + executorService.executeAsync(); + } } return result;