Feature - RKeyAsync getKeysAsync

Signed-off-by: seakider <seakider@gmail.com>
pull/6407/head
seakider 1 week ago
parent 0ba57eca24
commit 032dc1bdaa

@ -48,6 +48,7 @@ import org.redisson.rx.CommandRxBatchService;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
@ -139,12 +140,33 @@ public final class RedissonKeys implements RKeys {
return getKeys(KeysScanOptions.defaults());
}
@Override
public AsyncIterator<String> getKeysAsync() {
return getKeysAsync(KeysScanOptions.defaults());
}
@Override
public Iterable<String> getKeys(KeysScanOptions options) {
KeysScanParams params = (KeysScanParams) options;
return getKeysByPattern(scan, params.getPattern(), params.getLimit(), params.getChunkSize(), params.getType());
}
@Override
public AsyncIterator<String> getKeysAsync(KeysScanOptions options) {
Iterator<String> iter = getKeys(options).iterator();
return new AsyncIterator<String>() {
@Override
public CompletionStage<Boolean> hasNext() {
return CompletableFuture.completedFuture(iter.hasNext());
}
@Override
public CompletionStage<String> next() {
return CompletableFuture.completedFuture(iter.next());
}
};
}
@Override
public Iterable<String> getKeys(int count) {
return getKeysByPattern(null, count);

@ -15,6 +15,8 @@
*/
package org.redisson.api;
import org.redisson.api.options.KeysScanOptions;
import java.util.concurrent.TimeUnit;
/**
@ -133,6 +135,22 @@ public interface RKeysAsync {
*/
RFuture<Long> countExistsAsync(String... names);
/**
* Get all keys using iterable. Keys traversing with SCAN operation.
* Each SCAN operation loads up to <code>10</code> keys per request.
*
* @return Asynchronous Iterable object
*/
AsyncIterator<String> getKeysAsync();
/**
* Get all keys using iterable. Keys traversing with SCAN operation.
*
* @param options scan options
* @return Asynchronous Iterable object
*/
AsyncIterator<String> getKeysAsync(KeysScanOptions options);
/**
* Get Redis object type by key
*

@ -16,6 +16,8 @@ import org.redisson.config.Protocol;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -280,6 +282,35 @@ public class RedissonKeysTest extends RedisDockerTest {
Assertions.assertFalse(redisson.getKeys().getKeys().iterator().hasNext());
}
@Test
public void testKeysAsyncIterable() {
Set<String> keys = new HashSet<String>();
for (int i = 0; i < 115; i++) {
String key = "key" + Math.random();
RBucket<String> bucket = redisson.getBucket(key);
keys.add(key);
bucket.set("someValue");
}
AsyncIterator<String> iterator = redisson.getKeys().getKeysAsync();
CompletionStage<Void> f = iterateAll(iterator, keys);
f.whenComplete((r, e) -> {
Assertions.assertEquals(0, keys.size());
});
}
public CompletionStage<Void> iterateAll(AsyncIterator<String> iterator, Set<String> keys) {
return iterator.hasNext().thenCompose(r -> {
if (r) {
iterator.next().thenCompose(k -> {
keys.remove(redisson.getConfig().useSingleServer().getNameMapper().map(k));
return iterateAll(iterator, keys);
});
}
return CompletableFuture.completedFuture(null);
});
}
@Test
public void testRandomKey() {
RBucket<String> bucket = redisson.getBucket("test1");
@ -371,7 +402,6 @@ public class RedissonKeysTest extends RedisDockerTest {
Assertions.assertEquals(4L, r.getResponses().get(0));
}
@Test
public void testFindKeys() {
RBucket<String> bucket = redisson.getBucket("test1");
@ -418,5 +448,4 @@ public class RedissonKeysTest extends RedisDockerTest {
s = redisson.getKeys().count();
assertThat(s).isEqualTo(1);
}
}

Loading…
Cancel
Save