From 79abeec89fc1c90459728aa2a6261c3b8145999c Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 23 Nov 2018 17:26:45 +0300 Subject: [PATCH] Feature - stream methods added to RKeys, RDeque, RScoredSortedSet, RSet, RSetCache object. #1745 --- .../main/java/org/redisson/RedissonKeys.java | 28 ++++++++++ .../java/org/redisson/RedissonObject.java | 10 ++++ .../org/redisson/RedissonPriorityDeque.java | 6 ++ .../org/redisson/RedissonScoredSortedSet.java | 21 +++++++ .../main/java/org/redisson/RedissonSet.java | 16 ++++++ .../java/org/redisson/RedissonSetCache.java | 16 ++++++ .../redisson/RedissonSetMultimapValues.java | 16 ++++++ .../src/main/java/org/redisson/api/RKeys.java | 55 +++++++++++++++++++ .../java/org/redisson/api/RPriorityDeque.java | 8 +++ .../org/redisson/api/RScoredSortedSet.java | 41 +++++++++++++- .../src/main/java/org/redisson/api/RSet.java | 29 ++++++++++ .../main/java/org/redisson/api/RSetCache.java | 29 ++++++++++ 12 files changed, 274 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index f3445d95c..39fb3c3ca 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -21,10 +21,14 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.redisson.api.RFuture; import org.redisson.api.RKeys; @@ -516,5 +520,29 @@ public class RedissonKeys implements RKeys { return commandExecutor.writeAsync(name, RedisCommands.MOVE, name, database); } + @Override + public Stream getKeysStreamByPattern(String pattern) { + return toStream(getKeysByPattern(pattern).iterator()); + } + + protected Stream toStream(Iterator iterator) { + Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); + return StreamSupport.stream(spliterator, false); + } + + @Override + public Stream getKeysStreamByPattern(String pattern, int count) { + return toStream(getKeysByPattern(pattern, count).iterator()); + } + + @Override + public Stream getKeysStream() { + return toStream(getKeys().iterator()); + } + + @Override + public Stream getKeysStream(int count) { + return toStream(getKeys(count).iterator()); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonObject.java b/redisson/src/main/java/org/redisson/RedissonObject.java index b65a639e3..4daf32ac4 100644 --- a/redisson/src/main/java/org/redisson/RedissonObject.java +++ b/redisson/src/main/java/org/redisson/RedissonObject.java @@ -19,8 +19,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import org.redisson.api.RFuture; import org.redisson.api.RObject; @@ -73,6 +78,11 @@ public abstract class RedissonObject implements RObject { return "{" + name + "}:" + suffix; } + protected final Stream toStream(Iterator iterator) { + Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); + return StreamSupport.stream(spliterator, false); + } + protected final V get(RFuture future) { return commandExecutor.get(future); } diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java index 612d1f792..47e728ffe 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityDeque.java @@ -17,6 +17,7 @@ package org.redisson; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.stream.Stream; import org.redisson.api.RFuture; import org.redisson.api.RPriorityDeque; @@ -228,4 +229,9 @@ public class RedissonPriorityDeque extends RedissonPriorityQueue implement return remove(o, -1); } + @Override + public Stream descendingStream() { + return toStream(descendingIterator()); + } + } diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 0aa341ec5..3aa9df393 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.redisson.api.RFuture; import org.redisson.api.RScoredSortedSet; @@ -1086,4 +1087,24 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc public V takeLast() { return get(takeLastAsync()); } + + @Override + public Stream stream() { + return toStream(iterator()); + } + + @Override + public Stream stream(String pattern) { + return toStream(iterator(pattern)); + } + + @Override + public Stream stream(int count) { + return toStream(iterator(count)); + } + + @Override + public Stream stream(String pattern, int count) { + return toStream(iterator(pattern, count)); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index a7318db63..6d0c76fec 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import org.redisson.api.RFuture; import org.redisson.api.RLock; @@ -658,4 +659,19 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt return commandExecutor.readAsync(getName(), codec, RedisCommands.SORT_SET, params.toArray()); } + + @Override + public Stream stream(int count) { + return toStream(iterator(count)); + } + + @Override + public Stream stream(String pattern, int count) { + return toStream(iterator(pattern, count)); + } + + @Override + public Stream stream(String pattern) { + return toStream(iterator(pattern)); + } } diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 76b77dc27..9c50b43b4 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.redisson.api.RFuture; import org.redisson.api.RLock; @@ -388,4 +389,19 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< } } + @Override + public Stream stream(int count) { + return toStream(iterator(count)); + } + + @Override + public Stream stream(String pattern, int count) { + return toStream(iterator(pattern, count)); + } + + @Override + public Stream stream(String pattern) { + return toStream(iterator(pattern)); + } + } diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java index 5ab429cc3..fd28a83e7 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.redisson.api.RFuture; import org.redisson.api.RLock; @@ -744,5 +745,20 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R int offset, int count) { return set.sortToAsync(destName, byPattern, getPatterns, order, offset, count); } + + @Override + public Stream stream(int count) { + return toStream(iterator(count)); + } + + @Override + public Stream stream(String pattern, int count) { + return toStream(iterator(pattern, count)); + } + + @Override + public Stream stream(String pattern) { + return toStream(iterator(pattern)); + } } diff --git a/redisson/src/main/java/org/redisson/api/RKeys.java b/redisson/src/main/java/org/redisson/api/RKeys.java index a326078ce..d720ed653 100644 --- a/redisson/src/main/java/org/redisson/api/RKeys.java +++ b/redisson/src/main/java/org/redisson/api/RKeys.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; /** * @@ -200,6 +201,60 @@ public interface RKeys extends RKeysAsync { * @return Iterable object */ Iterable getKeys(int count); + + /** + * Get all keys by pattern using Stream. + * Keys traversed with SCAN operation. Each SCAN operation loads + * up to 10 keys per request. + *

