Fixed - TimeSeries.iterator() doesn't respect the ordering. #3145

pull/3209/head
Nikita Koksharov 4 years ago
parent 3e578c1af0
commit 88d1c9507c

@ -481,59 +481,45 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
System.currentTimeMillis(), startScore, limit);
}
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
RFuture<ListScanResult<Object>> f = scanIteratorAsync(name, client, startPos, pattern, count);
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, int count) {
RFuture<ListScanResult<Object>> f = scanIteratorAsync(name, client, startPos, count);
return get(f);
}
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, int count) {
List<Object> params = new ArrayList<>();
params.add(startPos);
params.add(System.currentTimeMillis());
if (pattern != null) {
params.add(pattern);
}
params.add(count);
return commandExecutor.evalReadAsync(client, name, codec, RedisCommands.EVAL_ZSCAN,
"local result = {}; "
+ "local res; "
+ "if (#ARGV == 4) then "
+ " res = redis.call('zscan', KEYS[1], ARGV[1], 'match', ARGV[3], 'count', ARGV[4]); "
+ "else "
+ " res = redis.call('zscan', KEYS[1], ARGV[1], 'count', ARGV[3]); "
+ "end;"
+ "for i, value in ipairs(res[2]) do "
+ "if i % 2 ~= 0 then " +
"local expirationDate = redis.call('zscore', KEYS[2], value); " +
"if tonumber(expirationDate) > tonumber(ARGV[2]) then " +
"local t, val = struct.unpack('LLc0', value); " +
"table.insert(result, val);" +
"end;"
+ "end;"
+ "end;"
+ "return {res[1], result};",
+ "local res = redis.call('zrange', KEYS[1], ARGV[1], tonumber(ARGV[1]) + tonumber(ARGV[3]) - 1); "
+ "for i, value in ipairs(res) do "
+ "local expirationDate = redis.call('zscore', KEYS[2], value); " +
"if tonumber(expirationDate) > tonumber(ARGV[2]) then " +
"local t, val = struct.unpack('LLc0', value); " +
"table.insert(result, val);" +
"end;"
+ "end;" +
"local nextPos = tonumber(ARGV[1]) + tonumber(ARGV[3]); " +
"if #res < tonumber(ARGV[3]) then " +
"nextPos = 0;" +
"end;"
+ "return {nextPos, result};",
Arrays.asList(name, getTimeoutSetName()),
params.toArray());
}
@Override
public Iterator<V> iterator(int count) {
return iterator(null, count);
}
@Override
public Iterator<V> iterator(String pattern) {
return iterator(pattern, 10);
}
@Override
public Iterator<V> iterator(String pattern, int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern, count);
return scanIterator(getName(), client, nextIterPos, count);
}
@Override
@ -546,7 +532,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
@Override
public Iterator<V> iterator() {
return iterator(null);
return iterator(10);
}
@Override
@ -559,16 +545,6 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
return toStream(iterator(count));
}
@Override
public Stream<V> stream(String pattern, int count) {
return toStream(iterator(pattern, count));
}
@Override
public Stream<V> stream(String pattern) {
return toStream(iterator(pattern));
}
@Override
public void destroy() {
if (evictionScheduler != null) {

@ -219,15 +219,6 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
Stream<V> stream();
/**
* Returns stream of elements in this time-series collection.
* If <code>pattern</code> is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @return stream of elements
*/
Stream<V> stream(String pattern);
/**
* Returns stream of elements in this time-series collection.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
@ -237,26 +228,6 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
Stream<V> stream(int count);
/**
* Returns stream of elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return stream of elements
*/
Stream<V> stream(String pattern, int count);
/**
* Returns an iterator over elements in this set.
* If <code>pattern</code> is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @return iterator
*/
Iterator<V> iterator(String pattern);
/**
* Returns an iterator over elements in this time-series collection.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
@ -266,15 +237,4 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
Iterator<V> iterator(int count);
/**
* Returns an iterator over elements in this time-series collection.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only elements match this pattern are loaded.
*
* @param pattern - search pattern
* @param count - size of elements batch
* @return iterator
*/
Iterator<V> iterator(String pattern, int count);
}

@ -44,7 +44,7 @@ public class RedissonTimeSeriesReactive<V> {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonTimeSeries) instance).scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10);
return ((RedissonTimeSeries) instance).scanIteratorAsync(instance.getName(), client, nextIterPos, 10);
}
});
}

@ -43,7 +43,7 @@ public class RedissonTimeSeriesRx<V> {
return new SetRxIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonTimeSeries) instance).scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10);
return ((RedissonTimeSeries) instance).scanIteratorAsync(instance.getName(), client, nextIterPos, 10);
}
}.create();
}

@ -43,12 +43,15 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testIterator() {
RTimeSeries<String> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(3, "30");
Iterator<String> iter = t.iterator(2);
assertThat(iter.next()).isEqualTo("10");
assertThat(iter.next()).isEqualTo("30");
for (int i = 0; i < 19; i++) {
t.add(i, "" + i*10);
}
Iterator<String> iter = t.iterator(3);
for (int i = 0; i < 19; i++) {
assertThat(iter.hasNext()).isTrue();
assertThat(iter.next()).isEqualTo("" + i*10);
}
assertThat(iter.hasNext()).isFalse();
}

Loading…
Cancel
Save