Merge pull request #3970 from Vorotyntsev/feature-3953/distributed_iterator

Added distributed iterator
pull/4031/head
Nikita Koksharov 3 years ago committed by GitHub
commit dc480ba79e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,6 +18,7 @@ package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
@ -318,6 +319,11 @@ public class RedissonLexSortedSet extends RedissonScoredSortedSet<String> implem
throw new UnsupportedOperationException();
}
@Override
public Iterator<String> distributedIterator(String iteratorName, int count) {
return distributedIterator(iteratorName, null, count);
}
@Override
public Comparator<? super String> comparator() {
return null;

@ -18,6 +18,7 @@ package org.redisson;
import org.redisson.api.*;
import org.redisson.api.listener.*;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
@ -27,6 +28,7 @@ import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.iterator.RedissonListIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.CountableListener;
@ -300,7 +302,55 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public List<V> get(int...indexes) {
return get(getAsync(indexes));
}
@Override
public Iterator<V> distributedIterator(final int count) {
String iteratorName = "__redisson_list_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, count);
}
@Override
public Iterator<V> distributedIterator(final String iteratorName, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return distributedScanIterator(iteratorName, count);
}
@Override
protected void remove(Object value) {
RedissonList.this.remove((V) value);
}
};
}
private ScanResult<Object> distributedScanIterator(String iteratorName, int count) {
return get(distributedScanIteratorAsync(iteratorName, count));
}
private RFuture<ScanResult<Object>> distributedScanIteratorAsync(String iteratorName, int count) {
return commandExecutor.evalWriteAsync(getRawName(), codec, EVAL_LIST_SCAN,
"local start_index = redis.call('get', KEYS[2]); "
+ "if start_index ~= false then "
+ "start_index = tonumber(start_index); "
+ "else "
+ "start_index = 0;"
+ "end;"
+ "if start_index == -1 then "
+ "return {0, {}};"
+ "end;"
+ "local end_index = start_index + ARGV[1];"
+ "local result; "
+ "result = redis.call('lrange', KEYS[1], start_index, end_index - 1); "
+ "if end_index > redis.call('llen', KEYS[1]) then "
+ "end_index = -1;"
+ "end; "
+ "redis.call('setex', KEYS[2], 3600, end_index);"
+ "return {end_index, result};",
Arrays.<Object>asList(getRawName(), iteratorName), count);
}
public RFuture<List<V>> getAsync(int...indexes) {
List<Integer> params = new ArrayList<Integer>();
for (Integer index : indexes) {

@ -30,6 +30,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
@ -37,6 +38,7 @@ import org.redisson.client.protocol.convertor.BooleanNumberReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseIterator;
/**
* List based Multimap Cache values holder
@ -363,7 +365,65 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
public List<V> get(int...indexes) {
return get(getAsync(indexes));
}
@Override
public Iterator<V> distributedIterator(final int count) {
String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, count);
}
@Override
public Iterator<V> distributedIterator(final String iteratorName, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return distributedScanIterator(iteratorName, count);
}
@Override
protected void remove(Object value) {
RedissonListMultimapValues.this.remove((V) value);
}
};
}
private ScanResult<Object> distributedScanIterator(String iteratorName, int count) {
return get(distributedScanIteratorAsync(iteratorName, count));
}
private RFuture<ScanResult<Object>> distributedScanIteratorAsync(String iteratorName, int count) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SSCAN,
"local cursor = redis.call('get', KEYS[3]); "
+ "if cursor ~= false then "
+ "cursor = tonumber(cursor); "
+ "else "
+ "cursor = 0;"
+ "end;"
+ "if start_index == -1 then "
+ "return {0, {}}; "
+ "end;"
+ "local end_index = start_index + ARGV[1];"
+ "local result; "
+ "result = redis.call('lrange', KEYS[1], start_index, end_index - 1); "
+ "if end_index > redis.call('llen', KEYS[1]) then "
+ "end_index = -1;"
+ "end; "
+ "redis.call('setex', KEYS[2], 3600, end_index);"
+ "local expireDate = 92233720368547758; "
+ "local expirations = redis.call('zmscore', KEYS[1], result[2])"
+ "for i = #expirations, 1, -1 do "
+ "if expirations[i] ~= false then "
+ "local expireDate = tonumber(expireDateScore) "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "table.remove(result[2], i);"
+ "end; "
+ "end; "
+ "end; "
+ "return {end_index, result[2]};",
Arrays.<Object>asList(timeoutSetName, getRawName(), iteratorName), System.currentTimeMillis(), count);
}
@Override
public RFuture<List<V>> getAsync(int...indexes) {
List<Object> params = new ArrayList<Object>();

@ -532,7 +532,78 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
};
}
@Override
public Iterator<V> distributedIterator(final String pattern) {
String iteratorName = "__redisson_scored_sorted_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, pattern, 10);
}
@Override
public Iterator<V> distributedIterator(final int count) {
String iteratorName = "__redisson_scored_sorted_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, null, count);
}
@Override
public Iterator<V> distributedIterator(final String iteratorName, final String pattern, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return distributedScanIterator(iteratorName, pattern, count);
}
@Override
protected void remove(Object value) {
RedissonScoredSortedSet.this.remove((V) value);
}
};
}
private ScanResult<Object> distributedScanIterator(String iteratorName, String pattern, int count) {
return get(distributedScanIteratorAsync(iteratorName, pattern, count));
}
private RFuture<ScanResult<Object>> distributedScanIteratorAsync(String iteratorName, String pattern, int count) {
List<Object> args = new ArrayList<>(2);
if (pattern != null) {
args.add(pattern);
}
args.add(count);
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_ZSCAN,
"local cursor = redis.call('get', KEYS[3]); "
+ "if cursor ~= false then "
+ "cursor = tonumber(cursor); "
+ "else "
+ "cursor = 0;"
+ "end;"
+ "if start_index == -1 then "
+ "return {0, {}}; "
+ "end;"
+ "local result; "
+ "if (#ARGV == 2) then "
+ "result = redis.call('zscan', KEYS[1], cursor, 'match', ARGV[1], 'count', ARGV[2]); "
+ "else "
+ "result = redis.call('zscan', KEYS[1], cursor, 'count', ARGV[1]); "
+ "end;"
+ "local next_cursor = result[1]"
+ "if next_cursor ~= \"0\" then "
+ "redis.call('setex', KEYS[2], 3600, next_cursor);"
+ "else "
+ "redis.call('setex', KEYS[2], 3600, -1);"
+ "end; "
+ "local res = {};"
+ "for i, value in ipairs(result[2]) do "
+ "if i % 2 == 0 then "
+ "table.insert(res, result[2][i-1]); "
+ "end; "
+ "end;"
+ "return {result[1], res};",
Arrays.<Object>asList(getRawName(), iteratorName), args.toArray());
}
@Override
public Object[] toArray() {
List<Object> res = (List<Object>) get(valueRangeAsync(0, -1));

@ -111,7 +111,72 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
};
}
@Override
public Iterator<V> distributedIterator(final String pattern) {
String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, pattern, 10);
}
@Override
public Iterator<V> distributedIterator(final int count) {
String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, null, count);
}
@Override
public Iterator<V> distributedIterator(final String iteratorName, final String pattern, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return distributedScanIterator(iteratorName, pattern, count);
}
@Override
protected void remove(Object value) {
RedissonSet.this.remove((V) value);
}
};
}
private ScanResult<Object> distributedScanIterator(String iteratorName, String pattern, int count) {
return get(distributedScanIteratorAsync(iteratorName, pattern, count));
}
private RFuture<ScanResult<Object>> distributedScanIteratorAsync(String iteratorName, String pattern, int count) {
List<Object> args = new ArrayList<>(2);
if (pattern != null) {
args.add(pattern);
}
args.add(count);
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SSCAN,
"local cursor = redis.call('get', KEYS[2]); "
+ "if cursor ~= false then "
+ "cursor = tonumber(cursor); "
+ "else "
+ "cursor = 0;"
+ "end;"
+ "if start_index == -1 then "
+ "return {0, {}}; "
+ "end;"
+ "local result; "
+ "if (#ARGV == 2) then "
+ "result = redis.call('sscan', KEYS[1], cursor, 'match', ARGV[1], 'count', ARGV[2]); "
+ "else "
+ "result = redis.call('sscan', KEYS[1], cursor, 'count', ARGV[1]); "
+ "end;"
+ "local next_cursor = result[1]"
+ "if next_cursor ~= \"0\" then "
+ "redis.call('setex', KEYS[2], 3600, next_cursor);"
+ "else "
+ "redis.call('setex', KEYS[2], 3600, -1);"
+ "end; "
+ "return result;",
Arrays.<Object>asList(getRawName(), iteratorName), args.toArray());
}
@Override
public Iterator<V> iterator() {
return iterator(null);

@ -230,7 +230,84 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
public Iterator<V> iterator(String pattern) {
return iterator(pattern, 10);
}
@Override
public Iterator<V> distributedIterator(final String pattern) {
String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, pattern, 10);
}
@Override
public Iterator<V> distributedIterator(final int count) {
String iteratorName = "__redisson_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, null, count);
}
@Override
public Iterator<V> distributedIterator(final String iteratorName, final String pattern, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return distributedScanIterator(iteratorName, pattern, count);
}
@Override
protected void remove(Object value) {
RedissonSetMultimapValues.this.remove((V) value);
}
};
}
private ScanResult<Object> distributedScanIterator(String iteratorName, String pattern, int count) {
return get(distributedScanIteratorAsync(iteratorName, pattern, count));
}
private RFuture<ScanResult<Object>> distributedScanIteratorAsync(String iteratorName, String pattern, int count) {
List<Object> args = new ArrayList<>(3);
args.add(System.currentTimeMillis());
if (pattern != null) {
args.add(pattern);
}
args.add(count);
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_SSCAN,
"local cursor = redis.call('get', KEYS[3]); "
+ "if cursor ~= false then "
+ "cursor = tonumber(cursor); "
+ "else"
+ " cursor = 0;"
+ "end;"
+ "if start_index == -1 then "
+ "return {0, {}}; "
+ "end;"
+ "local result; "
+ "if (#ARGV == 3) then "
+ "result = redis.call('sscan', KEYS[2], cursor, 'match', ARGV[2], 'count', ARGV[3]); "
+ "else"
+ "result = redis.call('sscan', KEYS[2], cursor, 'count', ARGV[2]); "
+ "end;"
+ "local next_cursor = result[1]"
+ "if next_cursor ~= \"0\" then "
+ "redis.call('setex', KEYS[3], 3600, next_cursor);"
+ "else "
+ "redis.call('setex', KEYS[3], 3600, -1);"
+ "end; "
+ "local expireDate = 92233720368547758; "
+ "local expirations = redis.call('zmscore', KEYS[1], result[2])"
+ "for i = #expirations, 1, -1 do "
+ "if expirations[i] ~= false then "
+ "local expireDate = tonumber(expireDateScore) "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "table.remove(result[2], i);"
+ "end; "
+ "end; "
+ "end; "
+ "return result;",
Arrays.<Object>asList(timeoutSetName, getRawName(), iteratorName), args.toArray());
}
@Override
public Iterator<V> iterator(final String pattern, final int count) {
return new RedissonBaseIterator<V>() {

@ -18,10 +18,12 @@ package org.redisson;
import io.netty.buffer.ByteBuf;
import org.redisson.api.*;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
@ -32,6 +34,8 @@ import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.*;
import static org.redisson.client.protocol.RedisCommands.EVAL_LIST_SCAN;
/**
*
* @author Nikita Koksharov
@ -387,7 +391,55 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
return res;
}
@Override
public Iterator<V> distributedIterator(final int count) {
String iteratorName = "__redisson_sorted_set_cursor_{" + getRawName() + "}";
return distributedIterator(iteratorName, count);
}
@Override
public Iterator<V> distributedIterator(final String iteratorName, final int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return distributedScanIterator(iteratorName, count);
}
@Override
protected void remove(Object value) {
RedissonSortedSet.this.remove((V) value);
}
};
}
private ScanResult<Object> distributedScanIterator(String iteratorName, int count) {
return get(distributedScanIteratorAsync(iteratorName, count));
}
private RFuture<ScanResult<Object>> distributedScanIteratorAsync(String iteratorName, int count) {
return commandExecutor.evalWriteAsync(getRawName(), codec, EVAL_LIST_SCAN,
"local start_index = redis.call('get', KEYS[2]); "
+ "if start_index ~= false then "
+ "start_index = tonumber(start_index); "
+ "else "
+ "start_index = 0;"
+ "end;"
+ "if start_index == -1 then "
+ "return {0, {}}; "
+ "end;"
+ "local end_index = start_index + ARGV[1];"
+ "local result; "
+ "result = redis.call('lrange', KEYS[1], start_index, end_index - 1); "
+ "if end_index > redis.call('llen', KEYS[1]) then "
+ "end_index = -1;"
+ "end; "
+ "redis.call('setex', KEYS[2], 3600, end_index);"
+ "return {end_index, result};",
Arrays.<Object>asList(getRawName(), iteratorName), count);
}
// TODO optimize: get three values each time instead of single
public BinarySearchResult<V> binarySearch(V value, Codec codec) {
int size = list.size();

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.util.Iterator;
import java.util.List;
import java.util.RandomAccess;
@ -36,7 +37,26 @@ public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<
* @return list of elements
*/
List<V> get(int...indexes);
/**
* Returns element iterator that can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* See {@linkplain RList#distributedIterator(String, int)} for creating different iterators.
* @param count batch size
* @return shared elements iterator
*/
Iterator<V> distributedIterator(int count);
/**
* Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* Iterator name must be resolved to the same hash slot as list name.
* @param count batch size
* @param iteratorName redis object name to which cursor will be saved
* @return shared elements iterator
*/
Iterator<V> distributedIterator(String iteratorName, int count);
/**
* Returns <code>RMapReduce</code> object associated with this map
*
@ -143,5 +163,4 @@ public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<
* @return listener id
*/
int addListener(ObjectListener listener);
}

@ -468,6 +468,35 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
Iterator<V> iterator(String pattern, int count);
/**
* Returns element iterator that can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* See {@linkplain RSet#distributedIterator(String, String, int)} for creating different iterators.
* @param count batch size
* @return shared elements iterator
*/
Iterator<V> distributedIterator(int count);
/**
* Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* See {@linkplain RSet#distributedIterator(String, String, int)} for creating different iterators.
* @param pattern element pattern
* @return shared elements iterator
*/
Iterator<V> distributedIterator(String pattern);
/**
* Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* Iterator name must be resolved to the same hash slot as set name.
* @param pattern element pattern
* @param count batch size
* @param iteratorName redis object name to which cursor will be saved
* @return shared elements iterator
*/
Iterator<V> distributedIterator(String iteratorName, String pattern, int count);
/**
* Returns <code>true</code> if this sorted set contains encoded state of the specified element.
*

@ -154,7 +154,36 @@ public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V>, RSortable<Set
* @return iterator
*/
Iterator<V> iterator(String pattern);
/**
* Returns element iterator that can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* See {@linkplain RSet#distributedIterator(String, String, int)} for creating different iterators.
* @param count batch size
* @return shared elements iterator
*/
Iterator<V> distributedIterator(int count);
/**
* Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* See {@linkplain RSet#distributedIterator(String, String, int)} for creating different iterators.
* @param pattern element pattern
* @return shared elements iterator
*/
Iterator<V> distributedIterator(String pattern);
/**
* Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* Iterator name must be resolved to the same hash slot as set name.
* @param pattern element pattern
* @param count batch size
* @param iteratorName redis object name to which cursor will be saved
* @return shared elements iterator
*/
Iterator<V> distributedIterator(String iteratorName, String pattern, int count);
/**
* Returns <code>RMapReduce</code> object associated with this object
*

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.SortedSet;
import org.redisson.api.mapreduce.RCollectionMapReduce;
@ -55,4 +56,23 @@ public interface RSortedSet<V> extends SortedSet<V>, RObject {
*/
boolean trySetComparator(Comparator<? super V> comparator);
/**
* Returns element iterator that can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* See {@linkplain RList#distributedIterator(String, int)} for creating different iterators.
* @param count batch size
* @return shared elements iterator
*/
Iterator<V> distributedIterator(int count);
/**
* Returns iterator over elements that match specified pattern. Iterator can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.
* Iterator name must be resolved to the same hash slot as list name.
* @param count batch size
* @param iteratorName redis object name to which cursor will be saved
* @return shared elements iterator
*/
Iterator<V> distributedIterator(String iteratorName, int count);
}

