Feature - added RTimeSeries.range() rangeReversed() entryRange() entryRangeReversed() method with limit parameter #4119

pull/4130/head
Nikita Koksharov 3 years ago
parent 8d0ba5918d
commit 9343db6c6f

@ -313,10 +313,6 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
public RFuture<Integer> removeRangeAsync(long startTimestamp, long endTimestamp) {
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[3]);" +
"if #values == 0 then " +
"return nil;" +
"end;" +
"local counter = 0; " +
"for i, v in ipairs(values) do " +
"local expirationDate = redis.call('zscore', KEYS[2], v); " +
@ -331,6 +327,11 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
System.currentTimeMillis(), startTimestamp, endTimestamp);
}
@Override
public Collection<V> range(long startTimestamp, long endTimestamp, int limit) {
return get(rangeAsync(startTimestamp, endTimestamp, limit));
}
@Override
public Collection<V> range(long startTimestamp, long endTimestamp) {
return get(rangeAsync(startTimestamp, endTimestamp));
@ -338,17 +339,17 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
@Override
public Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp) {
return get(entryRangeAsync(false, startTimestamp, endTimestamp));
return get(entryRangeAsync(false, startTimestamp, endTimestamp, 0));
}
@Override
public Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp) {
return get(entryRangeAsync(true, startTimestamp, endTimestamp));
return get(entryRangeAsync(true, startTimestamp, endTimestamp, 0));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp) {
return entryRangeAsync(true, startTimestamp, endTimestamp);
return entryRangeAsync(true, startTimestamp, endTimestamp, 0);
}
private static final RedisCommand<List<TimeSeriesEntry<Object>>> ENTRIES =
@ -356,55 +357,63 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
@Override
public RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp) {
return entryRangeAsync(false, startTimestamp, endTimestamp);
return entryRangeAsync(false, startTimestamp, endTimestamp, 0);
}
public RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(boolean reverse, long startTimestamp, long endTimestamp) {
private RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(boolean reverse, long startTimestamp, long endTimestamp, int limit) {
return commandExecutor.evalReadAsync(getRawName(), codec, ENTRIES,
"local result = {}; " +
"local from = ARGV[2]; " +
"local to = ARGV[3]; " +
"local limit = tonumber(ARGV[4]); " +
"local cmd = 'zrangebyscore'; " +
"if ARGV[5] ~= '0' then " +
"from = ARGV[3]; " +
"to = ARGV[2]; " +
"cmd = 'zrevrangebyscore';" +
"end; " +
"while true do " +
"local values;" +
"if ARGV[4] == '0' then " +
"values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[3], 'withscores');" +
"if ARGV[4] ~= '0' then " +
"values = redis.call(cmd, KEYS[1], from, to, 'withscores', 'limit', 0, limit);" +
"else " +
"values = redis.call('zrevrangebyscore', KEYS[1], ARGV[3], ARGV[2], 'withscores');" +
"end;" +
"if #values == 0 then " +
"return result;" +
"end;" +
"values = redis.call(cmd, KEYS[1], from, to, 'withscores');" +
"end; " +
"for i = 1, #values, 2 do " +
"local expirationDate = redis.call('zscore', KEYS[2], values[i]); " +
"for i=1, #values, 2 do " +
"local expirationDate = redis.call('zscore', KEYS[2], values[i]);" +
"if tonumber(expirationDate) > tonumber(ARGV[1]) then " +
"local t, val = struct.unpack('Bc0Lc0', values[i]); " +
"table.insert(result, val);" +
"table.insert(result, values[i+1]);" +
"end;" +
"end;" +
"return result;",
"if limit == 0 or #result/2 == tonumber(ARGV[4]) or #values/2 < tonumber(limit) then " +
"return result;" +
"end;" +
"from = '(' .. values[#values];" +
"limit = tonumber(ARGV[4]) - #result/2;" +
"end;",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), startTimestamp, endTimestamp, Boolean.compare(reverse, false));
System.currentTimeMillis(), startTimestamp, endTimestamp, limit, Boolean.compare(reverse, false));
}
@Override
public Collection<V> rangeReversed(long startTimestamp, long endTimestamp, int limit) {
return get(rangeReversedAsync(startTimestamp, endTimestamp, limit));
}
@Override
public RFuture<Collection<V>> rangeAsync(long startTimestamp, long endTimestamp) {
return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[3]);" +
"if #values == 0 then " +
"return nil;" +
"end;" +
return rangeAsync(startTimestamp, endTimestamp, 0);
}
"local result = {}; " +
"for i, v in ipairs(values) do " +
"local expirationDate = redis.call('zscore', KEYS[2], v); " +
"if tonumber(expirationDate) > tonumber(ARGV[1]) then " +
"local t, val = struct.unpack('Bc0Lc0', v); " +
"table.insert(result, val);" +
"end;" +
"end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), startTimestamp, endTimestamp);
@Override
public RFuture<Collection<V>> rangeAsync(long startTimestamp, long endTimestamp, int limit) {
return rangeAsync(false, startTimestamp, endTimestamp, limit);
}
@Override
@ -414,23 +423,72 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
@Override
public RFuture<Collection<V>> rangeReversedAsync(long startTimestamp, long endTimestamp) {
return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local values = redis.call('zrevrangebyscore', KEYS[1], ARGV[3], ARGV[2]);" +
"if #values == 0 then " +
"return nil;" +
"end;" +
return rangeReversedAsync(startTimestamp, endTimestamp, 0);
}
@Override
public RFuture<Collection<V>> rangeReversedAsync(long startTimestamp, long endTimestamp, int limit) {
return rangeAsync(true, startTimestamp, endTimestamp, limit);
}
private RFuture<Collection<V>> rangeAsync(boolean reverse, long startTimestamp, long endTimestamp, int limit) {
return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for i, v in ipairs(values) do " +
"local expirationDate = redis.call('zscore', KEYS[2], v); " +
"local from = ARGV[2]; " +
"local to = ARGV[3]; " +
"local limit = tonumber(ARGV[4]); " +
"local cmd = 'zrangebyscore'; " +
"if ARGV[5] ~= '0' then " +
"from = ARGV[3]; " +
"to = ARGV[2]; " +
"cmd = 'zrevrangebyscore';" +
"end; " +
"while true do " +
"local values;" +
"if ARGV[4] ~= '0' then " +
"values = redis.call(cmd, KEYS[1], from, to, 'withscores', 'limit', 0, limit);" +
"else " +
"values = redis.call(cmd, KEYS[1], from, to, 'withscores');" +
"end; " +
"for i=1, #values, 2 do " +
"local expirationDate = redis.call('zscore', KEYS[2], values[i]);" +
"if tonumber(expirationDate) > tonumber(ARGV[1]) then " +
"local t, val = struct.unpack('Bc0Lc0', v); " +
"local t, val = struct.unpack('Bc0Lc0', values[i]); " +
"table.insert(result, val);" +
"end;" +
"end;" +
"return result;",
"if limit == 0 or #result == tonumber(ARGV[4]) or #values/2 < tonumber(limit) then " +
"return result;" +
"end;" +
"from = '(' .. values[#values];" +
"limit = tonumber(ARGV[4]) - #result;" +
"end;",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), startTimestamp, endTimestamp);
System.currentTimeMillis(), startTimestamp, endTimestamp, limit, Boolean.compare(reverse, false));
}
@Override
public Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp, int limit) {
return get(entryRangeAsync(startTimestamp, endTimestamp, limit));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit) {
return entryRangeAsync(false, startTimestamp, endTimestamp, limit);
}
@Override
public Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp, int limit) {
return get(entryRangeReversedAsync(startTimestamp, endTimestamp, limit));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit) {
return entryRangeAsync(true, startTimestamp, endTimestamp, limit);
}
@Override

