diff --git a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java index 949eaa6b0..e8b3909b2 100644 --- a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java +++ b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java @@ -313,10 +313,6 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer public RFuture removeRangeAsync(long startTimestamp, long endTimestamp) { return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER, "local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[3]);" + - "if #values == 0 then " + - "return nil;" + - "end;" + - "local counter = 0; " + "for i, v in ipairs(values) do " + "local expirationDate = redis.call('zscore', KEYS[2], v); " + @@ -331,6 +327,11 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer System.currentTimeMillis(), startTimestamp, endTimestamp); } + @Override + public Collection range(long startTimestamp, long endTimestamp, int limit) { + return get(rangeAsync(startTimestamp, endTimestamp, limit)); + } + @Override public Collection range(long startTimestamp, long endTimestamp) { return get(rangeAsync(startTimestamp, endTimestamp)); @@ -338,17 +339,17 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer @Override public Collection> entryRange(long startTimestamp, long endTimestamp) { - return get(entryRangeAsync(false, startTimestamp, endTimestamp)); + return get(entryRangeAsync(false, startTimestamp, endTimestamp, 0)); } @Override public Collection> entryRangeReversed(long startTimestamp, long endTimestamp) { - return get(entryRangeAsync(true, startTimestamp, endTimestamp)); + return get(entryRangeAsync(true, startTimestamp, endTimestamp, 0)); } @Override public RFuture>> entryRangeReversedAsync(long startTimestamp, long endTimestamp) { - return entryRangeAsync(true, startTimestamp, endTimestamp); + return entryRangeAsync(true, startTimestamp, endTimestamp, 0); } private static final RedisCommand>> ENTRIES = @@ -356,55 +357,63 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer @Override public RFuture>> entryRangeAsync(long startTimestamp, long endTimestamp) { - return entryRangeAsync(false, startTimestamp, endTimestamp); + return entryRangeAsync(false, startTimestamp, endTimestamp, 0); } - public RFuture>> entryRangeAsync(boolean reverse, long startTimestamp, long endTimestamp) { + private RFuture>> entryRangeAsync(boolean reverse, long startTimestamp, long endTimestamp, int limit) { return commandExecutor.evalReadAsync(getRawName(), codec, ENTRIES, - "local result = {}; " + - + "local result = {}; " + + "local from = ARGV[2]; " + + "local to = ARGV[3]; " + + "local limit = tonumber(ARGV[4]); " + + + "local cmd = 'zrangebyscore'; " + + "if ARGV[5] ~= '0' then " + + "from = ARGV[3]; " + + "to = ARGV[2]; " + + "cmd = 'zrevrangebyscore';" + + "end; " + + + "while true do " + "local values;" + - "if ARGV[4] == '0' then " + - "values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[3], 'withscores');" + + "if ARGV[4] ~= '0' then " + + "values = redis.call(cmd, KEYS[1], from, to, 'withscores', 'limit', 0, limit);" + "else " + - "values = redis.call('zrevrangebyscore', KEYS[1], ARGV[3], ARGV[2], 'withscores');" + - "end;" + - "if #values == 0 then " + - "return result;" + - "end;" + + "values = redis.call(cmd, KEYS[1], from, to, 'withscores');" + + "end; " + - "for i = 1, #values, 2 do " + - "local expirationDate = redis.call('zscore', KEYS[2], values[i]); " + + "for i=1, #values, 2 do " + + "local expirationDate = redis.call('zscore', KEYS[2], values[i]);" + "if tonumber(expirationDate) > tonumber(ARGV[1]) then " + "local t, val = struct.unpack('Bc0Lc0', values[i]); " + "table.insert(result, val);" + "table.insert(result, values[i+1]);" + "end;" + "end;" + - "return result;", + + "if limit == 0 or #result/2 == tonumber(ARGV[4]) or #values/2 < tonumber(limit) then " + + "return result;" + + "end;" + + "from = '(' .. values[#values];" + + "limit = tonumber(ARGV[4]) - #result/2;" + + "end;", Arrays.asList(getRawName(), getTimeoutSetName()), - System.currentTimeMillis(), startTimestamp, endTimestamp, Boolean.compare(reverse, false)); + System.currentTimeMillis(), startTimestamp, endTimestamp, limit, Boolean.compare(reverse, false)); + } + + @Override + public Collection rangeReversed(long startTimestamp, long endTimestamp, int limit) { + return get(rangeReversedAsync(startTimestamp, endTimestamp, limit)); } @Override public RFuture> rangeAsync(long startTimestamp, long endTimestamp) { - return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_LIST, - "local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[3]);" + - "if #values == 0 then " + - "return nil;" + - "end;" + + return rangeAsync(startTimestamp, endTimestamp, 0); + } - "local result = {}; " + - "for i, v in ipairs(values) do " + - "local expirationDate = redis.call('zscore', KEYS[2], v); " + - "if tonumber(expirationDate) > tonumber(ARGV[1]) then " + - "local t, val = struct.unpack('Bc0Lc0', v); " + - "table.insert(result, val);" + - "end;" + - "end;" + - "return result;", - Arrays.asList(getRawName(), getTimeoutSetName()), - System.currentTimeMillis(), startTimestamp, endTimestamp); + @Override + public RFuture> rangeAsync(long startTimestamp, long endTimestamp, int limit) { + return rangeAsync(false, startTimestamp, endTimestamp, limit); } @Override @@ -414,23 +423,72 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer @Override public RFuture> rangeReversedAsync(long startTimestamp, long endTimestamp) { + return rangeReversedAsync(startTimestamp, endTimestamp, 0); + } + + @Override + public RFuture> rangeReversedAsync(long startTimestamp, long endTimestamp, int limit) { + return rangeAsync(true, startTimestamp, endTimestamp, limit); + } + + private RFuture> rangeAsync(boolean reverse, long startTimestamp, long endTimestamp, int limit) { return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_LIST, - "local values = redis.call('zrevrangebyscore', KEYS[1], ARGV[3], ARGV[2]);" + - "if #values == 0 then " + - "return nil;" + - "end;" + + "local result = {}; " + + "local from = ARGV[2]; " + + "local to = ARGV[3]; " + + "local limit = tonumber(ARGV[4]); " + + + "local cmd = 'zrangebyscore'; " + + "if ARGV[5] ~= '0' then " + + "from = ARGV[3]; " + + "to = ARGV[2]; " + + "cmd = 'zrevrangebyscore';" + + "end; " + + + "while true do " + + "local values;" + + "if ARGV[4] ~= '0' then " + + "values = redis.call(cmd, KEYS[1], from, to, 'withscores', 'limit', 0, limit);" + + "else " + + "values = redis.call(cmd, KEYS[1], from, to, 'withscores');" + + "end; " + - "local result = {}; " + - "for i, v in ipairs(values) do " + - "local expirationDate = redis.call('zscore', KEYS[2], v); " + + "for i=1, #values, 2 do " + + "local expirationDate = redis.call('zscore', KEYS[2], values[i]);" + "if tonumber(expirationDate) > tonumber(ARGV[1]) then " + - "local t, val = struct.unpack('Bc0Lc0', v); " + + "local t, val = struct.unpack('Bc0Lc0', values[i]); " + "table.insert(result, val);" + "end;" + "end;" + - "return result;", + + "if limit == 0 or #result == tonumber(ARGV[4]) or #values/2 < tonumber(limit) then " + + "return result;" + + "end;" + + "from = '(' .. values[#values];" + + "limit = tonumber(ARGV[4]) - #result;" + + "end;", Arrays.asList(getRawName(), getTimeoutSetName()), - System.currentTimeMillis(), startTimestamp, endTimestamp); + System.currentTimeMillis(), startTimestamp, endTimestamp, limit, Boolean.compare(reverse, false)); + } + + @Override + public Collection> entryRange(long startTimestamp, long endTimestamp, int limit) { + return get(entryRangeAsync(startTimestamp, endTimestamp, limit)); + } + + @Override + public RFuture>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit) { + return entryRangeAsync(false, startTimestamp, endTimestamp, limit); + } + + @Override + public Collection> entryRangeReversed(long startTimestamp, long endTimestamp, int limit) { + return get(entryRangeReversedAsync(startTimestamp, endTimestamp, limit)); + } + + @Override + public RFuture>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit) { + return entryRangeAsync(true, startTimestamp, endTimestamp, limit); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RTimeSeries.java b/redisson/src/main/java/org/redisson/api/RTimeSeries.java index da65fdf10..1283f48c5 100644 --- a/redisson/src/main/java/org/redisson/api/RTimeSeries.java +++ b/redisson/src/main/java/org/redisson/api/RTimeSeries.java @@ -184,6 +184,16 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesAsyn */ Collection range(long startTimestamp, long endTimestamp); + /** + * Returns ordered elements of this time-series collection within timestamp range. Including boundary values. + * + * @param startTimestamp start timestamp + * @param endTimestamp end timestamp + * @param limit result size limit + * @return elements collection + */ + Collection range(long startTimestamp, long endTimestamp, int limit); + /** * Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values. * @@ -193,6 +203,16 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesAsyn */ Collection rangeReversed(long startTimestamp, long endTimestamp); + /** + * Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values. + * + * @param startTimestamp start timestamp + * @param endTimestamp end timestamp + * @param limit result size limit + * @return elements collection + */ + Collection rangeReversed(long startTimestamp, long endTimestamp, int limit); + /** * Returns ordered entries of this time-series collection within timestamp range. Including boundary values. * @@ -202,6 +222,16 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesAsyn */ Collection> entryRange(long startTimestamp, long endTimestamp); + /** + * Returns ordered entries of this time-series collection within timestamp range. Including boundary values. + * + * @param startTimestamp start timestamp + * @param endTimestamp end timestamp + * @param limit result size limit + * @return elements collection + */ + Collection> entryRange(long startTimestamp, long endTimestamp, int limit); + /** * Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values. * @@ -211,6 +241,16 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesAsyn */ Collection> entryRangeReversed(long startTimestamp, long endTimestamp); + /** + * Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values. + * + * @param startTimestamp start timestamp + * @param endTimestamp end timestamp + * @param limit result size limit + * @return elements collection + */ + Collection> entryRangeReversed(long startTimestamp, long endTimestamp, int limit); + /** * Returns stream of elements in this time-series collection. * Elements are loaded in batch. Batch size is 10. diff --git a/redisson/src/main/java/org/redisson/api/RTimeSeriesAsync.java b/redisson/src/main/java/org/redisson/api/RTimeSeriesAsync.java index ab1e10762..7a9b9b619 100644 --- a/redisson/src/main/java/org/redisson/api/RTimeSeriesAsync.java +++ b/redisson/src/main/java/org/redisson/api/RTimeSeriesAsync.java @@ -186,6 +186,16 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture> rangeAsync(long startTimestamp, long endTimestamp); + /** + * Returns ordered elements of this time-series collection within timestamp range. Including boundary values. + * + * @param startTimestamp start timestamp + * @param endTimestamp end timestamp + * @param limit result size limit + * @return elements collection + */ + RFuture> rangeAsync(long startTimestamp, long endTimestamp, int limit); + /** * Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values. * @@ -195,6 +205,16 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture> rangeReversedAsync(long startTimestamp, long endTimestamp); + /** + * Returns elements of this time-series collection in reverse order within timestamp range. Including boundary values. + * + * @param startTimestamp start timestamp + * @param endTimestamp end timestamp + * @param limit result size limit + * @return elements collection + */ + RFuture> rangeReversedAsync(long startTimestamp, long endTimestamp, int limit); + /** * Returns ordered entries of this time-series collection within timestamp range. Including boundary values. * @@ -204,6 +224,16 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture>> entryRangeAsync(long startTimestamp, long endTimestamp); + /** + * Returns ordered entries of this time-series collection within timestamp range. Including boundary values. + * + * @param startTimestamp start timestamp + * @param endTimestamp end timestamp + * @param limit result size limit + * @return elements collection + */ + RFuture>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit); + /** * Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values. * @@ -213,4 +243,14 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture>> entryRangeReversedAsync(long startTimestamp, long endTimestamp); + /** + * Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values. + * + * @param startTimestamp start timestamp + * @param endTimestamp end timestamp + * @param limit result size limit + * @return elements collection + */ + RFuture>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit); + } diff --git a/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java b/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java index f7d38e8ba..c6c58ecd6 100644 --- a/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java @@ -78,6 +78,50 @@ public class RedissonTimeSeriesTest extends BaseTest { assertThat(iter.hasNext()).isFalse(); } + @Test + public void testRangeReversed() throws InterruptedException { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10"); + t.add(2, "20"); + t.add(3, "30"); + t.add(4, "40"); + + assertThat(t.rangeReversed(1, 4, 2)).containsExactly("40", "30"); + assertThat(t.rangeReversed(1, 4, 0)).containsExactly("40", "30", "20", "10"); + + RTimeSeries t2 = redisson.getTimeSeries("test2"); + t2.add(1, "10"); + t2.add(2, "20"); + t2.add(3, "30", 1, TimeUnit.SECONDS); + t2.add(4, "40"); + + Thread.sleep(1200); + + assertThat(t2.rangeReversed(1, 4, 2)).containsExactly("40", "20"); + } + + @Test + public void testRange() throws InterruptedException { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10"); + t.add(2, "10"); + t.add(3, "30"); + t.add(4, "40"); + + assertThat(t.range(1, 4, 2)).containsExactly("10", "10"); + assertThat(t.range(1, 4, 0)).containsExactly("10", "10", "30", "40"); + + RTimeSeries t2 = redisson.getTimeSeries("test2"); + t2.add(1, "10"); + t2.add(2, "10", 1, TimeUnit.SECONDS); + t2.add(3, "30"); + t2.add(4, "40"); + + Thread.sleep(1200); + + assertThat(t2.range(1, 4, 2)).containsExactly("10", "30"); + } + @Test public void testRemove() { RTimeSeries t = redisson.getTimeSeries("test");