@ -241,6 +241,7 @@ public interface RedisCommands {
RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> EVAL_LIST_REVERSE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<>(true));
RedisCommand<List<Integer>> EVAL_INT_LIST = new RedisCommand("EVAL", new ObjectListReplayDecoder<Integer>(), new IntegerReplayConvertor());
RedisCommand<ListScanResult<Object>> EVAL_LIST_SCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder<Object>()));
RedisCommand<Set<Object>> EVAL_SET = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>());
RedisCommand<Object> EVAL_OBJECT = new RedisCommand<Object>("EVAL");
RedisCommand<Object> EVAL_MAP_VALUE = new RedisCommand<Object>("EVAL", new MapValueDecoder());

@ -3,11 +3,15 @@ package org.redisson;
import org.junit.jupiter.api.Test;
import org.redisson.api.RList;
import org.redisson.api.RListMultimap;
import org.redisson.client.codec.StringCodec;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RedissonListMultimapTest extends BaseTest {
@ -360,5 +364,37 @@ public class RedissonListMultimapTest extends BaseTest {
assertThat(allValues).containsExactlyElementsOf(values);
}
@Test
public void testDistributedIterator() {
RListMultimap<String, String> map = redisson.getListMultimap("set", StringCodec.INSTANCE);
// populate set with elements
List<String> stringsOne = IntStream.range(0, 64).mapToObj(i -> "" + i).collect(Collectors.toList());
map.putAll("someKey", stringsOne);
Iterator<String> stringIterator = map.get("someKey")
.distributedIterator("iterator_{set}", 10);
// read some elements using iterator
List<String> strings = new ArrayList<>();
for (int i = 0; i < 20; i++) {
if (stringIterator.hasNext()) {
strings.add(stringIterator.next());
}
}
// create another iterator instance using the same name
RListMultimap<String, String> map2 = redisson.getListMultimap("set", StringCodec.INSTANCE);
Iterator<String> stringIterator2 = map2.get("someKey")
.distributedIterator("iterator_{set}", 10);
assertTrue(stringIterator2.hasNext());
// read all remaining elements
stringIterator2.forEachRemaining(strings::add);
stringIterator.forEachRemaining(strings::add);
assertThat(strings).containsAll(stringsOne);
assertThat(strings).hasSize(stringsOne.size());
}
}

