diff --git a/redisson/src/main/java/org/redisson/BooleanSlotCallback.java b/redisson/src/main/java/org/redisson/BooleanSlotCallback.java index 97ffd623e..67cf3d66b 100644 --- a/redisson/src/main/java/org/redisson/BooleanSlotCallback.java +++ b/redisson/src/main/java/org/redisson/BooleanSlotCallback.java @@ -15,8 +15,8 @@ */ package org.redisson; +import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class BooleanSlotCallback implements SlotCallback { - private final AtomicBoolean r = new AtomicBoolean(); - private final Object[] params; public BooleanSlotCallback() { @@ -38,15 +36,8 @@ public class BooleanSlotCallback implements SlotCallback { } @Override - public void onSlotResult(List keys, Boolean res) { - if (res) { - r.set(true); - } - } - - @Override - public Boolean onFinish() { - return r.get(); + public Boolean onResult(Collection res) { + return res.contains(true); } @Override diff --git a/redisson/src/main/java/org/redisson/IntegerSlotCallback.java b/redisson/src/main/java/org/redisson/IntegerSlotCallback.java index 1690c5e22..3231dd0d5 100644 --- a/redisson/src/main/java/org/redisson/IntegerSlotCallback.java +++ b/redisson/src/main/java/org/redisson/IntegerSlotCallback.java @@ -15,8 +15,8 @@ */ package org.redisson; +import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; /** * @@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class IntegerSlotCallback implements SlotCallback { - private final AtomicInteger results = new AtomicInteger(); - private final Object[] params; public IntegerSlotCallback() { @@ -38,13 +36,8 @@ public class IntegerSlotCallback implements SlotCallback { } @Override - public void onSlotResult(List keys, Integer result) { - results.addAndGet(result); - } - - @Override - public Integer onFinish() { - return results.get(); + public Integer onResult(Collection result) { + return result.stream().mapToInt(r -> r).sum(); } @Override diff --git a/redisson/src/main/java/org/redisson/LongSlotCallback.java b/redisson/src/main/java/org/redisson/LongSlotCallback.java index 2d2f2295e..2306d9b32 100644 --- a/redisson/src/main/java/org/redisson/LongSlotCallback.java +++ b/redisson/src/main/java/org/redisson/LongSlotCallback.java @@ -15,8 +15,8 @@ */ package org.redisson; +import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicLong; /** * @@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicLong; */ public class LongSlotCallback implements SlotCallback { - private final AtomicLong results = new AtomicLong(); - private final Object[] params; public LongSlotCallback() { @@ -38,13 +36,8 @@ public class LongSlotCallback implements SlotCallback { } @Override - public void onSlotResult(List keys, Long result) { - results.addAndGet(result); - } - - @Override - public Long onFinish() { - return results.get(); + public Long onResult(Collection result) { + return result.stream().mapToLong(r -> r).sum(); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBuckets.java b/redisson/src/main/java/org/redisson/RedissonBuckets.java index e0a9b8d02..46f85341c 100644 --- a/redisson/src/main/java/org/redisson/RedissonBuckets.java +++ b/redisson/src/main/java/org/redisson/RedissonBuckets.java @@ -29,8 +29,6 @@ import org.redisson.misc.CompletableFutureWrapper; import java.io.IOException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -82,23 +80,18 @@ public class RedissonBuckets implements RBuckets { Codec commandCodec = new CompositeCodec(StringCodec.INSTANCE, codec, codec); - RedisCommand> command = new RedisCommand>("MGET", new MapGetAllDecoder(keysList, 0)); + RedisCommand> command = new RedisCommand<>("MGET", new MapGetAllDecoder(keysList, 0)); return commandExecutor.readBatchedAsync(commandCodec, command, new SlotCallback, Map>() { - final Map results = new ConcurrentHashMap<>(); @Override - public void onSlotResult(List keys, Map result) { - for (Map.Entry entry : result.entrySet()) { - if (entry.getKey() != null && entry.getValue() != null) { - String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) entry.getKey()); - results.put(key, (V) entry.getValue()); - } - } - } - - @Override - public Map onFinish() { - return results; + public Map onResult(Collection> result) { + return result.stream() + .flatMap(c -> c.entrySet().stream()) + .filter(e -> e.getKey() != null && e.getValue() != null) + .map(e -> { + String key = commandExecutor.getServiceManager().getConfig().getNameMapper().unmap((String) e.getKey()); + return new AbstractMap.SimpleEntry<>(key, (V) e.getValue()); + }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); } @Override @@ -116,21 +109,7 @@ public class RedissonBuckets implements RBuckets { Map mappedBuckets = map(buckets); - return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSETNX, new SlotCallback() { - final AtomicBoolean result = new AtomicBoolean(true); - - @Override - public void onSlotResult(List keys, Boolean result) { - if (!result && this.result.get()){ - this.result.set(result); - } - } - - @Override - public Boolean onFinish() { - return this.result.get(); - } - + return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSETNX, new BooleanSlotCallback() { @Override public Object[] createParams(List keys) { List params = new ArrayList<>(keys.size()); @@ -161,16 +140,7 @@ public class RedissonBuckets implements RBuckets { Map mappedBuckets = map(buckets); - return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new SlotCallback() { - @Override - public void onSlotResult(List keys, Void result) { - } - - @Override - public Void onFinish() { - return null; - } - + return commandExecutor.writeBatchedAsync(codec, RedisCommands.MSET, new VoidSlotCallback() { @Override public Object[] createParams(List keys) { List params = new ArrayList<>(keys.size()); diff --git a/redisson/src/main/java/org/redisson/SlotCallback.java b/redisson/src/main/java/org/redisson/SlotCallback.java index 041127305..b68f86e6f 100644 --- a/redisson/src/main/java/org/redisson/SlotCallback.java +++ b/redisson/src/main/java/org/redisson/SlotCallback.java @@ -18,6 +18,7 @@ package org.redisson; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.MasterSlaveEntry; +import java.util.Collection; import java.util.List; /** @@ -41,8 +42,6 @@ public interface SlotCallback { return params.toArray(); } - void onSlotResult(List keys, T result); - - R onFinish(); + R onResult(Collection result); } diff --git a/redisson/src/main/java/org/redisson/VoidSlotCallback.java b/redisson/src/main/java/org/redisson/VoidSlotCallback.java index c9dc9a753..071ae0460 100644 --- a/redisson/src/main/java/org/redisson/VoidSlotCallback.java +++ b/redisson/src/main/java/org/redisson/VoidSlotCallback.java @@ -15,8 +15,8 @@ */ package org.redisson; +import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -25,8 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class VoidSlotCallback implements SlotCallback { - private final AtomicBoolean r = new AtomicBoolean(); - private final Object[] params; public VoidSlotCallback() { @@ -38,11 +36,7 @@ public class VoidSlotCallback implements SlotCallback { } @Override - public void onSlotResult(List keys, Void res) { - } - - @Override - public Void onFinish() { + public Void onResult(Collection result) { return null; } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 26603de11..821be42ff 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -729,7 +729,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { }, Collectors.toList()))); } - Map, CompletableFuture> futures = new IdentityHashMap<>(); + List> futures = new ArrayList<>(); for (Entry>> entry : entry2keys.entrySet()) { // executes in batch due to CROSSLOT error CommandBatchService executorService; @@ -747,13 +747,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { } Object[] keysArray = callback.createKeys(entry.getKey(), groupedKeys); Object[] paramsArray = callback.createParams(Collections.emptyList()); + RFuture f; if (readOnly) { - RFuture f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); - futures.put(groupedKeys, f.toCompletableFuture()); + f = executorService.evalReadAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); } else { - RFuture f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); - futures.put(groupedKeys, f.toCompletableFuture()); + f = executorService.evalWriteAsync(entry.getKey(), codec, c, script, Arrays.asList(keysArray), paramsArray); } + futures.add(f.toCompletableFuture()); } if (!(this instanceof CommandBatchService)) { @@ -761,12 +761,12 @@ public class CommandAsyncService implements CommandAsyncExecutor { } } - CompletableFuture future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])); + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); CompletableFuture result = future.thenApply(r -> { - futures.entrySet().forEach(e -> { - callback.onSlotResult(e.getKey(), (T) e.getValue().join()); - }); - return callback.onFinish(); + List res = futures.stream() + .map(e -> (T) e.join()) + .collect(Collectors.toList()); + return callback.onResult(res); }); return new CompletableFutureWrapper<>(result); @@ -802,7 +802,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { } }, Collectors.toList()))); - Map, CompletableFuture> futures = new IdentityHashMap<>(); + List> futures = new ArrayList<>(); List> mainFutures = new ArrayList<>(); for (Entry>> entry : entry2keys.entrySet()) { // executes in batch due to CROSSLOT error @@ -820,13 +820,13 @@ public class CommandAsyncService implements CommandAsyncExecutor { c = newCommand; } Object[] params = callback.createParams(groupedKeys); + RFuture f; if (readOnly) { - RFuture f = executorService.readAsync(entry.getKey(), codec, c, params); - futures.put(groupedKeys, f.toCompletableFuture()); + f = executorService.readAsync(entry.getKey(), codec, c, params); } else { - RFuture f = executorService.writeAsync(entry.getKey(), codec, c, params); - futures.put(groupedKeys, f.toCompletableFuture()); + f = executorService.writeAsync(entry.getKey(), codec, c, params); } + futures.add(f.toCompletableFuture()); } if (!(this instanceof CommandBatchService)) { @@ -839,15 +839,15 @@ public class CommandAsyncService implements CommandAsyncExecutor { if (!mainFutures.isEmpty()) { future = CompletableFuture.allOf(mainFutures.toArray(new CompletableFuture[0])); } else { - future = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])); + future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } CompletableFuture result = future.thenApply(r -> { - futures.entrySet().forEach(e -> { - if (!e.getValue().isCompletedExceptionally() && e.getValue().getNow(null) != null) { - callback.onSlotResult(e.getKey(), (T) e.getValue().getNow(null)); - } - }); - return callback.onFinish(); + List res = futures.stream() + .filter(e -> !e.isCompletedExceptionally() && e.getNow(null) != null) + .map(e -> (T) e.join()) + .collect(Collectors.toList()); + + return callback.onResult(res); }); return new CompletableFutureWrapper<>(result);