From 69d29a6277ffc78dd7fe92a30f96e2538fbaa7f2 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 7 Feb 2022 12:26:41 +0300 Subject: [PATCH] Fixed - RTimeSeries doesn't handle same values for different timestamps. #4107 --- .../java/org/redisson/RedissonTimeSeries.java | 61 +++++++++++++------ .../org/redisson/RedissonTimeSeriesTest.java | 11 +++- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java index f0b8aa184..d65bc3bac 100644 --- a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java +++ b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java @@ -31,6 +31,7 @@ import org.redisson.iterator.RedissonBaseIterator; import org.redisson.misc.RedissonPromise; import java.util.*; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -102,26 +103,47 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer @Override public RFuture addAllAsync(Map objects, long timeToLive, TimeUnit timeUnit) { - long expirationTime = 92233720368547758L; + long expirationTime = System.currentTimeMillis(); if (timeToLive > 0) { - expirationTime = System.currentTimeMillis() + timeUnit.toMillis(timeToLive); + expirationTime += timeUnit.toMillis(timeToLive); + } else { + expirationTime += TimeUnit.DAYS.toMillis(365 * 100); } List params = new ArrayList<>(); for (Map.Entry entry : objects.entrySet()) { params.add(expirationTime); params.add(entry.getKey()); + byte[] random = new byte[16]; + ThreadLocalRandom.current().nextBytes(random); + params.add(random); params.add(encode(entry.getValue())); } + if (timeToLive > 0) { + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID, + "for i = 1, #ARGV, 4 do " + + "local val = struct.pack('Bc0Lc0', string.len(ARGV[i+2]), ARGV[i+2], string.len(ARGV[i+3]), ARGV[i+3]); " + + "redis.call('zadd', KEYS[1], ARGV[i+1], val); " + + "redis.call('zadd', KEYS[2], ARGV[i], val); " + + "end; ", + Arrays.asList(getRawName(), getTimeoutSetName()), + params.toArray()); + } return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID, - "for i = 1, #ARGV, 3 do " + - "local val = struct.pack('LLc0', tonumber(ARGV[i+1]), string.len(ARGV[i+2]), ARGV[i+2]); " + - "redis.call('zadd', KEYS[1], ARGV[i+1], val); " + - "redis.call('zadd', KEYS[2], ARGV[i], val); " + - "end; ", - Arrays.asList(getRawName(), getTimeoutSetName()), - params.toArray()); + "for i = 1, #ARGV, 4 do " + + "local expirationTime = ARGV[i]; " + + "local values = redis.call('zrangebyscore', KEYS[2], ARGV[i], ARGV[i]); " + + "if #values > 0 then " + + "local lastValue = redis.call('zrange', KEYS[2], -1, -1, 'withscores'); " + + "expirationTime = tonumber(lastValue[2]) + 1; " + + "end; " + + "local val = struct.pack('Bc0Lc0', string.len(ARGV[i+2]), ARGV[i+2], string.len(ARGV[i+3]), ARGV[i+3]); " + + "redis.call('zadd', KEYS[1], ARGV[i+1], val); " + + "redis.call('zadd', KEYS[2], expirationTime, val); " + + "end; ", + Arrays.asList(getRawName(), getTimeoutSetName()), + params.toArray()); } @Override @@ -155,7 +177,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer "if expirationDate ~= false and tonumber(expirationDate) <= tonumber(ARGV[1]) then " + "return nil;" + "end;" + - "local t, val = struct.unpack('LLc0', values[1]); " + + "local t, val = struct.unpack('Bc0Lc0', values[1]); " + "return val;", Arrays.asList(getRawName(), getTimeoutSetName()), System.currentTimeMillis(), timestamp); @@ -245,8 +267,8 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer return listTimestampAsync(-1, 1, RedisCommands.EVAL_FIRST_LIST); } - private RFuture listTimestampAsync(int startScore, int limit, RedisCommand evalCommandType) { - return commandExecutor.evalReadAsync(getRawName(), codec, evalCommandType, + private RFuture listTimestampAsync(int startScore, int limit, RedisCommand evalCommandType) { + return commandExecutor.evalReadAsync(getRawName(), LongCodec.INSTANCE, evalCommandType, "local values;" + "if ARGV[2] == '0' then " + "values = redis.call('zrangebyscore', KEYS[2], ARGV[1], '+inf', 'limit', 0, ARGV[3]);" + @@ -256,7 +278,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer "local result = {}; " + "for i, v in ipairs(values) do " + - "local t, val = struct.unpack('LLc0', v); " + + "local t = redis.call('zscore', KEYS[1], v); " + "table.insert(result, t);" + "end;" + "return result;", @@ -275,7 +297,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer "local result = {}; " + "for i, v in ipairs(values) do " + - "local t, val = struct.unpack('LLc0', v); " + + "local t, val = struct.unpack('Bc0Lc0', v); " + "table.insert(result, val);" + "end;" + "return result;", @@ -300,7 +322,6 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer "for i, v in ipairs(values) do " + "local expirationDate = redis.call('zscore', KEYS[2], v); " + "if tonumber(expirationDate) > tonumber(ARGV[1]) then " + - "local t, val = struct.unpack('LLc0', v); " + "counter = counter + 1; " + "redis.call('zrem', KEYS[2], v); " + "redis.call('zrem', KEYS[1], v); " + @@ -356,7 +377,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer "for i = 1, #values, 2 do " + "local expirationDate = redis.call('zscore', KEYS[2], values[i]); " + "if tonumber(expirationDate) > tonumber(ARGV[1]) then " + - "local t, val = struct.unpack('LLc0', values[i]); " + + "local t, val = struct.unpack('Bc0Lc0', values[i]); " + "table.insert(result, val);" + "table.insert(result, values[i+1]);" + "end;" + @@ -378,7 +399,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer "for i, v in ipairs(values) do " + "local expirationDate = redis.call('zscore', KEYS[2], v); " + "if tonumber(expirationDate) > tonumber(ARGV[1]) then " + - "local t, val = struct.unpack('LLc0', v); " + + "local t, val = struct.unpack('Bc0Lc0', v); " + "table.insert(result, val);" + "end;" + "end;" + @@ -404,7 +425,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer "for i, v in ipairs(values) do " + "local expirationDate = redis.call('zscore', KEYS[2], v); " + "if tonumber(expirationDate) > tonumber(ARGV[1]) then " + - "local t, val = struct.unpack('LLc0', v); " + + "local t, val = struct.unpack('Bc0Lc0', v); " + "table.insert(result, val);" + "end;" + "end;" + @@ -473,7 +494,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer "for i, v in ipairs(values) do " + "redis.call('zrem', KEYS[2], v); " + "redis.call('zrem', KEYS[1], v); " + - "local t, val = struct.unpack('LLc0', v); " + + "local t, val = struct.unpack('Bc0Lc0', v); " + "table.insert(result, val);" + "end;" + "return result;", @@ -498,7 +519,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTimeSer + "for i, value in ipairs(res) do " + "local expirationDate = redis.call('zscore', KEYS[2], value); " + "if tonumber(expirationDate) > tonumber(ARGV[2]) then " + - "local t, val = struct.unpack('LLc0', value); " + + "local t, val = struct.unpack('Bc0Lc0', value); " + "table.insert(result, val);" + "end;" + "end;" + diff --git a/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java b/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java index 9fc7993c9..ac8883e0d 100644 --- a/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java @@ -17,6 +17,15 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class RedissonTimeSeriesTest extends BaseTest { + @Test + public void testMultipleValues() { + RTimeSeries ts = redisson.getTimeSeries("test"); + for (int i=0;i < 10000;i++){ + ts.add(System.currentTimeMillis(), "my-value",60,TimeUnit.DAYS); + } + assertThat(ts.size()).isEqualTo(10000); + } + @Test public void testOrder() { RTimeSeries t = redisson.getTimeSeries("test"); @@ -170,7 +179,7 @@ public class RedissonTimeSeriesTest extends BaseTest { } @Test - public void testPoll() { + public void testPoll() throws InterruptedException { RTimeSeries t = redisson.getTimeSeries("test"); t.add(1, "10"); t.add(2, "20");