Fixed - RKeysAsync.countExists() throws errors in cluster mode. #4395

pull/4550/merge
Nikita Koksharov 3 years ago
parent bacd89756d
commit 7d2e36e4a9

@ -39,6 +39,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@ -193,10 +194,32 @@ public class RedissonKeys implements RKeys {
@Override
public RFuture<Long> countExistsAsync(String... names) {
List<CompletableFuture<Long>> futures = commandExecutor.readAllAsync(RedisCommands.EXISTS_LONG, names);
CompletableFuture<Void> f = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<Long> s = f.thenApply(r -> futures.stream().mapToLong(v -> v.getNow(0L)).sum());
return new CompletableFutureWrapper<>(s);
if (names.length == 0) {
return new CompletableFutureWrapper<>(0L);
}
List<String> keysList = Arrays.stream(names)
.map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k))
.collect(Collectors.toList());
return commandExecutor.readBatchedAsync(StringCodec.INSTANCE, RedisCommands.EXISTS_LONG, new SlotCallback<Long, Long>() {
final AtomicLong total = new AtomicLong();
@Override
public void onSlotResult(Long result) {
total.addAndGet(result);
}
@Override
public Long onFinish() {
return total.get();
}
@Override
public RedisCommand<Long> createCommand(List<String> keys) {
return RedisCommands.EXISTS_LONG;
}
}, keysList.toArray(new String[0]));
}
@Override

@ -61,6 +61,45 @@ public class RedissonKeysTest extends BaseTest {
assertThat(redisson.getKeys().touch("test3", "test10", "test")).isEqualTo(2);
}
@Test
public void testExistsInCluster() throws FailedToStartRedisException, IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
int size = 10000;
List<String> list = new ArrayList<>();
for (int i = 0; i < size; i++) {
list.add("test" + i);
redisson.getBucket("test" + i).set(i);
}
assertThat(redisson.getKeys().countExists("test1", "test2", "test34", "test45", "asdfl;jasf")).isEqualTo(4);
long deletedSize = redisson.getKeys().delete(list.toArray(new String[list.size()]));
assertThat(deletedSize).isEqualTo(size);
redisson.shutdown();
process.shutdown();
}
@Test
public void testExists() {

Loading…
Cancel
Save