Feature - stream methods added to RKeys, RDeque, RScoredSortedSet, RSet, RSetCache object. #1745

pull/1833/head
Nikita Koksharov 6 years ago
parent 7be83688d3
commit 79abeec89f

@ -21,10 +21,14 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.redisson.api.RFuture;
import org.redisson.api.RKeys;
@ -516,5 +520,29 @@ public class RedissonKeys implements RKeys {
return commandExecutor.writeAsync(name, RedisCommands.MOVE, name, database);
}
@Override
public Stream<String> getKeysStreamByPattern(String pattern) {
return toStream(getKeysByPattern(pattern).iterator());
}
protected <T> Stream<T> toStream(Iterator<T> iterator) {
Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL);
return StreamSupport.stream(spliterator, false);
}
@Override
public Stream<String> getKeysStreamByPattern(String pattern, int count) {
return toStream(getKeysByPattern(pattern, count).iterator());
}
@Override
public Stream<String> getKeysStream() {
return toStream(getKeys().iterator());
}
@Override
public Stream<String> getKeysStream(int count) {
return toStream(getKeys(count).iterator());
}
}

@ -19,8 +19,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.redisson.api.RFuture;
import org.redisson.api.RObject;
@ -73,6 +78,11 @@ public abstract class RedissonObject implements RObject {
return "{" + name + "}:" + suffix;
}
protected final <T> Stream<T> toStream(Iterator<T> iterator) {
Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL);
return StreamSupport.stream(spliterator, false);
}
protected final <V> V get(RFuture<V> future) {
return commandExecutor.get(future);
}

@ -17,6 +17,7 @@ package org.redisson;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
import org.redisson.api.RFuture;
import org.redisson.api.RPriorityDeque;
@ -228,4 +229,9 @@ public class RedissonPriorityDeque<V> extends RedissonPriorityQueue<V> implement
return remove(o, -1);
}
@Override
public Stream<V> descendingStream() {
return toStream(descendingIterator());
}
}

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet;
@ -1086,4 +1087,24 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
public V takeLast() {
return get(takeLastAsync());
}
@Override
public Stream<V> stream() {
return toStream(iterator());
}
@Override
public Stream<V> stream(String pattern) {
return toStream(iterator(pattern));
}
@Override
public Stream<V> stream(int count) {
return toStream(iterator(count));
}
@Override
public Stream<V> stream(String pattern, int count) {
return toStream(iterator(pattern, count));
}
}

@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
@ -658,4 +659,19 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return commandExecutor.readAsync(getName(), codec, RedisCommands.SORT_SET, params.toArray());
}
@Override
public Stream<V> stream(int count) {
return toStream(iterator(count));
}
@Override
public Stream<V> stream(String pattern, int count) {
return toStream(iterator(pattern, count));
}
@Override
public Stream<V> stream(String pattern) {
return toStream(iterator(pattern));
}
}

@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
@ -388,4 +389,19 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}
}
@Override
public Stream<V> stream(int count) {
return toStream(iterator(count));
}
@Override
public Stream<V> stream(String pattern, int count) {
return toStream(iterator(pattern, count));
}
@Override
public Stream<V> stream(String pattern) {
return toStream(iterator(pattern));
}
}

@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
@ -745,4 +746,19 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return set.sortToAsync(destName, byPattern, getPatterns, order, offset, count);
}
@Override
public Stream<V> stream(int count) {
return toStream(iterator(count));
}
@Override
public Stream<V> stream(String pattern, int count) {
return toStream(iterator(pattern, count));
}
@Override
public Stream<V> stream(String pattern) {
return toStream(iterator(pattern));
}
}

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
*
@ -201,6 +202,60 @@ public interface RKeys extends RKeysAsync {
*/
Iterable<String> getKeys(int count);
/**
* Get all keys by pattern using Stream.
* Keys traversed with SCAN operation. Each SCAN operation loads
* up to <b>10</b> keys per request.
* <p>
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern - match pattern
* @return Iterable object
*/
Stream<String> getKeysStreamByPattern(String pattern);
/**
* Get all keys by pattern using Stream.
* Keys traversed with SCAN operation. Each SCAN operation loads
* up to <code>count</code> keys per request.
* <p>
* Supported glob-style patterns:
* <p>
* h?llo subscribes to hello, hallo and hxllo
* <p>
* h*llo subscribes to hllo and heeeello
* <p>
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern - match pattern
* @param count - keys loaded per request to Redis
* @return Iterable object
*/
Stream<String> getKeysStreamByPattern(String pattern, int count);
/**
* Get all keys using Stream. Keys traversing with SCAN operation.
* Each SCAN operation loads up to <code>10</code> keys per request.
*
* @return Iterable object
*/
Stream<String> getKeysStream();
/**
* Get all keys using Stream. Keys traversing with SCAN operation.
* Each SCAN operation loads up to <code>count</code> keys per request.
*
* @param count - keys loaded per request to Redis
* @return Iterable object
*/
Stream<String> getKeysStream(int count);
/**
* Get random key
*

@ -16,6 +16,7 @@
package org.redisson.api;
import java.util.Deque;
import java.util.stream.Stream;
/**
*
@ -25,4 +26,11 @@ import java.util.Deque;
*/
public interface RPriorityDeque<V> extends Deque<V>, RPriorityQueue<V> {
/**
* Returns stream of elements contained in this deque in reverse order
*
* @return stream of elements in reverse order
*/
Stream<V> descendingStream();
}

@ -20,15 +20,17 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.protocol.ScoredEntry;
/**
* Set containing elements sorted by score.
*
* @author Nikita Koksharov
*
* @param <V> value
* @param <V> object type
*/
public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<V>, RExpirable, RSortable<Set<V>> {
@ -287,6 +289,43 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
boolean isEmpty();
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is 10.
*
* @return stream of elements
*/
Stream<V> stream();
/**
* Returns stream of elements in this set.
* If <code>pattern</code> is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @return stream of elements
*/
Stream<V> stream(String pattern);
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return stream of elements
*/
Stream<V> stream(int count);
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return stream of elements
*/
Stream<V> stream(String pattern, int count);
/**
* Returns an iterator over elements in this set.
* If <code>pattern</code> is not null then only elements match this pattern are loaded.

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Stream;
import org.redisson.api.mapreduce.RCollectionMapReduce;
@ -37,6 +38,34 @@ public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V>, RSortable<Set
*/
RLock getLock(V value);
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return stream of elements
*/
Stream<V> stream(int count);
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return stream of elements
*/
Stream<V> stream(String pattern, int count);
/**
* Returns stream of elements in this set matches <code>pattern</code>.
*
* @param pattern - search pattern
* @return stream of elements
*/
Stream<V> stream(String pattern);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.

@ -18,6 +18,7 @@ package org.redisson.api;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.redisson.api.mapreduce.RCollectionMapReduce;
@ -48,6 +49,34 @@ public interface RSetCache<V> extends Set<V>, RExpirable, RSetCacheAsync<V>, RDe
*/
RLock getLock(V value);
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return stream of elements
*/
Stream<V> stream(int count);
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return stream of elements
*/
Stream<V> stream(String pattern, int count);
/**
* Returns stream of elements in this set matches <code>pattern</code>.
*
* @param pattern - search pattern
* @return stream of elements
*/
Stream<V> stream(String pattern);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.

Loading…
Cancel
Save