@ -184,6 +184,16 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
Collection<V> range(long startTimestamp, long endTimestamp);
/**
* Returns ordered elements of this time-series collection within timestamp range. Including boundary values.
*
* @param startTimestamp start timestamp
* @param endTimestamp end timestamp
* @param limit result size limit
* @return elements collection
*/
Collection<V> range(long startTimestamp, long endTimestamp, int limit);
/**
* Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values.
*
@ -193,6 +203,16 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
Collection<V> rangeReversed(long startTimestamp, long endTimestamp);
/**
* Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values.
*
* @param startTimestamp start timestamp
* @param endTimestamp end timestamp
* @param limit result size limit
* @return elements collection
*/
Collection<V> rangeReversed(long startTimestamp, long endTimestamp, int limit);
/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
*
@ -202,6 +222,16 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp);
/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
*
* @param startTimestamp start timestamp
* @param endTimestamp end timestamp
* @param limit result size limit
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp, int limit);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
*
@ -211,6 +241,16 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
*
* @param startTimestamp start timestamp
* @param endTimestamp end timestamp
* @param limit result size limit
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp, int limit);
/**
* Returns stream of elements in this time-series collection.
* Elements are loaded in batch. Batch size is 10.

@ -186,6 +186,16 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Collection<V>> rangeAsync(long startTimestamp, long endTimestamp);
/**
* Returns ordered elements of this time-series collection within timestamp range. Including boundary values.
*
* @param startTimestamp start timestamp
* @param endTimestamp end timestamp
* @param limit result size limit
* @return elements collection
*/
RFuture<Collection<V>> rangeAsync(long startTimestamp, long endTimestamp, int limit);
/**
* Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values.
*
@ -195,6 +205,16 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Collection<V>> rangeReversedAsync(long startTimestamp, long endTimestamp);
/**
* Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values.
*
* @param startTimestamp start timestamp
* @param endTimestamp end timestamp
* @param limit result size limit
* @return elements collection
*/
RFuture<Collection<V>> rangeReversedAsync(long startTimestamp, long endTimestamp, int limit);
/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
*
@ -204,6 +224,16 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp);
/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
*
* @param startTimestamp start timestamp
* @param endTimestamp end timestamp
* @param limit result size limit
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
*
@ -213,4 +243,14 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
*
* @param startTimestamp start timestamp
* @param endTimestamp end timestamp
* @param limit result size limit
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit);
}

