From 032dc1bdaa6a637a4c01f1a5be8928404d8f9afb Mon Sep 17 00:00:00 2001 From: seakider Date: Wed, 22 Jan 2025 23:47:00 +0800 Subject: [PATCH] Feature - RKeyAsync getKeysAsync Signed-off-by: seakider --- .../main/java/org/redisson/RedissonKeys.java | 22 +++++++++++++ .../java/org/redisson/api/RKeysAsync.java | 20 ++++++++++- .../java/org/redisson/RedissonKeysTest.java | 33 +++++++++++++++++-- 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 371ba659f..8fc12e724 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -48,6 +48,7 @@ import org.redisson.rx.CommandRxBatchService; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; @@ -139,12 +140,33 @@ public final class RedissonKeys implements RKeys { return getKeys(KeysScanOptions.defaults()); } + @Override + public AsyncIterator getKeysAsync() { + return getKeysAsync(KeysScanOptions.defaults()); + } + @Override public Iterable getKeys(KeysScanOptions options) { KeysScanParams params = (KeysScanParams) options; return getKeysByPattern(scan, params.getPattern(), params.getLimit(), params.getChunkSize(), params.getType()); } + @Override + public AsyncIterator getKeysAsync(KeysScanOptions options) { + Iterator iter = getKeys(options).iterator(); + return new AsyncIterator() { + @Override + public CompletionStage hasNext() { + return CompletableFuture.completedFuture(iter.hasNext()); + } + + @Override + public CompletionStage next() { + return CompletableFuture.completedFuture(iter.next()); + } + }; + } + @Override public Iterable getKeys(int count) { return getKeysByPattern(null, count); diff --git a/redisson/src/main/java/org/redisson/api/RKeysAsync.java b/redisson/src/main/java/org/redisson/api/RKeysAsync.java index 0355d7098..86b309e13 100644 --- a/redisson/src/main/java/org/redisson/api/RKeysAsync.java +++ b/redisson/src/main/java/org/redisson/api/RKeysAsync.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import org.redisson.api.options.KeysScanOptions; + import java.util.concurrent.TimeUnit; /** @@ -132,7 +134,23 @@ public interface RKeysAsync { * @return amount of existing keys */ RFuture countExistsAsync(String... names); - + + /** + * Get all keys using iterable. Keys traversing with SCAN operation. + * Each SCAN operation loads up to 10 keys per request. + * + * @return Asynchronous Iterable object + */ + AsyncIterator getKeysAsync(); + + /** + * Get all keys using iterable. Keys traversing with SCAN operation. + * + * @param options scan options + * @return Asynchronous Iterable object + */ + AsyncIterator getKeysAsync(KeysScanOptions options); + /** * Get Redis object type by key * diff --git a/redisson/src/test/java/org/redisson/RedissonKeysTest.java b/redisson/src/test/java/org/redisson/RedissonKeysTest.java index ebd21adab..707de748b 100644 --- a/redisson/src/test/java/org/redisson/RedissonKeysTest.java +++ b/redisson/src/test/java/org/redisson/RedissonKeysTest.java @@ -16,6 +16,8 @@ import org.redisson.config.Protocol; import java.time.Duration; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -280,6 +282,35 @@ public class RedissonKeysTest extends RedisDockerTest { Assertions.assertFalse(redisson.getKeys().getKeys().iterator().hasNext()); } + @Test + public void testKeysAsyncIterable() { + Set keys = new HashSet(); + for (int i = 0; i < 115; i++) { + String key = "key" + Math.random(); + RBucket bucket = redisson.getBucket(key); + keys.add(key); + bucket.set("someValue"); + } + + AsyncIterator iterator = redisson.getKeys().getKeysAsync(); + CompletionStage f = iterateAll(iterator, keys); + f.whenComplete((r, e) -> { + Assertions.assertEquals(0, keys.size()); + }); + } + + public CompletionStage iterateAll(AsyncIterator iterator, Set keys) { + return iterator.hasNext().thenCompose(r -> { + if (r) { + iterator.next().thenCompose(k -> { + keys.remove(redisson.getConfig().useSingleServer().getNameMapper().map(k)); + return iterateAll(iterator, keys); + }); + } + return CompletableFuture.completedFuture(null); + }); + } + @Test public void testRandomKey() { RBucket bucket = redisson.getBucket("test1"); @@ -371,7 +402,6 @@ public class RedissonKeysTest extends RedisDockerTest { Assertions.assertEquals(4L, r.getResponses().get(0)); } - @Test public void testFindKeys() { RBucket bucket = redisson.getBucket("test1"); @@ -418,5 +448,4 @@ public class RedissonKeysTest extends RedisDockerTest { s = redisson.getKeys().count(); assertThat(s).isEqualTo(1); } - }