@ -15,8 +15,11 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RedissonListTest extends BaseTest {
@ -1319,4 +1322,36 @@ public class RedissonListTest extends BaseTest {
assertThat(list).containsExactly(1, 2L, "3", "e");
}
@Test
public void testDistributedIterator() {
RList<String> list = redisson.getList("list", StringCodec.INSTANCE);
// populate list with elements
List<String> strings = IntStream.range(0, 128).mapToObj(i -> i + "").collect(Collectors.toList());
list.addAll(strings);
Iterator<String> stringIterator = list.distributedIterator("iterator_{list}", 10);
// read some elements using iterator
List<String> actual = new ArrayList<>();
for (int i = 0; i < 64; i++) {
if (stringIterator.hasNext()) {
actual.add(stringIterator.next());
}
}
// create another iterator instance using the same name
RList<String> set2 = redisson.getList("list", StringCodec.INSTANCE);
Iterator<String> stringIterator2 = set2.distributedIterator("iterator_{list}", 10);
assertTrue(stringIterator2.hasNext());
// read all remaining elements
stringIterator2.forEachRemaining(actual::add);
stringIterator.forEachRemaining(actual::add);
assertThat(actual).containsAll(strings);
assertThat(actual).hasSize(strings.size());
}
}

@ -2,14 +2,17 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@ -18,6 +21,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
@ -1540,5 +1545,39 @@ public class RedissonScoredSortedSetTest extends BaseTest {
assertThat(out.getScore("three")).isEqualTo(9);
}
@Test
public void testDistributedIterator() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("set", StringCodec.INSTANCE);
// populate set with elements
Map<String, Double> stringsOne = IntStream.range(0, 128).boxed()
.collect(Collectors.toMap(i -> "one-" + i, Integer::doubleValue));
Map<String, Double> stringsTwo = IntStream.range(0, 128).boxed()
.collect(Collectors.toMap(i -> "two-" + i, Integer::doubleValue));;
set.addAll(stringsOne);
set.addAll(stringsTwo);
Iterator<String> stringIterator = set.distributedIterator("iterator_{set}", "one*", 10);
// read some elements using iterator
List<String> strings = new ArrayList<>();
for (int i = 0; i < 64; i++) {
if (stringIterator.hasNext()) {
strings.add(stringIterator.next());
}
}
// create another iterator instance using the same name
RScoredSortedSet<String> set2 = redisson.getScoredSortedSet("set", StringCodec.INSTANCE);
Iterator<String> stringIterator2 = set2.distributedIterator("iterator_{set}", "one*", 10);
assertTrue(stringIterator2.hasNext());
// read all remaining elements
stringIterator2.forEachRemaining(strings::add);
stringIterator.forEachRemaining(strings::add);
assertThat(strings).containsAll(stringsOne.keySet());
assertThat(strings).hasSize(stringsOne.size());
}
}

