Fixed - RTimeSeries doesn't handle same values for different timestamps. #4107

pull/4113/head
Nikita Koksharov 3 years ago
parent 557b519139
commit 69d29a6277

@ -31,6 +31,7 @@ import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@ -102,26 +103,47 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
@Override
public RFuture<Void> addAllAsync(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit) {
long expirationTime = 92233720368547758L;
long expirationTime = System.currentTimeMillis();
if (timeToLive > 0) {
expirationTime = System.currentTimeMillis() + timeUnit.toMillis(timeToLive);
expirationTime += timeUnit.toMillis(timeToLive);
} else {
expirationTime += TimeUnit.DAYS.toMillis(365 * 100);
}
List<Object> params = new ArrayList<>();
for (Map.Entry<Long, V> entry : objects.entrySet()) {
params.add(expirationTime);
params.add(entry.getKey());
byte[] random = new byte[16];
ThreadLocalRandom.current().nextBytes(random);
params.add(random);
params.add(encode(entry.getValue()));
}
if (timeToLive > 0) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"for i = 1, #ARGV, 4 do " +
"local val = struct.pack('Bc0Lc0', string.len(ARGV[i+2]), ARGV[i+2], string.len(ARGV[i+3]), ARGV[i+3]); " +
"redis.call('zadd', KEYS[1], ARGV[i+1], val); " +
"redis.call('zadd', KEYS[2], ARGV[i], val); " +
"end; ",
Arrays.asList(getRawName(), getTimeoutSetName()),
params.toArray());
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"for i = 1, #ARGV, 3 do " +
"local val = struct.pack('LLc0', tonumber(ARGV[i+1]), string.len(ARGV[i+2]), ARGV[i+2]); " +
"redis.call('zadd', KEYS[1], ARGV[i+1], val); " +
"redis.call('zadd', KEYS[2], ARGV[i], val); " +
"end; ",
Arrays.asList(getRawName(), getTimeoutSetName()),
params.toArray());
"for i = 1, #ARGV, 4 do " +
"local expirationTime = ARGV[i]; " +
"local values = redis.call('zrangebyscore', KEYS[2], ARGV[i], ARGV[i]); " +
"if #values > 0 then " +
"local lastValue = redis.call('zrange', KEYS[2], -1, -1, 'withscores'); " +
"expirationTime = tonumber(lastValue[2]) + 1; " +
"end; " +
"local val = struct.pack('Bc0Lc0', string.len(ARGV[i+2]), ARGV[i+2], string.len(ARGV[i+3]), ARGV[i+3]); " +
"redis.call('zadd', KEYS[1], ARGV[i+1], val); " +
"redis.call('zadd', KEYS[2], expirationTime, val); " +
"end; ",
Arrays.asList(getRawName(), getTimeoutSetName()),
params.toArray());
}
@Override
@ -155,7 +177,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"if expirationDate ~= false and tonumber(expirationDate) <= tonumber(ARGV[1]) then " +
"return nil;" +
"end;" +
"local t, val = struct.unpack('LLc0', values[1]); " +
"local t, val = struct.unpack('Bc0Lc0', values[1]); " +
"return val;",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), timestamp);
@ -245,8 +267,8 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
return listTimestampAsync(-1, 1, RedisCommands.EVAL_FIRST_LIST);
}
private <T> RFuture<T> listTimestampAsync(int startScore, int limit, RedisCommand<?> evalCommandType) {
return commandExecutor.evalReadAsync(getRawName(), codec, evalCommandType,
private RFuture<Long> listTimestampAsync(int startScore, int limit, RedisCommand<?> evalCommandType) {
return commandExecutor.evalReadAsync(getRawName(), LongCodec.INSTANCE, evalCommandType,
"local values;" +
"if ARGV[2] == '0' then " +
"values = redis.call('zrangebyscore', KEYS[2], ARGV[1], '+inf', 'limit', 0, ARGV[3]);" +
@ -256,7 +278,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"local result = {}; " +
"for i, v in ipairs(values) do " +
"local t, val = struct.unpack('LLc0', v); " +
"local t = redis.call('zscore', KEYS[1], v); " +
"table.insert(result, t);" +
"end;" +
"return result;",
@ -275,7 +297,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"local result = {}; " +
"for i, v in ipairs(values) do " +
"local t, val = struct.unpack('LLc0', v); " +
"local t, val = struct.unpack('Bc0Lc0', v); " +
"table.insert(result, val);" +
"end;" +
"return result;",
@ -300,7 +322,6 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"for i, v in ipairs(values) do " +
"local expirationDate = redis.call('zscore', KEYS[2], v); " +
"if tonumber(expirationDate) > tonumber(ARGV[1]) then " +
"local t, val = struct.unpack('LLc0', v); " +
"counter = counter + 1; " +
"redis.call('zrem', KEYS[2], v); " +
"redis.call('zrem', KEYS[1], v); " +
@ -356,7 +377,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"for i = 1, #values, 2 do " +
"local expirationDate = redis.call('zscore', KEYS[2], values[i]); " +
"if tonumber(expirationDate) > tonumber(ARGV[1]) then " +
"local t, val = struct.unpack('LLc0', values[i]); " +
"local t, val = struct.unpack('Bc0Lc0', values[i]); " +
"table.insert(result, val);" +
"table.insert(result, values[i+1]);" +
"end;" +
@ -378,7 +399,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"for i, v in ipairs(values) do " +
"local expirationDate = redis.call('zscore', KEYS[2], v); " +
"if tonumber(expirationDate) > tonumber(ARGV[1]) then " +
"local t, val = struct.unpack('LLc0', v); " +
"local t, val = struct.unpack('Bc0Lc0', v); " +
"table.insert(result, val);" +
"end;" +
"end;" +
@ -404,7 +425,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"for i, v in ipairs(values) do " +
"local expirationDate = redis.call('zscore', KEYS[2], v); " +
"if tonumber(expirationDate) > tonumber(ARGV[1]) then " +
"local t, val = struct.unpack('LLc0', v); " +
"local t, val = struct.unpack('Bc0Lc0', v); " +
"table.insert(result, val);" +
"end;" +
"end;" +
@ -473,7 +494,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"for i, v in ipairs(values) do " +
"redis.call('zrem', KEYS[2], v); " +
"redis.call('zrem', KEYS[1], v); " +
"local t, val = struct.unpack('LLc0', v); " +
"local t, val = struct.unpack('Bc0Lc0', v); " +
"table.insert(result, val);" +
"end;" +
"return result;",
@ -498,7 +519,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
+ "for i, value in ipairs(res) do "
+ "local expirationDate = redis.call('zscore', KEYS[2], value); " +
"if tonumber(expirationDate) > tonumber(ARGV[2]) then " +
"local t, val = struct.unpack('LLc0', value); " +
"local t, val = struct.unpack('Bc0Lc0', value); " +
"table.insert(result, val);" +
"end;"
+ "end;" +

@ -17,6 +17,15 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testMultipleValues() {
RTimeSeries<String> ts = redisson.getTimeSeries("test");
for (int i=0;i < 10000;i++){
ts.add(System.currentTimeMillis(), "my-value",60,TimeUnit.DAYS);
}
assertThat(ts.size()).isEqualTo(10000);
}
@Test
public void testOrder() {
RTimeSeries<String> t = redisson.getTimeSeries("test");
@ -170,7 +179,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
}
@Test
public void testPoll() {
public void testPoll() throws InterruptedException {
RTimeSeries<String> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "20");

Loading…
Cancel
Save