@ -78,6 +78,50 @@ public class RedissonTimeSeriesTest extends BaseTest {
assertThat(iter.hasNext()).isFalse();
}
@Test
public void testRangeReversed() throws InterruptedException {
RTimeSeries<String> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "20");
t.add(3, "30");
t.add(4, "40");
assertThat(t.rangeReversed(1, 4, 2)).containsExactly("40", "30");
assertThat(t.rangeReversed(1, 4, 0)).containsExactly("40", "30", "20", "10");
RTimeSeries<String> t2 = redisson.getTimeSeries("test2");
t2.add(1, "10");
t2.add(2, "20");
t2.add(3, "30", 1, TimeUnit.SECONDS);
t2.add(4, "40");
Thread.sleep(1200);
assertThat(t2.rangeReversed(1, 4, 2)).containsExactly("40", "20");
}
@Test
public void testRange() throws InterruptedException {
RTimeSeries<String> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "10");
t.add(3, "30");
t.add(4, "40");
assertThat(t.range(1, 4, 2)).containsExactly("10", "10");
assertThat(t.range(1, 4, 0)).containsExactly("10", "10", "30", "40");
RTimeSeries<String> t2 = redisson.getTimeSeries("test2");
t2.add(1, "10");
t2.add(2, "10", 1, TimeUnit.SECONDS);
t2.add(3, "30");
t2.add(4, "40");
Thread.sleep(1200);
assertThat(t2.range(1, 4, 2)).containsExactly("10", "30");
}
@Test
public void testRemove() {
RTimeSeries<String> t = redisson.getTimeSeries("test");

Loading…
Cancel
Save