From 032dc1bdaa6a637a4c01f1a5be8928404d8f9afb Mon Sep 17 00:00:00 2001 From: seakider Date: Wed, 22 Jan 2025 23:47:00 +0800 Subject: [PATCH 1/4] Feature - RKeyAsync getKeysAsync Signed-off-by: seakider --- .../main/java/org/redisson/RedissonKeys.java | 22 +++++++++++++ .../java/org/redisson/api/RKeysAsync.java | 20 ++++++++++- .../java/org/redisson/RedissonKeysTest.java | 33 +++++++++++++++++-- 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 371ba659f..8fc12e724 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -48,6 +48,7 @@ import org.redisson.rx.CommandRxBatchService; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; @@ -139,12 +140,33 @@ public final class RedissonKeys implements RKeys { return getKeys(KeysScanOptions.defaults()); } + @Override + public AsyncIterator getKeysAsync() { + return getKeysAsync(KeysScanOptions.defaults()); + } + @Override public Iterable getKeys(KeysScanOptions options) { KeysScanParams params = (KeysScanParams) options; return getKeysByPattern(scan, params.getPattern(), params.getLimit(), params.getChunkSize(), params.getType()); } + @Override + public AsyncIterator getKeysAsync(KeysScanOptions options) { + Iterator iter = getKeys(options).iterator(); + return new AsyncIterator() { + @Override + public CompletionStage hasNext() { + return CompletableFuture.completedFuture(iter.hasNext()); + } + + @Override + public CompletionStage next() { + return CompletableFuture.completedFuture(iter.next()); + } + }; + } + @Override public Iterable getKeys(int count) { return getKeysByPattern(null, count); diff --git a/redisson/src/main/java/org/redisson/api/RKeysAsync.java b/redisson/src/main/java/org/redisson/api/RKeysAsync.java index 0355d7098..86b309e13 100644 --- a/redisson/src/main/java/org/redisson/api/RKeysAsync.java +++ b/redisson/src/main/java/org/redisson/api/RKeysAsync.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import org.redisson.api.options.KeysScanOptions; + import java.util.concurrent.TimeUnit; /** @@ -132,7 +134,23 @@ public interface RKeysAsync { * @return amount of existing keys */ RFuture countExistsAsync(String... names); - + + /** + * Get all keys using iterable. Keys traversing with SCAN operation. + * Each SCAN operation loads up to 10 keys per request. + * + * @return Asynchronous Iterable object + */ + AsyncIterator getKeysAsync(); + + /** + * Get all keys using iterable. Keys traversing with SCAN operation. + * + * @param options scan options + * @return Asynchronous Iterable object + */ + AsyncIterator getKeysAsync(KeysScanOptions options); + /** * Get Redis object type by key * diff --git a/redisson/src/test/java/org/redisson/RedissonKeysTest.java b/redisson/src/test/java/org/redisson/RedissonKeysTest.java index ebd21adab..707de748b 100644 --- a/redisson/src/test/java/org/redisson/RedissonKeysTest.java +++ b/redisson/src/test/java/org/redisson/RedissonKeysTest.java @@ -16,6 +16,8 @@ import org.redisson.config.Protocol; import java.time.Duration; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -280,6 +282,35 @@ public class RedissonKeysTest extends RedisDockerTest { Assertions.assertFalse(redisson.getKeys().getKeys().iterator().hasNext()); } + @Test + public void testKeysAsyncIterable() { + Set keys = new HashSet(); + for (int i = 0; i < 115; i++) { + String key = "key" + Math.random(); + RBucket bucket = redisson.getBucket(key); + keys.add(key); + bucket.set("someValue"); + } + + AsyncIterator iterator = redisson.getKeys().getKeysAsync(); + CompletionStage f = iterateAll(iterator, keys); + f.whenComplete((r, e) -> { + Assertions.assertEquals(0, keys.size()); + }); + } + + public CompletionStage iterateAll(AsyncIterator iterator, Set keys) { + return iterator.hasNext().thenCompose(r -> { + if (r) { + iterator.next().thenCompose(k -> { + keys.remove(redisson.getConfig().useSingleServer().getNameMapper().map(k)); + return iterateAll(iterator, keys); + }); + } + return CompletableFuture.completedFuture(null); + }); + } + @Test public void testRandomKey() { RBucket bucket = redisson.getBucket("test1"); @@ -371,7 +402,6 @@ public class RedissonKeysTest extends RedisDockerTest { Assertions.assertEquals(4L, r.getResponses().get(0)); } - @Test public void testFindKeys() { RBucket bucket = redisson.getBucket("test1"); @@ -418,5 +448,4 @@ public class RedissonKeysTest extends RedisDockerTest { s = redisson.getKeys().count(); assertThat(s).isEqualTo(1); } - } From caabd174835646b4049f0b3e723a5ea6292e75b0 Mon Sep 17 00:00:00 2001 From: seakider Date: Thu, 23 Jan 2025 21:00:47 +0800 Subject: [PATCH 2/4] Feature - RKeyAsync getKeysAsync Signed-off-by: seakider --- .../main/java/org/redisson/RedissonKeys.java | 27 +++--- .../redisson/iterator/BaseAsyncIterator.java | 86 +++++++++++++++++++ .../redisson/misc/CompositeAsyncIterator.java | 85 ++++++++++++++++++ .../java/org/redisson/RedissonKeysTest.java | 32 +++++-- 4 files changed, 209 insertions(+), 21 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java create mode 100644 redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 8fc12e724..6d45fa024 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -38,8 +38,10 @@ import org.redisson.command.CommandBatchService; import org.redisson.config.Protocol; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; +import org.redisson.iterator.BaseAsyncIterator; import org.redisson.iterator.RedissonBaseIterator; import org.redisson.misc.CompletableFutureWrapper; +import org.redisson.misc.CompositeAsyncIterator; import org.redisson.misc.CompositeIterable; import org.redisson.pubsub.PublishSubscribeService; import org.redisson.reactive.CommandReactiveBatchService; @@ -153,18 +155,19 @@ public final class RedissonKeys implements RKeys { @Override public AsyncIterator getKeysAsync(KeysScanOptions options) { - Iterator iter = getKeys(options).iterator(); - return new AsyncIterator() { - @Override - public CompletionStage hasNext() { - return CompletableFuture.completedFuture(iter.hasNext()); - } - - @Override - public CompletionStage next() { - return CompletableFuture.completedFuture(iter.next()); - } - }; + KeysScanParams params = (KeysScanParams) options; + List> asyncIterators = new ArrayList<>(); + for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { + AsyncIterator asyncIterator = new BaseAsyncIterator() { + @Override + protected RFuture> iterator(RedisClient client, String nextItPos) { + return scanIteratorAsync(client, entry, scan, nextItPos, params.getPattern(), params.getChunkSize(), params.getType()); + } + }; + asyncIterators.add(asyncIterator); + + } + return new CompositeAsyncIterator<>(asyncIterators, params.getLimit()); } @Override diff --git a/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java b/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java new file mode 100644 index 000000000..0e89171a0 --- /dev/null +++ b/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2013-2025 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.iterator; + +import org.redisson.ScanResult; +import org.redisson.api.AsyncIterator; +import org.redisson.api.RFuture; +import org.redisson.client.RedisClient; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * + * @author seakider + * + */ +public abstract class BaseAsyncIterator implements AsyncIterator { + private Iterator lastIt; + protected String nextItPos = "0"; + protected RedisClient client; + + @Override + public CompletionStage hasNext() { + CompletableFuture result = new CompletableFuture<>(); + if (nextItPos == null && (lastIt == null || !lastIt.hasNext())) { + result.complete(false); + return result; + } + if (lastIt == null || !lastIt.hasNext()) { + iterator(client, nextItPos).whenComplete((v, e) -> { + if (e != null || v == null) { + client = null; + nextItPos = null; + result.complete(false); + } else { + client = v.getRedisClient(); + nextItPos = v.getPos(); + lastIt = v.getValues().iterator(); + if (nextItPos.equals("0")) { + nextItPos = null; + } + result.complete(lastIt.hasNext()); + } + }); + } else { + result.complete(true); + } + return result; + } + + @Override + public CompletionStage next() { + CompletableFuture result = new CompletableFuture<>(); + hasNext().thenAccept(v -> { + if (!v) { + result.completeExceptionally(new NoSuchElementException()); + return; + } + E next = lastIt.next(); + result.complete(getValue(next)); + }); + return result; + } + + protected abstract RFuture> iterator(RedisClient client, String nextItPos); + + protected V getValue(E entry) { + return (V) entry; + } +} diff --git a/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java b/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java new file mode 100644 index 000000000..e2b74a71d --- /dev/null +++ b/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java @@ -0,0 +1,85 @@ +/** + * Copyright (c) 2013-2025 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import org.redisson.api.AsyncIterator; + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; + +/** + * + * @author seakider + * + */ +public class CompositeAsyncIterator implements AsyncIterator { + private final Iterator> iterator; + private AsyncIterator currentAsyncIterator; + private final int limit; + private int counter; + + public CompositeAsyncIterator(List> asyncIterators, int limit) { + this.iterator = asyncIterators.iterator(); + this.limit = limit; + } + + @Override + public CompletionStage hasNext() { + if (limit > 0 && limit <= counter) { + return CompletableFuture.completedFuture(false); + } + while (currentAsyncIterator == null && iterator.hasNext()) { + currentAsyncIterator = iterator.next(); + } + if (currentAsyncIterator == null) { + return CompletableFuture.completedFuture(false); + + } + CompletionStage main = currentAsyncIterator.hasNext(); + return main.thenCompose(v -> { + if (v) { + return CompletableFuture.completedFuture(true); + } else { + currentAsyncIterator = null; + return hasNext(); + } + }); + } + + @Override + public CompletionStage next() { + CompletableFuture result = new CompletableFuture<>(); + hasNext().thenAccept(v1 -> { + if (!v1) { + result.completeExceptionally(new NoSuchElementException()); + return; + } + currentAsyncIterator.next().whenComplete((v2, e2) -> { + if (e2 != null) { + result.completeExceptionally(new CompletionException(e2)); + return; + } + result.complete(v2); + counter++; + }); + }); + return result; + } +} diff --git a/redisson/src/test/java/org/redisson/RedissonKeysTest.java b/redisson/src/test/java/org/redisson/RedissonKeysTest.java index 707de748b..1f626c590 100644 --- a/redisson/src/test/java/org/redisson/RedissonKeysTest.java +++ b/redisson/src/test/java/org/redisson/RedissonKeysTest.java @@ -288,26 +288,40 @@ public class RedissonKeysTest extends RedisDockerTest { for (int i = 0; i < 115; i++) { String key = "key" + Math.random(); RBucket bucket = redisson.getBucket(key); - keys.add(key); bucket.set("someValue"); } - + AsyncIterator iterator = redisson.getKeys().getKeysAsync(); CompletionStage f = iterateAll(iterator, keys); - f.whenComplete((r, e) -> { - Assertions.assertEquals(0, keys.size()); - }); + f.toCompletableFuture().join(); + assertThat(redisson.getKeys().count()).isEqualTo(keys.size()); } - + + @Test + public void testKeysAsyncIterablePattern() { + Set keys = new HashSet(); + for (int i = 0; i < 115; i++) { + String key = "key" + Math.random(); + RBucket bucket = redisson.getBucket(key); + bucket.set("someValue"); + } + int limit = 23; + AsyncIterator iterator = redisson.getKeys().getKeysAsync(KeysScanOptions.defaults().limit(limit)); + CompletionStage f = iterateAll(iterator, keys); + f.toCompletableFuture().join(); + assertThat(limit).isEqualTo(keys.size()); + } + public CompletionStage iterateAll(AsyncIterator iterator, Set keys) { return iterator.hasNext().thenCompose(r -> { if (r) { - iterator.next().thenCompose(k -> { - keys.remove(redisson.getConfig().useSingleServer().getNameMapper().map(k)); + return iterator.next().thenCompose(k -> { + keys.add(k); return iterateAll(iterator, keys); }); + } else { + return CompletableFuture.completedFuture(null); } - return CompletableFuture.completedFuture(null); }); } From aa00650f2e788663927a0ff87bbbca7efb3e3d17 Mon Sep 17 00:00:00 2001 From: seakider Date: Thu, 23 Jan 2025 21:05:36 +0800 Subject: [PATCH 3/4] Fixed - Style check Signed-off-by: seakider --- redisson/src/main/java/org/redisson/RedissonKeys.java | 1 - .../src/main/java/org/redisson/iterator/BaseAsyncIterator.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 6d45fa024..99a9428da 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -50,7 +50,6 @@ import org.redisson.rx.CommandRxBatchService; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; diff --git a/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java b/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java index 0e89171a0..37d8b5982 100644 --- a/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java +++ b/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java @@ -52,7 +52,7 @@ public abstract class BaseAsyncIterator implements AsyncIterator { client = v.getRedisClient(); nextItPos = v.getPos(); lastIt = v.getValues().iterator(); - if (nextItPos.equals("0")) { + if ("0".equals(nextItPos)) { nextItPos = null; } result.complete(lastIt.hasNext()); From 9f3b297c5ef31a5f7e662f9cb74c6b15d6a2b94e Mon Sep 17 00:00:00 2001 From: seakider Date: Thu, 23 Jan 2025 21:13:57 +0800 Subject: [PATCH 4/4] Fixed - Style check Signed-off-by: seakider --- .../src/main/java/org/redisson/iterator/BaseAsyncIterator.java | 2 +- .../src/main/java/org/redisson/misc/CompositeAsyncIterator.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java b/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java index 37d8b5982..52e250689 100644 --- a/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java +++ b/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2013-2025 Nikita Koksharov + * Copyright (c) 2013-2024 Nikita Koksharov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java b/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java index e2b74a71d..4243c9a94 100644 --- a/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java +++ b/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2013-2025 Nikita Koksharov + * Copyright (c) 2013-2024 Nikita Koksharov * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.