@ -3,12 +3,16 @@ package org.redisson;
import org.junit.jupiter.api.Test;
import org.redisson.api.RSet;
import org.redisson.api.RSetMultimap;
import org.redisson.client.codec.StringCodec;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RedissonSetMultimapTest extends BaseTest {
@ -520,4 +524,40 @@ public class RedissonSetMultimapTest extends BaseTest {
assertThat(map4.get("2")).containsOnly("3");
}
@Test
public void testDistributedIterator() {
RSetMultimap<String, String> map = redisson.getSetMultimap("set", StringCodec.INSTANCE);
// populate set with elements
List<String> stringsOne = IntStream.range(0, 128).mapToObj(i -> "one-" + i).collect(Collectors.toList());
List<String> stringsTwo = IntStream.range(0, 128).mapToObj(i -> "two-" + i).collect(Collectors.toList());
map.putAll("someKey", stringsOne);
map.putAll("someKey", stringsTwo);
Iterator<String> stringIterator = map.get("someKey")
.distributedIterator("iterator_{set}", "one*", 10);
// read some elements using iterator
List<String> strings = new ArrayList<>();
for (int i = 0; i < 64; i++) {
if (stringIterator.hasNext()) {
strings.add(stringIterator.next());
}
}
// create another iterator instance using the same name
RSetMultimap<String, String> map2 = redisson.getSetMultimap("set", StringCodec.INSTANCE);
Iterator<String> stringIterator2 = map2.get("someKey")
.distributedIterator("iterator_{set}", "one*", 10);
assertTrue(stringIterator2.hasNext());
// read all remaining elements
stringIterator2.forEachRemaining(strings::add);
stringIterator.forEachRemaining(strings::add);
assertThat(strings).containsAll(stringsOne);
assertThat(strings).hasSize(stringsOne.size());
}
}

