diff --git a/redisson/src/main/java/org/redisson/RedissonLexSortedSet.java b/redisson/src/main/java/org/redisson/RedissonLexSortedSet.java index fb2bd5cb9..313e4a45c 100644 --- a/redisson/src/main/java/org/redisson/RedissonLexSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonLexSortedSet.java @@ -18,6 +18,7 @@ package org.redisson; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.SortedSet; @@ -318,6 +319,11 @@ public class RedissonLexSortedSet extends RedissonScoredSortedSet implem throw new UnsupportedOperationException(); } + @Override + public Iterator distributedIterator(String iteratorName, int count) { + return distributedIterator(iteratorName, null, count); + } + @Override public Comparator comparator() { return null; diff --git a/redisson/src/main/java/org/redisson/RedissonList.java b/redisson/src/main/java/org/redisson/RedissonList.java index 7b3730c61..bacb287d3 100644 --- a/redisson/src/main/java/org/redisson/RedissonList.java +++ b/redisson/src/main/java/org/redisson/RedissonList.java @@ -18,6 +18,7 @@ package org.redisson; import org.redisson.api.*; import org.redisson.api.listener.*; import org.redisson.api.mapreduce.RCollectionMapReduce; +import org.redisson.client.RedisClient; import org.redisson.client.RedisException; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; @@ -27,6 +28,7 @@ import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor; import org.redisson.client.protocol.convertor.Convertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.iterator.RedissonBaseIterator; import org.redisson.iterator.RedissonListIterator; import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.misc.CountableListener; @@ -300,7 +302,55 @@ public class RedissonList extends RedissonExpirable implements RList { public List get(int...indexes) { return get(getAsync(indexes)); } - + + @Override + public Iterator distributedIterator(final int count) { + String iteratorName = "__redisson_list_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, count); + } + + @Override + public Iterator distributedIterator(final String iteratorName, final int count) { + return new RedissonBaseIterator() { + + @Override + protected ScanResult iterator(RedisClient client, long nextIterPos) { + return distributedScanIterator(iteratorName, count); + } + + @Override + protected void remove(Object value) { + RedissonList.this.remove((V) value); + } + }; + } + + private ScanResult distributedScanIterator(String iteratorName, int count) { + return get(distributedScanIteratorAsync(iteratorName, count)); + } + + private RFuture> distributedScanIteratorAsync(String iteratorName, int count) { + return commandExecutor.evalWriteAsync(getRawName(), codec, EVAL_LIST_SCAN, + "local start_index = redis.call('get', KEYS[2]); " + + "if start_index ~= false then " + + "start_index = tonumber(start_index); " + + "else " + + "start_index = 0;" + + "end;" + + "if start_index == -1 then " + + "return {0, {}};" + + "end;" + + "local end_index = start_index + ARGV[1];" + + "local result; " + + "result = redis.call('lrange', KEYS[1], start_index, end_index - 1); " + + "if end_index > redis.call('llen', KEYS[1]) then " + + "end_index = -1;" + + "end; " + + "redis.call('setex', KEYS[2], 3600, end_index);" + + "return {end_index, result};", + Arrays.asList(getRawName(), iteratorName), count); + } + public RFuture> getAsync(int...indexes) { List params = new ArrayList(); for (Integer index : indexes) { diff --git a/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java index e65658cce..3169a983c 100644 --- a/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java @@ -30,6 +30,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RList; import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -37,6 +38,7 @@ import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor; import org.redisson.client.protocol.convertor.Convertor; import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.iterator.RedissonBaseIterator; /** * List based Multimap Cache values holder @@ -363,7 +365,65 @@ public class RedissonListMultimapValues extends RedissonExpirable implements public List get(int...indexes) { return get(getAsync(indexes)); } - + + @Override + public Iterator distributedIterator(final int count) { + String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, count); + } + + @Override + public Iterator distributedIterator(final String iteratorName, final int count) { + return new RedissonBaseIterator() { + + @Override + protected ScanResult iterator(RedisClient client, long nextIterPos) { + return distributedScanIterator(iteratorName, count); + } + + @Override + protected void remove(Object value) { + RedissonListMultimapValues.this.remove((V) value); + } + }; + } + + private ScanResult distributedScanIterator(String iteratorName, int count) { + return get(distributedScanIteratorAsync(iteratorName, count)); + } + + private RFuture> distributedScanIteratorAsync(String iteratorName, int count) { + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SSCAN, + "local cursor = redis.call('get', KEYS[3]); " + + "if cursor ~= false then " + + "cursor = tonumber(cursor); " + + "else " + + "cursor = 0;" + + "end;" + + "if start_index == -1 then " + + "return {0, {}}; " + + "end;" + + "local end_index = start_index + ARGV[1];" + + "local result; " + + "result = redis.call('lrange', KEYS[1], start_index, end_index - 1); " + + "if end_index > redis.call('llen', KEYS[1]) then " + + "end_index = -1;" + + "end; " + + "redis.call('setex', KEYS[2], 3600, end_index);" + + "local expireDate = 92233720368547758; " + + "local expirations = redis.call('zmscore', KEYS[1], result[2])" + + "for i = #expirations, 1, -1 do " + + "if expirations[i] ~= false then " + + "local expireDate = tonumber(expireDateScore) " + + "if expireDate <= tonumber(ARGV[1]) then " + + "table.remove(result[2], i);" + + "end; " + + "end; " + + "end; " + + "return {end_index, result[2]};", + Arrays.asList(timeoutSetName, getRawName(), iteratorName), System.currentTimeMillis(), count); + } + @Override public RFuture> getAsync(int...indexes) { List params = new ArrayList(); diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 8e3ded545..8767fd766 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -532,7 +532,78 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc }; } - + + @Override + public Iterator distributedIterator(final String pattern) { + String iteratorName = "__redisson_scored_sorted_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, pattern, 10); + } + + @Override + public Iterator distributedIterator(final int count) { + String iteratorName = "__redisson_scored_sorted_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, null, count); + } + + @Override + public Iterator distributedIterator(final String iteratorName, final String pattern, final int count) { + return new RedissonBaseIterator() { + + @Override + protected ScanResult iterator(RedisClient client, long nextIterPos) { + return distributedScanIterator(iteratorName, pattern, count); + } + + @Override + protected void remove(Object value) { + RedissonScoredSortedSet.this.remove((V) value); + } + }; + } + + private ScanResult distributedScanIterator(String iteratorName, String pattern, int count) { + return get(distributedScanIteratorAsync(iteratorName, pattern, count)); + } + + private RFuture> distributedScanIteratorAsync(String iteratorName, String pattern, int count) { + List args = new ArrayList<>(2); + if (pattern != null) { + args.add(pattern); + } + args.add(count); + + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_ZSCAN, + "local cursor = redis.call('get', KEYS[3]); " + + "if cursor ~= false then " + + "cursor = tonumber(cursor); " + + "else " + + "cursor = 0;" + + "end;" + + "if start_index == -1 then " + + "return {0, {}}; " + + "end;" + + "local result; " + + "if (#ARGV == 2) then " + + "result = redis.call('zscan', KEYS[1], cursor, 'match', ARGV[1], 'count', ARGV[2]); " + + "else " + + "result = redis.call('zscan', KEYS[1], cursor, 'count', ARGV[1]); " + + "end;" + + "local next_cursor = result[1]" + + "if next_cursor ~= \"0\" then " + + "redis.call('setex', KEYS[2], 3600, next_cursor);" + + "else " + + "redis.call('setex', KEYS[2], 3600, -1);" + + "end; " + + "local res = {};" + + "for i, value in ipairs(result[2]) do " + + "if i % 2 == 0 then " + + "table.insert(res, result[2][i-1]); " + + "end; " + + "end;" + + "return {result[1], res};", + Arrays.asList(getRawName(), iteratorName), args.toArray()); + } + @Override public Object[] toArray() { List res = (List) get(valueRangeAsync(0, -1)); diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index 47b9b5c3f..9de9c70e1 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -111,7 +111,72 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt }; } - + + @Override + public Iterator distributedIterator(final String pattern) { + String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, pattern, 10); + } + + @Override + public Iterator distributedIterator(final int count) { + String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, null, count); + } + + @Override + public Iterator distributedIterator(final String iteratorName, final String pattern, final int count) { + return new RedissonBaseIterator() { + + @Override + protected ScanResult iterator(RedisClient client, long nextIterPos) { + return distributedScanIterator(iteratorName, pattern, count); + } + + @Override + protected void remove(Object value) { + RedissonSet.this.remove((V) value); + } + }; + } + + private ScanResult distributedScanIterator(String iteratorName, String pattern, int count) { + return get(distributedScanIteratorAsync(iteratorName, pattern, count)); + } + + private RFuture> distributedScanIteratorAsync(String iteratorName, String pattern, int count) { + List args = new ArrayList<>(2); + if (pattern != null) { + args.add(pattern); + } + args.add(count); + + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SSCAN, + "local cursor = redis.call('get', KEYS[2]); " + + "if cursor ~= false then " + + "cursor = tonumber(cursor); " + + "else " + + "cursor = 0;" + + "end;" + + "if start_index == -1 then " + + "return {0, {}}; " + + "end;" + + "local result; " + + "if (#ARGV == 2) then " + + "result = redis.call('sscan', KEYS[1], cursor, 'match', ARGV[1], 'count', ARGV[2]); " + + "else " + + "result = redis.call('sscan', KEYS[1], cursor, 'count', ARGV[1]); " + + "end;" + + "local next_cursor = result[1]" + + "if next_cursor ~= \"0\" then " + + "redis.call('setex', KEYS[2], 3600, next_cursor);" + + "else " + + "redis.call('setex', KEYS[2], 3600, -1);" + + "end; " + + "return result;", + Arrays.asList(getRawName(), iteratorName), args.toArray()); + } + @Override public Iterator iterator() { return iterator(null); diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java index 2be4f852f..47d7c2381 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -230,7 +230,84 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R public Iterator iterator(String pattern) { return iterator(pattern, 10); } - + + @Override + public Iterator distributedIterator(final String pattern) { + String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, pattern, 10); + } + + @Override + public Iterator distributedIterator(final int count) { + String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, null, count); + } + + @Override + public Iterator distributedIterator(final String iteratorName, final String pattern, final int count) { + return new RedissonBaseIterator() { + + @Override + protected ScanResult iterator(RedisClient client, long nextIterPos) { + return distributedScanIterator(iteratorName, pattern, count); + } + + @Override + protected void remove(Object value) { + RedissonSetMultimapValues.this.remove((V) value); + } + }; + } + + private ScanResult distributedScanIterator(String iteratorName, String pattern, int count) { + return get(distributedScanIteratorAsync(iteratorName, pattern, count)); + } + + private RFuture> distributedScanIteratorAsync(String iteratorName, String pattern, int count) { + List args = new ArrayList<>(3); + args.add(System.currentTimeMillis()); + if (pattern != null) { + args.add(pattern); + } + args.add(count); + + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SSCAN, + "local cursor = redis.call('get', KEYS[3]); " + + "if cursor ~= false then " + + "cursor = tonumber(cursor); " + + "else" + + " cursor = 0;" + + "end;" + + "if start_index == -1 then " + + "return {0, {}}; " + + "end;" + + "local result; " + + "if (#ARGV == 3) then " + + "result = redis.call('sscan', KEYS[2], cursor, 'match', ARGV[2], 'count', ARGV[3]); " + + "else" + + "result = redis.call('sscan', KEYS[2], cursor, 'count', ARGV[2]); " + + "end;" + + "local next_cursor = result[1]" + + "if next_cursor ~= \"0\" then " + + "redis.call('setex', KEYS[3], 3600, next_cursor);" + + "else " + + "redis.call('setex', KEYS[3], 3600, -1);" + + "end; " + + + "local expireDate = 92233720368547758; " + + "local expirations = redis.call('zmscore', KEYS[1], result[2])" + + "for i = #expirations, 1, -1 do " + + "if expirations[i] ~= false then " + + "local expireDate = tonumber(expireDateScore) " + + "if expireDate <= tonumber(ARGV[1]) then " + + "table.remove(result[2], i);" + + "end; " + + "end; " + + "end; " + + "return result;", + Arrays.asList(timeoutSetName, getRawName(), iteratorName), args.toArray()); + } + @Override public Iterator iterator(final String pattern, final int count) { return new RedissonBaseIterator() { diff --git a/redisson/src/main/java/org/redisson/RedissonSortedSet.java b/redisson/src/main/java/org/redisson/RedissonSortedSet.java index 5d240b771..413c78abd 100644 --- a/redisson/src/main/java/org/redisson/RedissonSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSortedSet.java @@ -18,10 +18,12 @@ package org.redisson; import io.netty.buffer.ByteBuf; import org.redisson.api.*; import org.redisson.api.mapreduce.RCollectionMapReduce; +import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.iterator.RedissonBaseIterator; import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -32,6 +34,8 @@ import java.math.BigInteger; import java.security.MessageDigest; import java.util.*; +import static org.redisson.client.protocol.RedisCommands.EVAL_LIST_SCAN; + /** * * @author Nikita Koksharov @@ -387,7 +391,55 @@ public class RedissonSortedSet extends RedissonObject implements RSortedSet distributedIterator(final int count) { + String iteratorName = "__redisson_sorted_set_cursor_{" + getRawName() + "}"; + return distributedIterator(iteratorName, count); + } + + @Override + public Iterator distributedIterator(final String iteratorName, final int count) { + return new RedissonBaseIterator() { + + @Override + protected ScanResult iterator(RedisClient client, long nextIterPos) { + return distributedScanIterator(iteratorName, count); + } + + @Override + protected void remove(Object value) { + RedissonSortedSet.this.remove((V) value); + } + }; + } + + private ScanResult distributedScanIterator(String iteratorName, int count) { + return get(distributedScanIteratorAsync(iteratorName, count)); + } + + private RFuture> distributedScanIteratorAsync(String iteratorName, int count) { + return commandExecutor.evalWriteAsync(getRawName(), codec, EVAL_LIST_SCAN, + "local start_index = redis.call('get', KEYS[2]); " + + "if start_index ~= false then " + + "start_index = tonumber(start_index); " + + "else " + + "start_index = 0;" + + "end;" + + "if start_index == -1 then " + + "return {0, {}}; " + + "end;" + + "local end_index = start_index + ARGV[1];" + + "local result; " + + "result = redis.call('lrange', KEYS[1], start_index, end_index - 1); " + + "if end_index > redis.call('llen', KEYS[1]) then " + + "end_index = -1;" + + "end; " + + "redis.call('setex', KEYS[2], 3600, end_index);" + + "return {end_index, result};", + Arrays.asList(getRawName(), iteratorName), count); + } + // TODO optimize: get three values each time instead of single public BinarySearchResult binarySearch(V value, Codec codec) { int size = list.size(); diff --git a/redisson/src/main/java/org/redisson/api/RList.java b/redisson/src/main/java/org/redisson/api/RList.java index e4c11cae1..68d29eb53 100644 --- a/redisson/src/main/java/org/redisson/api/RList.java +++ b/redisson/src/main/java/org/redisson/api/RList.java @@ -15,6 +15,7 @@ */ package org.redisson.api; +import java.util.Iterator; import java.util.List; import java.util.RandomAccess; @@ -36,7 +37,26 @@ public interface RList extends List, RExpirable, RListAsync, RSortable< * @return list of elements */ List get(int...indexes); - + + /** + * Returns element iterator that can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * See {@linkplain RList#distributedIterator(String, int)} for creating different iterators. + * @param count batch size + * @return shared elements iterator + */ + Iterator distributedIterator(int count); + + /** + * Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * Iterator name must be resolved to the same hash slot as list name. + * @param count batch size + * @param iteratorName redis object name to which cursor will be saved + * @return shared elements iterator + */ + Iterator distributedIterator(String iteratorName, int count); + /** * Returns RMapReduce object associated with this map * @@ -143,5 +163,4 @@ public interface RList extends List, RExpirable, RListAsync, RSortable< * @return listener id */ int addListener(ObjectListener listener); - } diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index ae1ec1d0f..ff56c9023 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -468,6 +468,35 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ Iterator iterator(String pattern, int count); + /** + * Returns element iterator that can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * See {@linkplain RSet#distributedIterator(String, String, int)} for creating different iterators. + * @param count batch size + * @return shared elements iterator + */ + Iterator distributedIterator(int count); + + /** + * Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * See {@linkplain RSet#distributedIterator(String, String, int)} for creating different iterators. + * @param pattern element pattern + * @return shared elements iterator + */ + Iterator distributedIterator(String pattern); + + /** + * Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * Iterator name must be resolved to the same hash slot as set name. + * @param pattern element pattern + * @param count batch size + * @param iteratorName redis object name to which cursor will be saved + * @return shared elements iterator + */ + Iterator distributedIterator(String iteratorName, String pattern, int count); + /** * Returns true if this sorted set contains encoded state of the specified element. * diff --git a/redisson/src/main/java/org/redisson/api/RSet.java b/redisson/src/main/java/org/redisson/api/RSet.java index bee2779c1..51cc98775 100644 --- a/redisson/src/main/java/org/redisson/api/RSet.java +++ b/redisson/src/main/java/org/redisson/api/RSet.java @@ -154,7 +154,36 @@ public interface RSet extends Set, RExpirable, RSetAsync, RSortable iterator(String pattern); - + + /** + * Returns element iterator that can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * See {@linkplain RSet#distributedIterator(String, String, int)} for creating different iterators. + * @param count batch size + * @return shared elements iterator + */ + Iterator distributedIterator(int count); + + /** + * Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * See {@linkplain RSet#distributedIterator(String, String, int)} for creating different iterators. + * @param pattern element pattern + * @return shared elements iterator + */ + Iterator distributedIterator(String pattern); + + /** + * Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * Iterator name must be resolved to the same hash slot as set name. + * @param pattern element pattern + * @param count batch size + * @param iteratorName redis object name to which cursor will be saved + * @return shared elements iterator + */ + Iterator distributedIterator(String iteratorName, String pattern, int count); + /** * Returns RMapReduce object associated with this object * diff --git a/redisson/src/main/java/org/redisson/api/RSortedSet.java b/redisson/src/main/java/org/redisson/api/RSortedSet.java index 970e4607a..210af0efd 100644 --- a/redisson/src/main/java/org/redisson/api/RSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RSortedSet.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.util.Collection; import java.util.Comparator; +import java.util.Iterator; import java.util.SortedSet; import org.redisson.api.mapreduce.RCollectionMapReduce; @@ -55,4 +56,23 @@ public interface RSortedSet extends SortedSet, RObject { */ boolean trySetComparator(Comparator comparator); + /** + * Returns element iterator that can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * See {@linkplain RList#distributedIterator(String, int)} for creating different iterators. + * @param count batch size + * @return shared elements iterator + */ + Iterator distributedIterator(int count); + + /** + * Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications. + * Creating multiple iterators on the same object with this method will result in a single shared iterator. + * Iterator name must be resolved to the same hash slot as list name. + * @param count batch size + * @param iteratorName redis object name to which cursor will be saved + * @return shared elements iterator + */ + Iterator distributedIterator(String iteratorName, int count); + } diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 549be07cd..ba5358fcb 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -241,6 +241,7 @@ public interface RedisCommands { RedisCommand> EVAL_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); RedisCommand> EVAL_LIST_REVERSE = new RedisCommand>("EVAL", new ObjectListReplayDecoder<>(true)); RedisCommand> EVAL_INT_LIST = new RedisCommand("EVAL", new ObjectListReplayDecoder(), new IntegerReplayConvertor()); + RedisCommand> EVAL_LIST_SCAN = new RedisCommand>("EVAL", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder())); RedisCommand> EVAL_SET = new RedisCommand>("EVAL", new ObjectSetReplayDecoder()); RedisCommand EVAL_OBJECT = new RedisCommand("EVAL"); RedisCommand EVAL_MAP_VALUE = new RedisCommand("EVAL", new MapValueDecoder()); diff --git a/redisson/src/test/java/org/redisson/RedissonListMultimapTest.java b/redisson/src/test/java/org/redisson/RedissonListMultimapTest.java index 8acecce72..39e180367 100644 --- a/redisson/src/test/java/org/redisson/RedissonListMultimapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonListMultimapTest.java @@ -3,11 +3,15 @@ package org.redisson; import org.junit.jupiter.api.Test; import org.redisson.api.RList; import org.redisson.api.RListMultimap; +import org.redisson.client.codec.StringCodec; import java.io.Serializable; import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RedissonListMultimapTest extends BaseTest { @@ -360,5 +364,37 @@ public class RedissonListMultimapTest extends BaseTest { assertThat(allValues).containsExactlyElementsOf(values); } + @Test + public void testDistributedIterator() { + RListMultimap map = redisson.getListMultimap("set", StringCodec.INSTANCE); + + // populate set with elements + List stringsOne = IntStream.range(0, 64).mapToObj(i -> "" + i).collect(Collectors.toList()); + map.putAll("someKey", stringsOne); + + Iterator stringIterator = map.get("someKey") + .distributedIterator("iterator_{set}", 10); + + // read some elements using iterator + List strings = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + if (stringIterator.hasNext()) { + strings.add(stringIterator.next()); + } + } + + // create another iterator instance using the same name + RListMultimap map2 = redisson.getListMultimap("set", StringCodec.INSTANCE); + Iterator stringIterator2 = map2.get("someKey") + .distributedIterator("iterator_{set}", 10); + assertTrue(stringIterator2.hasNext()); + + // read all remaining elements + stringIterator2.forEachRemaining(strings::add); + stringIterator.forEachRemaining(strings::add); + + assertThat(strings).containsAll(stringsOne); + assertThat(strings).hasSize(stringsOne.size()); + } } diff --git a/redisson/src/test/java/org/redisson/RedissonListTest.java b/redisson/src/test/java/org/redisson/RedissonListTest.java index 62b1878ba..fec85bb5e 100644 --- a/redisson/src/test/java/org/redisson/RedissonListTest.java +++ b/redisson/src/test/java/org/redisson/RedissonListTest.java @@ -15,8 +15,11 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RedissonListTest extends BaseTest { @@ -1319,4 +1322,36 @@ public class RedissonListTest extends BaseTest { assertThat(list).containsExactly(1, 2L, "3", "e"); } + + @Test + public void testDistributedIterator() { + RList list = redisson.getList("list", StringCodec.INSTANCE); + + // populate list with elements + List strings = IntStream.range(0, 128).mapToObj(i -> i + "").collect(Collectors.toList()); + list.addAll(strings); + + Iterator stringIterator = list.distributedIterator("iterator_{list}", 10); + + // read some elements using iterator + List actual = new ArrayList<>(); + for (int i = 0; i < 64; i++) { + if (stringIterator.hasNext()) { + actual.add(stringIterator.next()); + } + } + + // create another iterator instance using the same name + RList set2 = redisson.getList("list", StringCodec.INSTANCE); + Iterator stringIterator2 = set2.distributedIterator("iterator_{list}", 10); + + assertTrue(stringIterator2.hasNext()); + + // read all remaining elements + stringIterator2.forEachRemaining(actual::add); + stringIterator.forEachRemaining(actual::add); + + assertThat(actual).containsAll(strings); + assertThat(actual).hasSize(strings.size()); + } } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index 642e7b01e..b6e260101 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -2,14 +2,17 @@ package org.redisson; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -18,6 +21,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assumptions; @@ -1540,5 +1545,39 @@ public class RedissonScoredSortedSetTest extends BaseTest { assertThat(out.getScore("three")).isEqualTo(9); } - + @Test + public void testDistributedIterator() { + RScoredSortedSet set = redisson.getScoredSortedSet("set", StringCodec.INSTANCE); + + // populate set with elements + Map stringsOne = IntStream.range(0, 128).boxed() + .collect(Collectors.toMap(i -> "one-" + i, Integer::doubleValue)); + Map stringsTwo = IntStream.range(0, 128).boxed() + .collect(Collectors.toMap(i -> "two-" + i, Integer::doubleValue));; + set.addAll(stringsOne); + set.addAll(stringsTwo); + + Iterator stringIterator = set.distributedIterator("iterator_{set}", "one*", 10); + + // read some elements using iterator + List strings = new ArrayList<>(); + for (int i = 0; i < 64; i++) { + if (stringIterator.hasNext()) { + strings.add(stringIterator.next()); + } + } + + // create another iterator instance using the same name + RScoredSortedSet set2 = redisson.getScoredSortedSet("set", StringCodec.INSTANCE); + Iterator stringIterator2 = set2.distributedIterator("iterator_{set}", "one*", 10); + + assertTrue(stringIterator2.hasNext()); + + // read all remaining elements + stringIterator2.forEachRemaining(strings::add); + stringIterator.forEachRemaining(strings::add); + + assertThat(strings).containsAll(stringsOne.keySet()); + assertThat(strings).hasSize(stringsOne.size()); + } } diff --git a/redisson/src/test/java/org/redisson/RedissonSetMultimapTest.java b/redisson/src/test/java/org/redisson/RedissonSetMultimapTest.java index f37475509..dd2b68952 100644 --- a/redisson/src/test/java/org/redisson/RedissonSetMultimapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSetMultimapTest.java @@ -3,12 +3,16 @@ package org.redisson; import org.junit.jupiter.api.Test; import org.redisson.api.RSet; import org.redisson.api.RSetMultimap; +import org.redisson.client.codec.StringCodec; import java.io.Serializable; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RedissonSetMultimapTest extends BaseTest { @@ -520,4 +524,40 @@ public class RedissonSetMultimapTest extends BaseTest { assertThat(map4.get("2")).containsOnly("3"); } + @Test + public void testDistributedIterator() { + RSetMultimap map = redisson.getSetMultimap("set", StringCodec.INSTANCE); + + // populate set with elements + List stringsOne = IntStream.range(0, 128).mapToObj(i -> "one-" + i).collect(Collectors.toList()); + List stringsTwo = IntStream.range(0, 128).mapToObj(i -> "two-" + i).collect(Collectors.toList()); + map.putAll("someKey", stringsOne); + map.putAll("someKey", stringsTwo); + + Iterator stringIterator = map.get("someKey") + .distributedIterator("iterator_{set}", "one*", 10); + + // read some elements using iterator + List strings = new ArrayList<>(); + for (int i = 0; i < 64; i++) { + if (stringIterator.hasNext()) { + strings.add(stringIterator.next()); + } + } + + // create another iterator instance using the same name + RSetMultimap map2 = redisson.getSetMultimap("set", StringCodec.INSTANCE); + Iterator stringIterator2 = map2.get("someKey") + .distributedIterator("iterator_{set}", "one*", 10); + + + assertTrue(stringIterator2.hasNext()); + + // read all remaining elements + stringIterator2.forEachRemaining(strings::add); + stringIterator.forEachRemaining(strings::add); + + assertThat(strings).containsAll(stringsOne); + assertThat(strings).hasSize(stringsOne.size()); + } } diff --git a/redisson/src/test/java/org/redisson/RedissonSetTest.java b/redisson/src/test/java/org/redisson/RedissonSetTest.java index b82f8bd0f..4c3f34d76 100644 --- a/redisson/src/test/java/org/redisson/RedissonSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSetTest.java @@ -4,13 +4,17 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -865,4 +869,38 @@ public class RedissonSetTest extends BaseTest { Assertions.assertTrue(list.isEmpty()); } + + @Test + public void testDistributedIterator() { + RSet set = redisson.getSet("set", StringCodec.INSTANCE); + + // populate set with elements + List stringsOne = IntStream.range(0, 128).mapToObj(i -> "one-" + i).collect(Collectors.toList()); + List stringsTwo = IntStream.range(0, 128).mapToObj(i -> "two-" + i).collect(Collectors.toList()); + set.addAll(stringsOne); + set.addAll(stringsTwo); + + Iterator stringIterator = set.distributedIterator("iterator_{set}", "one*", 10); + + // read some elements using iterator + List strings = new ArrayList<>(); + for (int i = 0; i < 64; i++) { + if (stringIterator.hasNext()) { + strings.add(stringIterator.next()); + } + } + + // create another iterator instance using the same name + RSet set2 = redisson.getSet("set", StringCodec.INSTANCE); + Iterator stringIterator2 = set2.distributedIterator("iterator_{set}", "one*", 10); + + Assertions.assertTrue(stringIterator2.hasNext()); + + // read all remaining elements + stringIterator2.forEachRemaining(strings::add); + stringIterator.forEachRemaining(strings::add); + + assertThat(strings).containsAll(stringsOne); + assertThat(strings).hasSize(stringsOne.size()); + } } diff --git a/redisson/src/test/java/org/redisson/RedissonSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonSortedSetTest.java index cd682899a..988a92c3c 100644 --- a/redisson/src/test/java/org/redisson/RedissonSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSortedSetTest.java @@ -1,21 +1,27 @@ package org.redisson; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.RFuture; import org.redisson.api.RSortedSet; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.StringCodec; public class RedissonSortedSetTest extends BaseTest { @@ -398,5 +404,35 @@ public class RedissonSortedSetTest extends BaseTest { Assertions.assertEquals(5, set.size()); } + @Test + public void testDistributedIterator() { + RSortedSet set = redisson.getSortedSet("set", StringCodec.INSTANCE); + + // populate set with elements + List strings = IntStream.range(0, 128).mapToObj(i -> "one-" + i).collect(Collectors.toList()); + set.addAll(strings); + + Iterator stringIterator = set.distributedIterator("iterator_{set}", 10); + + // read some elements using iterator + List result = new ArrayList<>(); + for (int i = 0; i < 64; i++) { + if (stringIterator.hasNext()) { + result.add(stringIterator.next()); + } + } + + // create another iterator instance using the same name + RSortedSet set2 = redisson.getSortedSet("set", StringCodec.INSTANCE); + Iterator stringIterator2 = set2.distributedIterator("iterator_{set}", 10); + + assertTrue(stringIterator2.hasNext()); + // read all remaining elements + stringIterator2.forEachRemaining(result::add); + stringIterator.forEachRemaining(result::add); + + assertThat(result).containsAll(strings); + assertThat(result).hasSize(strings.size()); + } }