diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 371ba659f..99a9428da 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -38,8 +38,10 @@ import org.redisson.command.CommandBatchService; import org.redisson.config.Protocol; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; +import org.redisson.iterator.BaseAsyncIterator; import org.redisson.iterator.RedissonBaseIterator; import org.redisson.misc.CompletableFutureWrapper; +import org.redisson.misc.CompositeAsyncIterator; import org.redisson.misc.CompositeIterable; import org.redisson.pubsub.PublishSubscribeService; import org.redisson.reactive.CommandReactiveBatchService; @@ -139,12 +141,34 @@ public final class RedissonKeys implements RKeys { return getKeys(KeysScanOptions.defaults()); } + @Override + public AsyncIterator getKeysAsync() { + return getKeysAsync(KeysScanOptions.defaults()); + } + @Override public Iterable getKeys(KeysScanOptions options) { KeysScanParams params = (KeysScanParams) options; return getKeysByPattern(scan, params.getPattern(), params.getLimit(), params.getChunkSize(), params.getType()); } + @Override + public AsyncIterator getKeysAsync(KeysScanOptions options) { + KeysScanParams params = (KeysScanParams) options; + List> asyncIterators = new ArrayList<>(); + for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { + AsyncIterator asyncIterator = new BaseAsyncIterator() { + @Override + protected RFuture> iterator(RedisClient client, String nextItPos) { + return scanIteratorAsync(client, entry, scan, nextItPos, params.getPattern(), params.getChunkSize(), params.getType()); + } + }; + asyncIterators.add(asyncIterator); + + } + return new CompositeAsyncIterator<>(asyncIterators, params.getLimit()); + } + @Override public Iterable getKeys(int count) { return getKeysByPattern(null, count); diff --git a/redisson/src/main/java/org/redisson/api/RKeysAsync.java b/redisson/src/main/java/org/redisson/api/RKeysAsync.java index 0355d7098..86b309e13 100644 --- a/redisson/src/main/java/org/redisson/api/RKeysAsync.java +++ b/redisson/src/main/java/org/redisson/api/RKeysAsync.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import org.redisson.api.options.KeysScanOptions; + import java.util.concurrent.TimeUnit; /** @@ -132,7 +134,23 @@ public interface RKeysAsync { * @return amount of existing keys */ RFuture countExistsAsync(String... names); - + + /** + * Get all keys using iterable. Keys traversing with SCAN operation. + * Each SCAN operation loads up to 10 keys per request. + * + * @return Asynchronous Iterable object + */ + AsyncIterator getKeysAsync(); + + /** + * Get all keys using iterable. Keys traversing with SCAN operation. + * + * @param options scan options + * @return Asynchronous Iterable object + */ + AsyncIterator getKeysAsync(KeysScanOptions options); + /** * Get Redis object type by key * diff --git a/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java b/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java new file mode 100644 index 000000000..52e250689 --- /dev/null +++ b/redisson/src/main/java/org/redisson/iterator/BaseAsyncIterator.java @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.iterator; + +import org.redisson.ScanResult; +import org.redisson.api.AsyncIterator; +import org.redisson.api.RFuture; +import org.redisson.client.RedisClient; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * + * @author seakider + * + */ +public abstract class BaseAsyncIterator implements AsyncIterator { + private Iterator lastIt; + protected String nextItPos = "0"; + protected RedisClient client; + + @Override + public CompletionStage hasNext() { + CompletableFuture result = new CompletableFuture<>(); + if (nextItPos == null && (lastIt == null || !lastIt.hasNext())) { + result.complete(false); + return result; + } + if (lastIt == null || !lastIt.hasNext()) { + iterator(client, nextItPos).whenComplete((v, e) -> { + if (e != null || v == null) { + client = null; + nextItPos = null; + result.complete(false); + } else { + client = v.getRedisClient(); + nextItPos = v.getPos(); + lastIt = v.getValues().iterator(); + if ("0".equals(nextItPos)) { + nextItPos = null; + } + result.complete(lastIt.hasNext()); + } + }); + } else { + result.complete(true); + } + return result; + } + + @Override + public CompletionStage next() { + CompletableFuture result = new CompletableFuture<>(); + hasNext().thenAccept(v -> { + if (!v) { + result.completeExceptionally(new NoSuchElementException()); + return; + } + E next = lastIt.next(); + result.complete(getValue(next)); + }); + return result; + } + + protected abstract RFuture> iterator(RedisClient client, String nextItPos); + + protected V getValue(E entry) { + return (V) entry; + } +} diff --git a/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java b/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java new file mode 100644 index 000000000..4243c9a94 --- /dev/null +++ b/redisson/src/main/java/org/redisson/misc/CompositeAsyncIterator.java @@ -0,0 +1,85 @@ +/** + * Copyright (c) 2013-2024 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.misc; + +import org.redisson.api.AsyncIterator; + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; + +/** + * + * @author seakider + * + */ +public class CompositeAsyncIterator implements AsyncIterator { + private final Iterator> iterator; + private AsyncIterator currentAsyncIterator; + private final int limit; + private int counter; + + public CompositeAsyncIterator(List> asyncIterators, int limit) { + this.iterator = asyncIterators.iterator(); + this.limit = limit; + } + + @Override + public CompletionStage hasNext() { + if (limit > 0 && limit <= counter) { + return CompletableFuture.completedFuture(false); + } + while (currentAsyncIterator == null && iterator.hasNext()) { + currentAsyncIterator = iterator.next(); + } + if (currentAsyncIterator == null) { + return CompletableFuture.completedFuture(false); + + } + CompletionStage main = currentAsyncIterator.hasNext(); + return main.thenCompose(v -> { + if (v) { + return CompletableFuture.completedFuture(true); + } else { + currentAsyncIterator = null; + return hasNext(); + } + }); + } + + @Override + public CompletionStage next() { + CompletableFuture result = new CompletableFuture<>(); + hasNext().thenAccept(v1 -> { + if (!v1) { + result.completeExceptionally(new NoSuchElementException()); + return; + } + currentAsyncIterator.next().whenComplete((v2, e2) -> { + if (e2 != null) { + result.completeExceptionally(new CompletionException(e2)); + return; + } + result.complete(v2); + counter++; + }); + }); + return result; + } +} diff --git a/redisson/src/test/java/org/redisson/RedissonKeysTest.java b/redisson/src/test/java/org/redisson/RedissonKeysTest.java index ebd21adab..1f626c590 100644 --- a/redisson/src/test/java/org/redisson/RedissonKeysTest.java +++ b/redisson/src/test/java/org/redisson/RedissonKeysTest.java @@ -16,6 +16,8 @@ import org.redisson.config.Protocol; import java.time.Duration; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -280,6 +282,49 @@ public class RedissonKeysTest extends RedisDockerTest { Assertions.assertFalse(redisson.getKeys().getKeys().iterator().hasNext()); } + @Test + public void testKeysAsyncIterable() { + Set keys = new HashSet(); + for (int i = 0; i < 115; i++) { + String key = "key" + Math.random(); + RBucket bucket = redisson.getBucket(key); + bucket.set("someValue"); + } + + AsyncIterator iterator = redisson.getKeys().getKeysAsync(); + CompletionStage f = iterateAll(iterator, keys); + f.toCompletableFuture().join(); + assertThat(redisson.getKeys().count()).isEqualTo(keys.size()); + } + + @Test + public void testKeysAsyncIterablePattern() { + Set keys = new HashSet(); + for (int i = 0; i < 115; i++) { + String key = "key" + Math.random(); + RBucket bucket = redisson.getBucket(key); + bucket.set("someValue"); + } + int limit = 23; + AsyncIterator iterator = redisson.getKeys().getKeysAsync(KeysScanOptions.defaults().limit(limit)); + CompletionStage f = iterateAll(iterator, keys); + f.toCompletableFuture().join(); + assertThat(limit).isEqualTo(keys.size()); + } + + public CompletionStage iterateAll(AsyncIterator iterator, Set keys) { + return iterator.hasNext().thenCompose(r -> { + if (r) { + return iterator.next().thenCompose(k -> { + keys.add(k); + return iterateAll(iterator, keys); + }); + } else { + return CompletableFuture.completedFuture(null); + } + }); + } + @Test public void testRandomKey() { RBucket bucket = redisson.getBucket("test1"); @@ -371,7 +416,6 @@ public class RedissonKeysTest extends RedisDockerTest { Assertions.assertEquals(4L, r.getResponses().get(0)); } - @Test public void testFindKeys() { RBucket bucket = redisson.getBucket("test1"); @@ -418,5 +462,4 @@ public class RedissonKeysTest extends RedisDockerTest { s = redisson.getKeys().count(); assertThat(s).isEqualTo(1); } - }