@ -4,13 +4,17 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -865,4 +869,38 @@ public class RedissonSetTest extends BaseTest {
Assertions.assertTrue(list.isEmpty());
}
@Test
public void testDistributedIterator() {
RSet<String> set = redisson.getSet("set", StringCodec.INSTANCE);
// populate set with elements
List<String> stringsOne = IntStream.range(0, 128).mapToObj(i -> "one-" + i).collect(Collectors.toList());
List<String> stringsTwo = IntStream.range(0, 128).mapToObj(i -> "two-" + i).collect(Collectors.toList());
set.addAll(stringsOne);
set.addAll(stringsTwo);
Iterator<String> stringIterator = set.distributedIterator("iterator_{set}", "one*", 10);
// read some elements using iterator
List<String> strings = new ArrayList<>();
for (int i = 0; i < 64; i++) {
if (stringIterator.hasNext()) {
strings.add(stringIterator.next());
}
}
// create another iterator instance using the same name
RSet<String> set2 = redisson.getSet("set", StringCodec.INSTANCE);
Iterator<String> stringIterator2 = set2.distributedIterator("iterator_{set}", "one*", 10);
Assertions.assertTrue(stringIterator2.hasNext());
// read all remaining elements
stringIterator2.forEachRemaining(strings::add);
stringIterator.forEachRemaining(strings::add);
assertThat(strings).containsAll(stringsOne);
assertThat(strings).hasSize(stringsOne.size());
}
}

