Merge pull request #6407 from seakider/feature_getKeysAsync

Feature - RKeyAsync getKeysAsync
pull/6297/merge
Nikita Koksharov 1 week ago committed by GitHub
commit 1450176e52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -38,8 +38,10 @@ import org.redisson.command.CommandBatchService;
import org.redisson.config.Protocol;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.iterator.BaseAsyncIterator;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.CompositeAsyncIterator;
import org.redisson.misc.CompositeIterable;
import org.redisson.pubsub.PublishSubscribeService;
import org.redisson.reactive.CommandReactiveBatchService;
@ -139,12 +141,34 @@ public final class RedissonKeys implements RKeys {
return getKeys(KeysScanOptions.defaults());
}
@Override
public AsyncIterator<String> getKeysAsync() {
return getKeysAsync(KeysScanOptions.defaults());
}
@Override
public Iterable<String> getKeys(KeysScanOptions options) {
KeysScanParams params = (KeysScanParams) options;
return getKeysByPattern(scan, params.getPattern(), params.getLimit(), params.getChunkSize(), params.getType());
}
@Override
public AsyncIterator<String> getKeysAsync(KeysScanOptions options) {
KeysScanParams params = (KeysScanParams) options;
List<AsyncIterator<String>> asyncIterators = new ArrayList<>();
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
AsyncIterator<String> asyncIterator = new BaseAsyncIterator<String, Object>() {
@Override
protected RFuture<ScanResult<Object>> iterator(RedisClient client, String nextItPos) {
return scanIteratorAsync(client, entry, scan, nextItPos, params.getPattern(), params.getChunkSize(), params.getType());
}
};
asyncIterators.add(asyncIterator);
}
return new CompositeAsyncIterator<>(asyncIterators, params.getLimit());
}
@Override
public Iterable<String> getKeys(int count) {
return getKeysByPattern(null, count);

@ -15,6 +15,8 @@
*/
package org.redisson.api;
import org.redisson.api.options.KeysScanOptions;
import java.util.concurrent.TimeUnit;
/**
@ -133,6 +135,22 @@ public interface RKeysAsync {
*/
RFuture<Long> countExistsAsync(String... names);
/**
* Get all keys using iterable. Keys traversing with SCAN operation.
* Each SCAN operation loads up to <code>10</code> keys per request.
*
* @return Asynchronous Iterable object
*/
AsyncIterator<String> getKeysAsync();
/**
* Get all keys using iterable. Keys traversing with SCAN operation.
*
* @param options scan options
* @return Asynchronous Iterable object
*/
AsyncIterator<String> getKeysAsync(KeysScanOptions options);
/**
* Get Redis object type by key
*

@ -0,0 +1,86 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.iterator;
import org.redisson.ScanResult;
import org.redisson.api.AsyncIterator;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
*
* @author seakider
*
*/
public abstract class BaseAsyncIterator<V, E> implements AsyncIterator<V> {
private Iterator<E> lastIt;
protected String nextItPos = "0";
protected RedisClient client;
@Override
public CompletionStage<Boolean> hasNext() {
CompletableFuture<Boolean> result = new CompletableFuture<>();
if (nextItPos == null && (lastIt == null || !lastIt.hasNext())) {
result.complete(false);
return result;
}
if (lastIt == null || !lastIt.hasNext()) {
iterator(client, nextItPos).whenComplete((v, e) -> {
if (e != null || v == null) {
client = null;
nextItPos = null;
result.complete(false);
} else {
client = v.getRedisClient();
nextItPos = v.getPos();
lastIt = v.getValues().iterator();
if ("0".equals(nextItPos)) {
nextItPos = null;
}
result.complete(lastIt.hasNext());
}
});
} else {
result.complete(true);
}
return result;
}
@Override
public CompletionStage<V> next() {
CompletableFuture<V> result = new CompletableFuture<>();
hasNext().thenAccept(v -> {
if (!v) {
result.completeExceptionally(new NoSuchElementException());
return;
}
E next = lastIt.next();
result.complete(getValue(next));
});
return result;
}
protected abstract RFuture<ScanResult<E>> iterator(RedisClient client, String nextItPos);
protected V getValue(E entry) {
return (V) entry;
}
}

@ -0,0 +1,85 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import org.redisson.api.AsyncIterator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
/**
*
* @author seakider
*
*/
public class CompositeAsyncIterator<T> implements AsyncIterator<T> {
private final Iterator<AsyncIterator<T>> iterator;
private AsyncIterator<T> currentAsyncIterator;
private final int limit;
private int counter;
public CompositeAsyncIterator(List<AsyncIterator<T>> asyncIterators, int limit) {
this.iterator = asyncIterators.iterator();
this.limit = limit;
}
@Override
public CompletionStage<Boolean> hasNext() {
if (limit > 0 && limit <= counter) {
return CompletableFuture.completedFuture(false);
}
while (currentAsyncIterator == null && iterator.hasNext()) {
currentAsyncIterator = iterator.next();
}
if (currentAsyncIterator == null) {
return CompletableFuture.completedFuture(false);
}
CompletionStage<Boolean> main = currentAsyncIterator.hasNext();
return main.thenCompose(v -> {
if (v) {
return CompletableFuture.completedFuture(true);
} else {
currentAsyncIterator = null;
return hasNext();
}
});
}
@Override
public CompletionStage<T> next() {
CompletableFuture<T> result = new CompletableFuture<>();
hasNext().thenAccept(v1 -> {
if (!v1) {
result.completeExceptionally(new NoSuchElementException());
return;
}
currentAsyncIterator.next().whenComplete((v2, e2) -> {
if (e2 != null) {
result.completeExceptionally(new CompletionException(e2));
return;
}
result.complete(v2);
counter++;
});
});
return result;
}
}

@ -16,6 +16,8 @@ import org.redisson.config.Protocol;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -280,6 +282,49 @@ public class RedissonKeysTest extends RedisDockerTest {
Assertions.assertFalse(redisson.getKeys().getKeys().iterator().hasNext());
}
@Test
public void testKeysAsyncIterable() {
Set<String> keys = new HashSet<String>();
for (int i = 0; i < 115; i++) {
String key = "key" + Math.random();
RBucket<String> bucket = redisson.getBucket(key);
bucket.set("someValue");
}
AsyncIterator<String> iterator = redisson.getKeys().getKeysAsync();
CompletionStage<Void> f = iterateAll(iterator, keys);
f.toCompletableFuture().join();
assertThat(redisson.getKeys().count()).isEqualTo(keys.size());
}
@Test
public void testKeysAsyncIterablePattern() {
Set<String> keys = new HashSet<String>();
for (int i = 0; i < 115; i++) {
String key = "key" + Math.random();
RBucket<String> bucket = redisson.getBucket(key);
bucket.set("someValue");
}
int limit = 23;
AsyncIterator<String> iterator = redisson.getKeys().getKeysAsync(KeysScanOptions.defaults().limit(limit));
CompletionStage<Void> f = iterateAll(iterator, keys);
f.toCompletableFuture().join();
assertThat(limit).isEqualTo(keys.size());
}
public CompletionStage<Void> iterateAll(AsyncIterator<String> iterator, Set<String> keys) {
return iterator.hasNext().thenCompose(r -> {
if (r) {
return iterator.next().thenCompose(k -> {
keys.add(k);
return iterateAll(iterator, keys);
});
} else {
return CompletableFuture.completedFuture(null);
}
});
}
@Test
public void testRandomKey() {
RBucket<String> bucket = redisson.getBucket("test1");
@ -371,7 +416,6 @@ public class RedissonKeysTest extends RedisDockerTest {
Assertions.assertEquals(4L, r.getResponses().get(0));
}
@Test
public void testFindKeys() {
RBucket<String> bucket = redisson.getBucket("test1");
@ -418,5 +462,4 @@ public class RedissonKeysTest extends RedisDockerTest {
s = redisson.getKeys().count();
assertThat(s).isEqualTo(1);
}
}

Loading…
Cancel
Save