diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 5e4524663..feea4e708 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -39,6 +39,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -193,10 +194,32 @@ public class RedissonKeys implements RKeys { @Override public RFuture countExistsAsync(String... names) { - List> futures = commandExecutor.readAllAsync(RedisCommands.EXISTS_LONG, names); - CompletableFuture f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - CompletableFuture s = f.thenApply(r -> futures.stream().mapToLong(v -> v.getNow(0L)).sum()); - return new CompletableFutureWrapper<>(s); + if (names.length == 0) { + return new CompletableFutureWrapper<>(0L); + } + + List keysList = Arrays.stream(names) + .map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k)) + .collect(Collectors.toList()); + + return commandExecutor.readBatchedAsync(StringCodec.INSTANCE, RedisCommands.EXISTS_LONG, new SlotCallback() { + final AtomicLong total = new AtomicLong(); + + @Override + public void onSlotResult(Long result) { + total.addAndGet(result); + } + + @Override + public Long onFinish() { + return total.get(); + } + + @Override + public RedisCommand createCommand(List keys) { + return RedisCommands.EXISTS_LONG; + } + }, keysList.toArray(new String[0])); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonKeysTest.java b/redisson/src/test/java/org/redisson/RedissonKeysTest.java index 3544f9e28..e7363b001 100644 --- a/redisson/src/test/java/org/redisson/RedissonKeysTest.java +++ b/redisson/src/test/java/org/redisson/RedissonKeysTest.java @@ -61,6 +61,45 @@ public class RedissonKeysTest extends BaseTest { assertThat(redisson.getKeys().touch("test3", "test10", "test")).isEqualTo(2); } + @Test + public void testExistsInCluster() throws FailedToStartRedisException, IOException, InterruptedException { + RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave(); + RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave(); + + + ClusterRunner clusterRunner = new ClusterRunner() + .addNode(master1, slave1) + .addNode(master2, slave2) + .addNode(master3, slave3); + ClusterProcesses process = clusterRunner.run(); + + Config config = new Config(); + config.useClusterServers() + .setLoadBalancer(new RandomLoadBalancer()) + .addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort()); + RedissonClient redisson = Redisson.create(config); + + int size = 10000; + List list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + list.add("test" + i); + redisson.getBucket("test" + i).set(i); + } + + assertThat(redisson.getKeys().countExists("test1", "test2", "test34", "test45", "asdfl;jasf")).isEqualTo(4); + long deletedSize = redisson.getKeys().delete(list.toArray(new String[list.size()])); + + assertThat(deletedSize).isEqualTo(size); + + redisson.shutdown(); + process.shutdown(); + } + + @Test public void testExists() {