Feature - Obtain values from RMap, based on key pattern, asynchronously

Signed-off-by: seakider <seakider@gmail.com>
pull/6449/head
seakider 5 days ago
parent 2df63e425c
commit c504b022d0

@ -35,10 +35,12 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ServiceManager;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.iterator.BaseAsyncIterator;
import org.redisson.iterator.RedissonMapIterator;
import org.redisson.iterator.RedissonMapKeyIterator;
import org.redisson.mapreduce.RedissonMapReduce;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.CompositeAsyncIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -747,7 +749,40 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
public Collection<V> values(int count) {
return values(null, count);
}
@Override
public AsyncIterator<V> valuesAsync() {
return valuesAsync(null);
}
@Override
public AsyncIterator<V> valuesAsync(String keyPattern) {
return valuesAsync(keyPattern, 10);
}
@Override
public AsyncIterator<V> valuesAsync(String keyPattern, int count) {
AsyncIterator<V> asyncIterator = new BaseAsyncIterator<V, Map.Entry<Object, Object>>() {
@Override
protected RFuture<ScanResult<Map.Entry<Object, Object>>> iterator(RedisClient client, String nextItPos) {
return scanIteratorAsync(name, client, nextItPos, keyPattern, count);
}
@Override
protected V getValue(java.util.Map.Entry<Object, Object> entry) {
return (V) entry.getValue();
}
};
return new CompositeAsyncIterator<>(Arrays.asList(asyncIterator), count);
}
@Override
public AsyncIterator<V> valuesAsync(int count) {
return valuesAsync(null, count);
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
return entrySet(null);
@ -767,6 +802,34 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
public Set<java.util.Map.Entry<K, V>> entrySet(int count) {
return entrySet(null, count);
}
@Override
public AsyncIterator<java.util.Map.Entry<K, V>> entrySetAsync() {
return entrySetAsync(null);
}
@Override
public AsyncIterator<java.util.Map.Entry<K, V>> entrySetAsync(String keyPattern) {
return entrySetAsync(keyPattern, 10);
}
@Override
public AsyncIterator<java.util.Map.Entry<K, V>> entrySetAsync(String keyPattern, int count) {
AsyncIterator<java.util.Map.Entry<K, V>> asyncIterator = new BaseAsyncIterator<java.util.Map.Entry<K, V>, Map.Entry<Object, Object>>() {
@Override
protected RFuture<ScanResult<Map.Entry<Object, Object>>> iterator(RedisClient client, String nextItPos) {
return scanIteratorAsync(name, client, nextItPos, keyPattern, count);
}
};
return new CompositeAsyncIterator<>(Arrays.asList(asyncIterator), count);
}
@Override
public AsyncIterator<java.util.Map.Entry<K, V>> entrySetAsync(int count) {
return entrySetAsync(null, count);
}
@Override
public Set<K> readAllKeySet() {

@ -210,6 +210,154 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
*/
RFuture<Integer> sizeAsync();
/**
* Returns values of this map using iterable.
* Values are loaded in batch. Batch size is <code>10</code>.
*
* @return Asynchronous Iterable object
*/
AsyncIterator<V> valuesAsync();
/**
* Returns values of this map using iterable.
* Values are loaded in batch. Batch size is <code>10</code>.
* If <code>keyPattern</code> is not null then only values mapped by matched keys of this pattern are loaded.
* <p>
* Use <code>org.redisson.client.codec.StringCodec</code> for Map keys.
* <p>
* Usage example:
* <pre>
* Codec valueCodec = ...
* RMap<String, MyObject> map = redissonClient.getMap("simpleMap", new CompositeCodec(StringCodec.INSTANCE, valueCodec, valueCodec));
*
* // or
*
* RMap<String, String> map = redissonClient.getMap("simpleMap", StringCodec.INSTANCE);
* </pre>
* <pre>
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
* </pre>
*
* @param keyPattern - key pattern
* @return Asynchronous Iterable object
*/
AsyncIterator<V> valuesAsync(String keyPattern);
/**
* Returns values of this map using iterable.
* Values are loaded in batch. Batch size is <code>10</code>.
* If <code>keyPattern</code> is not null then only values mapped by matched keys of this pattern are loaded.
* <p>
* Use <code>org.redisson.client.codec.StringCodec</code> for Map keys.
* <p>
* Usage example:
* <pre>
* Codec valueCodec = ...
* RMap<String, MyObject> map = redissonClient.getMap("simpleMap", new CompositeCodec(StringCodec.INSTANCE, valueCodec, valueCodec));
*
* // or
*
* RMap<String, String> map = redissonClient.getMap("simpleMap", StringCodec.INSTANCE);
* </pre>
* <pre>
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
* </pre>
*
* @param keyPattern - key pattern
* @param count - size of values batch
* @return Asynchronous Iterable object
*/
AsyncIterator<V> valuesAsync(String keyPattern, int count);
/**
* Returns values of this map using iterable.
* Values are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of values batch
* @return Asynchronous Iterable object
*/
AsyncIterator<V> valuesAsync(int count);
/**
* Returns map entries using iterable.
* Map entries are loaded in batch. Batch size is <code>10</code>.
*
* @return Asynchronous Iterable object
*/
AsyncIterator<java.util.Map.Entry<K, V>> entrySetAsync();
/**
* Returns map entries using iterable.
* Map entries are loaded in batch. Batch size is <code>10</code>.
* If <code>keyPattern</code> is not null then only entries mapped by matched keys of this pattern are loaded.
* <p>
* Use <code>org.redisson.client.codec.StringCodec</code> for Map keys.
* <p>
* Usage example:
* <pre>
* Codec valueCodec = ...
* RMap<String, MyObject> map = redissonClient.getMap("simpleMap", new CompositeCodec(StringCodec.INSTANCE, valueCodec, valueCodec));
*
* // or
*
* RMap<String, String> map = redissonClient.getMap("simpleMap", StringCodec.INSTANCE);
* </pre>
* <pre>
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
* </pre>
*
* @param keyPattern key pattern
* @return Asynchronous Iterable object
*/
AsyncIterator<java.util.Map.Entry<K, V>> entrySetAsync(String keyPattern);
/**
* Returns map entries using iterable.
* Map entries are loaded in batch. Batch size is defined by <code>count</code> param.
* If <code>keyPattern</code> is not null then only entries mapped by matched keys of this pattern are loaded.
* <p>
* Use <code>org.redisson.client.codec.StringCodec</code> for Map keys.
* <p>
* Usage example:
* <pre>
* Codec valueCodec = ...
* RMap<String, MyObject> map = redissonClient.getMap("simpleMap", new CompositeCodec(StringCodec.INSTANCE, valueCodec, valueCodec));
*
* // or
*
* RMap<String, String> map = redissonClient.getMap("simpleMap", StringCodec.INSTANCE);
* </pre>
* <pre>
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
* </pre>
*
* @param keyPattern key pattern
* @param count size of entries batch
* @return Asynchronous Iterable object
*/
AsyncIterator<java.util.Map.Entry<K, V>> entrySetAsync(String keyPattern, int count);
/**
* Returns map entries using iterable.
* Map entries are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of entries batch
* @return Asynchronous Iterable object
*/
AsyncIterator<java.util.Map.Entry<K, V>> entrySetAsync(int count);
/**
* Removes map entries mapped by specified <code>keys</code>.
* <p>

@ -4,6 +4,7 @@ import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.Test;
import org.redisson.api.AsyncIterator;
import org.redisson.api.MapOptions;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RMap;
@ -11,11 +12,11 @@ import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import static org.assertj.core.api.Assertions.assertThat;
@ -131,6 +132,139 @@ public class RedissonMapTest extends BaseMapTest {
assertThat(map.entrySet()).containsExactlyElementsOf(testMap.entrySet());
}
@Test
public void testValuesAsync() {
RMap<Integer, String> map = redisson.getMap("simple12");
map.put(1, "12");
map.put(2, "33");
map.put(3, "43");
List<String> list = new ArrayList<>();
AsyncIterator<String> iterator = map.valuesAsync();
CompletionStage<Void> f = iterateAll(iterator, list);
f.toCompletableFuture().join();
assertThat(map.size()).isEqualTo(list.size());
}
@Test
public void testValuesByCountAsync() {
RMap<Integer, String> map = redisson.getMap("simple12");
map.put(1, "12");
map.put(2, "33");
map.put(3, "43");
List<String> list = new ArrayList<>();
AsyncIterator<String> iterator = map.valuesAsync(2);
CompletionStage<Void> f = iterateAll(iterator, list);
f.toCompletableFuture().join();
assertThat(list.size()).isEqualTo(2);
}
@Test
public void testValuesByPatternAsync() {
RMap<String, String> map = getMap("simple", StringCodec.INSTANCE);
map.put("10", "100");
map.put("20", "200");
map.put("30", "300");
List<String> list = new ArrayList<>();
AsyncIterator<String> iterator = map.valuesAsync("?0");
CompletionStage<Void> f = iterateAll(iterator, list);
f.toCompletableFuture().join();
assertThat(list).containsExactlyInAnyOrder("100", "200", "300");
list.clear();
AsyncIterator<String> iterator2 = map.valuesAsync("1");
CompletionStage<Void> f2 = iterateAll(iterator2, list);
f2.toCompletableFuture().join();
assertThat(list.isEmpty()).isTrue();
list.clear();
AsyncIterator<String> iterator3 = map.valuesAsync("10");
CompletionStage<Void> f3 = iterateAll(iterator3, list);
f3.toCompletableFuture().join();
assertThat(list).containsExactlyInAnyOrder("100");
}
@Test
public void testEntrySetByCountAsync() {
RMap<Integer, String> map = redisson.getMap("simple12");
map.put(1, "12");
map.put(2, "33");
map.put(3, "43");
List<java.util.Map.Entry<Integer, String>> list = new ArrayList<>();
AsyncIterator<java.util.Map.Entry<Integer, String>> iterator = map.entrySetAsync(2);
CompletionStage<Void> f = iterateAll(iterator, list);
f.toCompletableFuture().join();
assertThat(list.size()).isEqualTo(2);
}
@Test
public void testEntrySetAsync() {
RMap<Integer, String> map = redisson.getMap("simple12");
map.put(1, "12");
map.put(2, "33");
map.put(3, "43");
List<java.util.Map.Entry<Integer, String>> list = new ArrayList<>();
AsyncIterator<java.util.Map.Entry<Integer, String>> iterator = map.entrySetAsync();
CompletionStage<Void> f = iterateAll(iterator, list);
f.toCompletableFuture().join();
assertThat(map.size()).isEqualTo(list.size());
}
@Test
public void testVEntrySetByPatternAsync() {
RMap<String, String> map = getMap("simple", StringCodec.INSTANCE);
map.put("10", "100");
map.put("20", "200");
map.put("30", "300");
List<java.util.Map.Entry<String, String>> list = new ArrayList<>();
AsyncIterator<java.util.Map.Entry<String, String>> iterator = map.entrySetAsync("?0");
CompletionStage<Void> f = iterateAll(iterator, list);
f.toCompletableFuture().join();
assertThat(list.size()).isEqualTo(3);
list.clear();
AsyncIterator<java.util.Map.Entry<String, String>> iterator2 = map.entrySetAsync("1");
CompletionStage<Void> f2 = iterateAll(iterator2, list);
f2.toCompletableFuture().join();
assertThat(list.isEmpty()).isTrue();
list.clear();
AsyncIterator<java.util.Map.Entry<String, String>> iterator3 = map.entrySetAsync("10");
CompletionStage<Void> f3 = iterateAll(iterator3, list);
f3.toCompletableFuture().join();
assertThat(list.size()).isEqualTo(1);
}
public CompletionStage<Void> iterateAll(AsyncIterator<?> iterator, List list) {
return iterator.hasNext().thenCompose(r -> {
if (r) {
return iterator.next().thenCompose(k -> {
list.add(k);
return iterateAll(iterator, list);
});
} else {
return CompletableFuture.completedFuture(null);
}
});
}
@Test
public void testReadAllEntrySet() {
RMap<Integer, String> map = redisson.getMap("simple12");

Loading…
Cancel
Save