From ae15cc56cae7b1ab81cc844867de1ee154b9b722 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 27 Jun 2018 13:12:57 +0300 Subject: [PATCH] Iterator with batch size param for all collections. #1519 --- .../main/java/org/redisson/RedissonSet.java | 14 +++---- .../java/org/redisson/api/RKeysReactive.java | 40 ++++++++++++++++--- .../java/org/redisson/api/RSetReactive.java | 29 ++++++++++++++ .../reactive/RedissonKeysReactive.java | 24 +++++++---- .../reactive/RedissonSetReactive.java | 28 +++++++++++-- 5 files changed, 110 insertions(+), 25 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index 28d753886..f79a7d7c9 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -93,13 +93,7 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt @Override public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { - if (pattern == null) { - RFuture> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "COUNT", count); - return get(f); - } - - RFuture> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "MATCH", pattern, "COUNT", count); - return get(f); + return get(scanIteratorAsync(name, client, startPos, pattern, count)); } @Override @@ -577,7 +571,11 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt @Override public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) { - throw new UnsupportedOperationException(); + if (pattern == null) { + return commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "COUNT", count); + } + + return commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "MATCH", pattern, "COUNT", count); } } diff --git a/redisson/src/main/java/org/redisson/api/RKeysReactive.java b/redisson/src/main/java/org/redisson/api/RKeysReactive.java index de51ee2e7..6997d4831 100644 --- a/redisson/src/main/java/org/redisson/api/RKeysReactive.java +++ b/redisson/src/main/java/org/redisson/api/RKeysReactive.java @@ -141,18 +141,27 @@ public interface RKeysReactive { Publisher getType(String key); /** - * Load keys in incrementally iterate mode. + * Load keys in incrementally iterate mode. Keys traversed with SCAN operation. + * Each SCAN operation loads up to 10 keys per request. * - * Uses SCAN Redis command. - * - * @return all keys + * @return keys */ Publisher getKeys(); + + /** + * Load keys in incrementally iterate mode. Keys traversed with SCAN operation. + * Each SCAN operation loads up to count keys per request. + * + * @param count - keys loaded per request to Redis + * @return keys + */ + Publisher getKeys(int count); /** * Find keys by pattern and load it in incrementally iterate mode. - * - * Uses SCAN Redis command. + * Keys traversed with SCAN operation. + * Each SCAN operation loads up to 10 keys per request. + *

* * Supported glob-style patterns: * h?llo subscribes to hello, hallo and hxllo @@ -164,6 +173,25 @@ public interface RKeysReactive { */ Publisher getKeysByPattern(String pattern); + /** + * Get all keys by pattern using iterator. + * Keys traversed with SCAN operation. Each SCAN operation loads + * up to count keys per request. + *

+ * Supported glob-style patterns: + *

+ * h?llo subscribes to hello, hallo and hxllo + *

+ * h*llo subscribes to hllo and heeeello + *

+ * h[ae]llo subscribes to hello and hallo, but not hillo + * + * @param pattern - match pattern + * @param count - keys loaded per request to Redis + * @return keys + */ + Publisher getKeysByPattern(String pattern, int count); + /** * Get hash slot identifier for key. * Available for cluster nodes only. diff --git a/redisson/src/main/java/org/redisson/api/RSetReactive.java b/redisson/src/main/java/org/redisson/api/RSetReactive.java index c23f87283..c04dbd404 100644 --- a/redisson/src/main/java/org/redisson/api/RSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSetReactive.java @@ -15,6 +15,7 @@ */ package org.redisson.api; +import java.util.Iterator; import java.util.Set; import org.reactivestreams.Publisher; @@ -28,6 +29,34 @@ import org.reactivestreams.Publisher; */ public interface RSetReactive extends RCollectionReactive { + /** + * Returns an iterator over elements in this set. + * Elements are loaded in batch. Batch size is defined by count param. + * + * @param count - size of elements batch + * @return iterator + */ + Publisher iterator(int count); + + /** + * Returns an iterator over elements in this set. + * Elements are loaded in batch. Batch size is defined by count param. + * If pattern is not null then only elements match this pattern are loaded. + * + * @param pattern - search pattern + * @param count - size of elements batch + * @return iterator + */ + Publisher iterator(String pattern, int count); + + /** + * Returns iterator over elements in this set matches pattern. + * + * @param pattern - search pattern + * @return iterator + */ + Publisher iterator(String pattern); + /** * Removes and returns random elements from set * in async mode diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index 08dfe44b4..c92f24b64 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -66,10 +66,15 @@ public class RedissonKeysReactive implements RKeysReactive { } @Override - public Publisher getKeysByPattern(final String pattern) { + public Publisher getKeysByPattern(String pattern) { + return getKeysByPattern(pattern, 10); + } + + @Override + public Publisher getKeysByPattern(String pattern, int count) { List> publishers = new ArrayList>(); for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { - publishers.add(createKeysIterator(entry, pattern)); + publishers.add(createKeysIterator(entry, pattern, count)); } return Streams.merge(publishers); } @@ -78,15 +83,20 @@ public class RedissonKeysReactive implements RKeysReactive { public Publisher getKeys() { return getKeysByPattern(null); } + + @Override + public Publisher getKeys(int count) { + return getKeysByPattern(null, count); + } - private Publisher> scanIterator(MasterSlaveEntry entry, long startPos, String pattern) { + private Publisher> scanIterator(MasterSlaveEntry entry, long startPos, String pattern, int count) { if (pattern == null) { - return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos); + return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count); } - return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern); + return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); } - private Publisher createKeysIterator(final MasterSlaveEntry entry, final String pattern) { + private Publisher createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) { return new Stream() { @Override @@ -106,7 +116,7 @@ public class RedissonKeysReactive implements RKeysReactive { protected void nextValues() { final ReactiveSubscription m = this; - scanIterator(entry, nextIterPos, pattern).subscribe(new Subscriber>() { + scanIterator(entry, nextIterPos, pattern, count).subscribe(new Subscriber>() { @Override public void onSubscribe(Subscription s) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index b0cf7f496..823b4261f 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -109,8 +109,13 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements }); } - private Publisher> scanIteratorReactive(RedisClient client, long startPos) { - return commandExecutor.readReactive(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); + private Publisher> scanIteratorReactive(final RedisClient client, final long startPos, final String pattern, final int count) { + return reactive(new Supplier>>() { + @Override + public RFuture> get() { + return ((RedissonSet)instance).scanIteratorAsync(getName(), client, startPos, pattern, count); + } + }); } @Override @@ -249,15 +254,30 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements } }); } + + @Override + public Publisher iterator(int count) { + return iterator(null, count); + } + + @Override + public Publisher iterator(String pattern) { + return iterator(pattern, 10); + } @Override - public Publisher iterator() { + public Publisher iterator(final String pattern, final int count) { return new SetReactiveIterator() { @Override protected Publisher> scanIteratorReactive(RedisClient client, long nextIterPos) { - return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos); + return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos, pattern, count); } }; } + @Override + public Publisher iterator() { + return iterator(null, 10); + } + }