+ * Supported glob-style patterns: + *

+ * h?llo subscribes to hello, hallo and hxllo + *

+ * h*llo subscribes to hllo and heeeello + *

+ * h[ae]llo subscribes to hello and hallo, but not hillo + * + * @param pattern - match pattern + * @return Iterable object + */ + Stream getKeysStreamByPattern(String pattern); + + /** + * Get all keys by pattern using Stream. + * Keys traversed with SCAN operation. Each SCAN operation loads + * up to count keys per request. + *

+ * Supported glob-style patterns: + *

+ * h?llo subscribes to hello, hallo and hxllo + *

+ * h*llo subscribes to hllo and heeeello + *

+ * h[ae]llo subscribes to hello and hallo, but not hillo + * + * @param pattern - match pattern + * @param count - keys loaded per request to Redis + * @return Iterable object + */ + Stream getKeysStreamByPattern(String pattern, int count); + + /** + * Get all keys using Stream. Keys traversing with SCAN operation. + * Each SCAN operation loads up to 10 keys per request. + * + * @return Iterable object + */ + Stream getKeysStream(); + + /** + * Get all keys using Stream. Keys traversing with SCAN operation. + * Each SCAN operation loads up to count keys per request. + * + * @param count - keys loaded per request to Redis + * @return Iterable object + */ + Stream getKeysStream(int count); /** * Get random key diff --git a/redisson/src/main/java/org/redisson/api/RPriorityDeque.java b/redisson/src/main/java/org/redisson/api/RPriorityDeque.java index f097541be..506474de3 100644 --- a/redisson/src/main/java/org/redisson/api/RPriorityDeque.java +++ b/redisson/src/main/java/org/redisson/api/RPriorityDeque.java @@ -16,6 +16,7 @@ package org.redisson.api; import java.util.Deque; +import java.util.stream.Stream; /** * @@ -25,4 +26,11 @@ import java.util.Deque; */ public interface RPriorityDeque extends Deque, RPriorityQueue { + /** + * Returns stream of elements contained in this deque in reverse order + * + * @return stream of elements in reverse order + */ + Stream descendingStream(); + } diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index d8bbf29fc..5618d2cb5 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -20,15 +20,17 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.protocol.ScoredEntry; /** + * Set containing elements sorted by score. * * @author Nikita Koksharov * - * @param value + * @param object type */ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable, RExpirable, RSortable> { @@ -287,6 +289,43 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ boolean isEmpty(); + /** + * Returns stream of elements in this set. + * Elements are loaded in batch. Batch size is 10. + * + * @return stream of elements + */ + Stream stream(); + + /** + * Returns stream of elements in this set. + * If pattern is not null then only elements match this pattern are loaded. + * + * @param pattern - search pattern + * @return stream of elements + */ + Stream stream(String pattern); + + /** + * Returns stream of elements in this set. + * Elements are loaded in batch. Batch size is defined by count param. + * + * @param count - size of elements batch + * @return stream of elements + */ + Stream stream(int count); + + /** + * Returns stream of elements in this set. + * Elements are loaded in batch. Batch size is defined by count param. + * If pattern is not null then only elements match this pattern are loaded. + * + * @param pattern - search pattern + * @param count - size of elements batch + * @return stream of elements + */ + Stream stream(String pattern, int count); + /** * Returns an iterator over elements in this set. * If pattern is not null then only elements match this pattern are loaded. diff --git a/redisson/src/main/java/org/redisson/api/RSet.java b/redisson/src/main/java/org/redisson/api/RSet.java index 62debf2c9..56d6c800e 100644 --- a/redisson/src/main/java/org/redisson/api/RSet.java +++ b/redisson/src/main/java/org/redisson/api/RSet.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.util.Iterator; import java.util.Set; +import java.util.stream.Stream; import org.redisson.api.mapreduce.RCollectionMapReduce; @@ -37,6 +38,34 @@ public interface RSet extends Set, RExpirable, RSetAsync, RSortablecount param. + * + * @param count - size of elements batch + * @return stream of elements + */ + Stream stream(int count); + + /** + * Returns stream of elements in this set. + * Elements are loaded in batch. Batch size is defined by count param. + * If pattern is not null then only elements match this pattern are loaded. + * + * @param pattern - search pattern + * @param count - size of elements batch + * @return stream of elements + */ + Stream stream(String pattern, int count); + + /** + * Returns stream of elements in this set matches pattern. + * + * @param pattern - search pattern + * @return stream of elements + */ + Stream stream(String pattern); + /** * Returns an iterator over elements in this set. * Elements are loaded in batch. Batch size is defined by count param. diff --git a/redisson/src/main/java/org/redisson/api/RSetCache.java b/redisson/src/main/java/org/redisson/api/RSetCache.java index ae78e7d07..2f4f53dee 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCache.java +++ b/redisson/src/main/java/org/redisson/api/RSetCache.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Iterator; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.redisson.api.mapreduce.RCollectionMapReduce; @@ -48,6 +49,34 @@ public interface RSetCache extends Set, RExpirable, RSetCacheAsync, RDe */ RLock getLock(V value); + /** + * Returns stream of elements in this set. + * Elements are loaded in batch. Batch size is defined by count param. + * + * @param count - size of elements batch + * @return stream of elements + */ + Stream stream(int count); + + /** + * Returns stream of elements in this set. + * Elements are loaded in batch. Batch size is defined by count param. + * If pattern is not null then only elements match this pattern are loaded. + * + * @param pattern - search pattern + * @param count - size of elements batch + * @return stream of elements + */ + Stream stream(String pattern, int count); + + /** + * Returns stream of elements in this set matches pattern. + * + * @param pattern - search pattern + * @return stream of elements + */ + Stream stream(String pattern); + /** * Returns an iterator over elements in this set. * Elements are loaded in batch. Batch size is defined by count param.