@ -1,21 +1,27 @@
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.RFuture;
import org.redisson.api.RSortedSet;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
public class RedissonSortedSetTest extends BaseTest {
@ -398,5 +404,35 @@ public class RedissonSortedSetTest extends BaseTest {
Assertions.assertEquals(5, set.size());
}
@Test
public void testDistributedIterator() {
RSortedSet<String> set = redisson.getSortedSet("set", StringCodec.INSTANCE);
// populate set with elements
List<String> strings = IntStream.range(0, 128).mapToObj(i -> "one-" + i).collect(Collectors.toList());
set.addAll(strings);
Iterator<String> stringIterator = set.distributedIterator("iterator_{set}", 10);
// read some elements using iterator
List<String> result = new ArrayList<>();
for (int i = 0; i < 64; i++) {
if (stringIterator.hasNext()) {
result.add(stringIterator.next());
}
}
// create another iterator instance using the same name
RSortedSet<String> set2 = redisson.getSortedSet("set", StringCodec.INSTANCE);
Iterator<String> stringIterator2 = set2.distributedIterator("iterator_{set}", 10);
assertTrue(stringIterator2.hasNext());
// read all remaining elements
stringIterator2.forEachRemaining(result::add);
stringIterator.forEachRemaining(result::add);
assertThat(result).containsAll(strings);
assertThat(result).hasSize(strings.size());
}
}

Loading…
Cancel
Save