Feature - labels support for RTimeSeries object. #4553

pull/4612/head
Nikita Koksharov 2 years ago
parent a483625e13
commit 47f6a3926b

@ -156,12 +156,12 @@ public class Redisson implements RedissonClient {
}
@Override
public <V> RTimeSeries<V> getTimeSeries(String name) {
public <V, L> RTimeSeries<V, L> getTimeSeries(String name) {
return new RedissonTimeSeries<>(evictionScheduler, commandExecutor, name);
}
@Override
public <V> RTimeSeries<V> getTimeSeries(String name, Codec codec) {
public <V, L> RTimeSeries<V, L> getTimeSeries(String name, Codec codec) {
return new RedissonTimeSeries<>(codec, evictionScheduler, commandExecutor, name);
}

@ -460,17 +460,17 @@ public class RedissonReactive implements RedissonReactiveClient {
}
@Override
public <V> RTimeSeriesReactive<V> getTimeSeries(String name) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(evictionScheduler, commandExecutor, name);
public <V, L> RTimeSeriesReactive<V, L> getTimeSeries(String name) {
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<V, L>(evictionScheduler, commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesReactive<V>(timeSeries, this), RTimeSeriesReactive.class);
new RedissonTimeSeriesReactive<V, L>(timeSeries, this), RTimeSeriesReactive.class);
}
@Override
public <V> RTimeSeriesReactive<V> getTimeSeries(String name, Codec codec) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(codec, evictionScheduler, commandExecutor, name);
public <V, L> RTimeSeriesReactive<V, L> getTimeSeries(String name, Codec codec) {
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<V, L>(codec, evictionScheduler, commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesReactive<V>(timeSeries, this), RTimeSeriesReactive.class);
new RedissonTimeSeriesReactive<V, L>(timeSeries, this), RTimeSeriesReactive.class);
}
@Override

@ -439,17 +439,17 @@ public class RedissonRx implements RedissonRxClient {
}
@Override
public <V> RTimeSeriesRx<V> getTimeSeries(String name) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(evictionScheduler, commandExecutor, name);
public <V, L> RTimeSeriesRx<V, L> getTimeSeries(String name) {
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<V, L>(evictionScheduler, commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesRx<V>(timeSeries, this), RTimeSeriesRx.class);
new RedissonTimeSeriesRx<V, L>(timeSeries, this), RTimeSeriesRx.class);
}
@Override
public <V> RTimeSeriesRx<V> getTimeSeries(String name, Codec codec) {
RTimeSeries<V> timeSeries = new RedissonTimeSeries<V>(codec, evictionScheduler, commandExecutor, name);
public <V, L> RTimeSeriesRx<V, L> getTimeSeries(String name, Codec codec) {
RTimeSeries<V, L> timeSeries = new RedissonTimeSeries<V, L>(codec, evictionScheduler, commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, timeSeries,
new RedissonTimeSeriesRx<V>(timeSeries, this), RTimeSeriesRx.class);
new RedissonTimeSeriesRx<V, L>(timeSeries, this), RTimeSeriesRx.class);
}
@Override

@ -25,11 +25,13 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.TimeSeriesEntryReplayDecoder;
import org.redisson.client.protocol.decoder.TimeSeriesSingleEntryReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.misc.CompletableFutureWrapper;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -40,7 +42,7 @@ import java.util.stream.Stream;
* @author Nikita Koksharov
*
*/
public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSeries<V> {
public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTimeSeries<V, L> {
private final EvictionScheduler evictionScheduler;
@ -76,6 +78,16 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
return addAllAsync(Collections.singletonMap(timestamp, object));
}
@Override
public void add(long timestamp, V object, L label) {
addAll(Collections.singletonList(new TimeSeriesEntry<>(timestamp, object, label)));
}
@Override
public RFuture<Void> addAsync(long timestamp, V object, L label) {
return addAllAsync(Collections.singletonList(new TimeSeriesEntry<>(timestamp, object, label)));
}
@Override
public void addAll(Map<Long, V> objects) {
addAll(objects, 0, null);
@ -91,6 +103,16 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
return addAllAsync(Collections.singletonMap(timestamp, object), timeToLive, timeUnit);
}
@Override
public void add(long timestamp, V object, L label, Duration timeToLive) {
addAll(Collections.singletonList(new TimeSeriesEntry<>(timestamp, object, label)), timeToLive);
}
@Override
public RFuture<Void> addAsync(long timestamp, V object, L label, Duration timeToLive) {
return addAllAsync(Collections.singletonList(new TimeSeriesEntry<>(timestamp, object, label)), timeToLive);
}
@Override
public void addAll(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit) {
get(addAllAsync(objects, timeToLive, timeUnit));
@ -123,7 +145,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
if (timeToLive > 0) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"for i = 2, #ARGV, 3 do " +
"local val = struct.pack('Bc0Lc0', string.len(ARGV[i+1]), ARGV[i+1], string.len(ARGV[i+2]), ARGV[i+2]); " +
"local val = struct.pack('BBc0Lc0Lc0', 2, string.len(ARGV[i+1]), ARGV[i+1], string.len(ARGV[i+2]), ARGV[i+2], 0, ''); " +
"redis.call('zadd', KEYS[1], ARGV[i], val); " +
"redis.call('zadd', KEYS[2], ARGV[1], val); " +
"end; ",
@ -137,7 +159,82 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"expirationTime = tonumber(lastValues[2]); " +
"end; " +
"for i = 2, #ARGV, 3 do " +
"local val = struct.pack('Bc0Lc0', string.len(ARGV[i+1]), ARGV[i+1], string.len(ARGV[i+2]), ARGV[i+2]); " +
"local val = struct.pack('BBc0Lc0Lc0', 2, string.len(ARGV[i+1]), ARGV[i+1], string.len(ARGV[i+2]), ARGV[i+2], 0, ''); " +
"redis.call('zadd', KEYS[1], ARGV[i], val); " +
"redis.call('zadd', KEYS[2], expirationTime + 1, val); " +
"end; ",
Arrays.asList(getRawName(), getTimeoutSetName()),
params.toArray());
}
@Override
public void addAll(Collection<TimeSeriesEntry<V, L>> entries) {
addAll(entries, null);
}
@Override
public RFuture<Void> addAllAsync(Collection<TimeSeriesEntry<V, L>> entries) {
return addAllAsync(entries, null);
}
@Override
public void addAll(Collection<TimeSeriesEntry<V, L>> entries, Duration timeToLive) {
get(addAllAsync(entries, timeToLive));
}
@Override
public RFuture<Void> addAllAsync(Collection<TimeSeriesEntry<V, L>> entries, Duration timeToLive) {
long expirationTime = System.currentTimeMillis();
if (timeToLive != null) {
expirationTime += timeToLive.toMillis();
} else {
expirationTime += TimeUnit.DAYS.toMillis(365 * 100);
}
List<Object> params = new ArrayList<>();
params.add(expirationTime);
for (TimeSeriesEntry<V, L> entry : entries) {
params.add(entry.getTimestamp());
byte[] random = new byte[16];
ThreadLocalRandom.current().nextBytes(random);
if (entry.getLabel() == null) {
params.add(2);
} else {
params.add(3);
}
params.add(random);
encode(params, entry.getValue());
if (entry.getLabel() == null) {
params.add("");
} else {
encode(params, entry.getLabel());
}
}
if (timeToLive != null) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"for i = 2, #ARGV, 5 do " +
"local val = struct.pack('BBc0Lc0Lc0', ARGV[i+1], " +
"string.len(ARGV[i+2]), ARGV[i+2], " +
"string.len(ARGV[i+3]), ARGV[i+3], " +
"string.len(ARGV[i+4]), ARGV[i+4]); " +
"redis.call('zadd', KEYS[1], ARGV[i], val); " +
"redis.call('zadd', KEYS[2], ARGV[1], val); " +
"end; ",
Arrays.asList(getRawName(), getTimeoutSetName()),
params.toArray());
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
"local expirationTime = ARGV[1]; " +
"local lastValues = redis.call('zrange', KEYS[2], -1, -1, 'withscores'); " +
"if (#lastValues > 0 and tonumber(lastValues[2]) > tonumber(ARGV[1])) then " +
"expirationTime = tonumber(lastValues[2]); " +
"end; " +
"for i = 2, #ARGV, 5 do " +
"local val = struct.pack('BBc0Lc0Lc0', ARGV[i+1]," +
"string.len(ARGV[i+2]), ARGV[i+2], " +
"string.len(ARGV[i+3]), ARGV[i+3], " +
"string.len(ARGV[i+4]), ARGV[i+4]); " +
"redis.call('zadd', KEYS[1], ARGV[i], val); " +
"redis.call('zadd', KEYS[2], expirationTime + 1, val); " +
"end; ",
@ -176,12 +273,38 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"if expirationDate ~= false and tonumber(expirationDate) <= tonumber(ARGV[1]) then " +
"return nil;" +
"end;" +
"local t, val = struct.unpack('Bc0Lc0', values[1]); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[1]); " +
"return val;",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), timestamp);
}
@Override
public TimeSeriesEntry<V, L> getEntry(long timestamp) {
return get(getEntryAsync(timestamp));
}
@Override
public RFuture<TimeSeriesEntry<V, L>> getEntryAsync(long timestamp) {
return commandExecutor.evalReadAsync(getRawName(), codec, ENTRY,
"local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[2]);" +
"if #values == 0 then " +
"return nil;" +
"end;" +
"local expirationDate = redis.call('zscore', KEYS[2], values[1]); " +
"if expirationDate ~= false and tonumber(expirationDate) <= tonumber(ARGV[1]) then " +
"return nil;" +
"end;" +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[1]); " +
"if n == 2 then " +
"return {n, ARGV[2], val};" +
"end;" +
"return {n, ARGV[2], val, label};",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), timestamp);
}
@Override
public boolean remove(long timestamp) {
return get(removeAsync(timestamp));
@ -296,7 +419,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"local result = {}; " +
"for i, v in ipairs(values) do " +
"local t, val = struct.unpack('Bc0Lc0', v); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', v); " +
"table.insert(result, val);" +
"end;" +
"return result;",
@ -338,29 +461,32 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
}
@Override
public Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp) {
public Collection<TimeSeriesEntry<V, L>> entryRange(long startTimestamp, long endTimestamp) {
return get(entryRangeAsync(false, startTimestamp, endTimestamp, 0));
}
@Override
public Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp) {
public Collection<TimeSeriesEntry<V, L>> entryRangeReversed(long startTimestamp, long endTimestamp) {
return get(entryRangeAsync(true, startTimestamp, endTimestamp, 0));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp) {
public RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp) {
return entryRangeAsync(true, startTimestamp, endTimestamp, 0);
}
private static final RedisCommand<List<TimeSeriesEntry<Object>>> ENTRIES =
new RedisCommand<>("EVAL", new TimeSeriesEntryReplayDecoder<>());
private static final RedisCommand<List<TimeSeriesEntry<Object, Object>>> ENTRIES =
new RedisCommand<>("EVAL", new TimeSeriesEntryReplayDecoder());
private static final RedisCommand<TimeSeriesEntry<Object, Object>> ENTRY =
new RedisCommand<>("EVAL", new TimeSeriesSingleEntryReplayDecoder());
@Override
public RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp) {
public RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeAsync(long startTimestamp, long endTimestamp) {
return entryRangeAsync(false, startTimestamp, endTimestamp, 0);
}
private RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(boolean reverse, long startTimestamp, long endTimestamp, int limit) {
private RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeAsync(boolean reverse, long startTimestamp, long endTimestamp, int limit) {
return commandExecutor.evalReadAsync(getRawName(), codec, ENTRIES,
"local result = {}; " +
"local from = ARGV[2]; " +
@ -385,20 +511,25 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"for i=1, #values, 2 do " +
"local expirationDate = redis.call('zscore', KEYS[2], values[i]);" +
"if tonumber(expirationDate) > tonumber(ARGV[1]) then " +
"local t, val = struct.unpack('Bc0Lc0', values[i]); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[i]); " +
"table.insert(result, val);" +
"if n == 2 then " +
"label = 0; " +
"end; " +
"table.insert(result, label);" +
"table.insert(result, n);" +
"table.insert(result, values[i+1]);" +
"end;" +
"end;" +
"if limit == 0 or #result/2 == tonumber(ARGV[4]) or #values/2 < tonumber(limit) then " +
"if limit == 0 or #result/4 == tonumber(ARGV[4]) or #values/2 < limit then " +
"return result;" +
"end;" +
"from = '(' .. values[#values];" +
"limit = tonumber(ARGV[4]) - #result/2;" +
"limit = tonumber(ARGV[4]) - #result/4;" +
"end;",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), startTimestamp, endTimestamp, limit, Boolean.compare(reverse, false));
System.currentTimeMillis(), startTimestamp, endTimestamp, limit, Boolean.compare(reverse, false), encode((Object) null));
}
@Override
@ -456,7 +587,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"for i=1, #values, 2 do " +
"local expirationDate = redis.call('zscore', KEYS[2], values[i]);" +
"if tonumber(expirationDate) > tonumber(ARGV[1]) then " +
"local t, val = struct.unpack('Bc0Lc0', values[i]); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[i]); " +
"table.insert(result, val);" +
"end;" +
"end;" +
@ -472,22 +603,22 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
}
@Override
public Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp, int limit) {
public Collection<TimeSeriesEntry<V, L>> entryRange(long startTimestamp, long endTimestamp, int limit) {
return get(entryRangeAsync(startTimestamp, endTimestamp, limit));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit) {
public RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit) {
return entryRangeAsync(false, startTimestamp, endTimestamp, limit);
}
@Override
public Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp, int limit) {
public Collection<TimeSeriesEntry<V, L>> entryRangeReversed(long startTimestamp, long endTimestamp, int limit) {
return get(entryRangeReversedAsync(startTimestamp, endTimestamp, limit));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit) {
public RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit) {
return entryRangeAsync(true, startTimestamp, endTimestamp, limit);
}
@ -551,7 +682,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
"for i, v in ipairs(values) do " +
"redis.call('zrem', KEYS[2], v); " +
"redis.call('zrem', KEYS[1], v); " +
"local t, val = struct.unpack('Bc0Lc0', v); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', v); " +
"table.insert(result, val);" +
"end;" +
"return result;",
@ -576,7 +707,7 @@ public class RedissonTimeSeries<V> extends RedissonExpirable implements RTimeSer
+ "for i, value in ipairs(res) do "
+ "local expirationDate = redis.call('zscore', KEYS[2], value); " +
"if tonumber(expirationDate) > tonumber(ARGV[2]) then " +
"local t, val = struct.unpack('Bc0Lc0', value); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', value); " +
"table.insert(result, val);" +
"end;"
+ "end;" +

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
@ -26,18 +27,30 @@ import java.util.stream.Stream;
*
* @author Nikita Koksharov
*
* @param <V> value type
* @param <L> label type
*/
public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsync<V>, RDestroyable {
public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesAsync<V, L>, RDestroyable {
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @param timestamp object timestamp
* @param object object itself
*/
void add(long timestamp, V object);
/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
*/
void add(long timestamp, V object, L label);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
@ -46,6 +59,13 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
void addAll(Map<Long, V> objects);
/**
* Adds all entries collection to this time-series collection.
*
* @param entries collection of time series entries
*/
void addAll(Collection<TimeSeriesEntry<V, L>> entries);
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
@ -57,6 +77,17 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
void add(long timestamp, V object, long timeToLive, TimeUnit timeUnit);
/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
* @param timeToLive time to live interval
*/
void add(long timestamp, V object, L label, Duration timeToLive);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
@ -67,6 +98,15 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
void addAll(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit);
/**
* Adds all time series entries collection to this time-series collection.
* Specified time to live interval applied to all entries defined in collection.
*
* @param entries collection of time series entries
* @param timeToLive time to live interval
*/
void addAll(Collection<TimeSeriesEntry<V, L>> entries, Duration timeToLive);
/**
* Returns size of this set.
*
@ -82,6 +122,14 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
*/
V get(long timestamp);
/**
* Returns time series entry by specified <code>timestamp</code> or <code>null</code> if it doesn't exist.
*
* @param timestamp object timestamp
* @return time series entry
*/
TimeSeriesEntry<V, L> getEntry(long timestamp);
/**
* Removes object by specified <code>timestamp</code>.
*
@ -220,7 +268,7 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
* @param endTimestamp - end timestamp
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp);
Collection<TimeSeriesEntry<V, L>> entryRange(long startTimestamp, long endTimestamp);
/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
@ -230,7 +278,7 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
* @param limit result size limit
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRange(long startTimestamp, long endTimestamp, int limit);
Collection<TimeSeriesEntry<V, L>> entryRange(long startTimestamp, long endTimestamp, int limit);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
@ -239,7 +287,7 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
* @param endTimestamp - end timestamp
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp);
Collection<TimeSeriesEntry<V, L>> entryRangeReversed(long startTimestamp, long endTimestamp);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
@ -249,7 +297,7 @@ public interface RTimeSeries<V> extends RExpirable, Iterable<V>, RTimeSeriesAsyn
* @param limit result size limit
* @return elements collection
*/
Collection<TimeSeriesEntry<V>> entryRangeReversed(long startTimestamp, long endTimestamp, int limit);
Collection<TimeSeriesEntry<V, L>> entryRangeReversed(long startTimestamp, long endTimestamp, int limit);
/**
* Returns stream of elements in this time-series collection.

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -24,19 +25,32 @@ import java.util.concurrent.TimeUnit;
*
* @author Nikita Koksharov
*
* @param <V> value type
* @param <L> label type
*
*/
public interface RTimeSeriesAsync<V> extends RExpirableAsync {
public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @param timestamp object timestamp
* @param object object itself
* @return void
*/
RFuture<Void> addAsync(long timestamp, V object);
/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
*/
RFuture<Void> addAsync(long timestamp, V object, L label);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
@ -46,6 +60,14 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Void> addAllAsync(Map<Long, V> objects);
/**
* Adds all entries collection to this time-series collection.
*
* @param entries collection of time series entries
* @return void
*/
RFuture<Void> addAllAsync(Collection<TimeSeriesEntry<V, L>> entries);
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
@ -58,6 +80,18 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Void> addAsync(long timestamp, V object, long timeToLive, TimeUnit timeUnit);
/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
* @param timeToLive time to live interval
* @return void
*/
RFuture<Void> addAsync(long timestamp, V object, L label, Duration timeToLive);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
@ -69,6 +103,16 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<Void> addAllAsync(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit);
/**
* Adds all time series entries collection to this time-series collection.
* Specified time to live interval applied to all entries defined in collection.
*
* @param entries collection of time series entries
* @param timeToLive time to live interval
* @return void
*/
RFuture<Void> addAllAsync(Collection<TimeSeriesEntry<V, L>> entries, Duration timeToLive);
/**
* Returns size of this set.
*
@ -84,6 +128,14 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
*/
RFuture<V> getAsync(long timestamp);
/**
* Returns time series entry by specified <code>timestamp</code> or <code>null</code> if it doesn't exist.
*
* @param timestamp object timestamp
* @return time series entry
*/
RFuture<TimeSeriesEntry<V, L>> getEntryAsync(long timestamp);
/**
* Removes object by specified <code>timestamp</code>.
*
@ -222,7 +274,7 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
* @param endTimestamp - end timestamp
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp);
RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeAsync(long startTimestamp, long endTimestamp);
/**
* Returns ordered entries of this time-series collection within timestamp range. Including boundary values.
@ -232,7 +284,7 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
* @param limit result size limit
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit);
RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeAsync(long startTimestamp, long endTimestamp, int limit);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
@ -241,7 +293,7 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
* @param endTimestamp - end timestamp
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp);
RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
@ -251,6 +303,6 @@ public interface RTimeSeriesAsync<V> extends RExpirableAsync {
* @param limit result size limit
* @return elements collection
*/
RFuture<Collection<TimeSeriesEntry<V>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit);
RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeReversedAsync(long startTimestamp, long endTimestamp, int limit);
}

@ -18,6 +18,7 @@ package org.redisson.api;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -27,8 +28,10 @@ import java.util.concurrent.TimeUnit;
*
* @author Nikita Koksharov
*
* @param <V> value type
* @param <L> label type
*/
public interface RTimeSeriesReactive<V> extends RExpirableReactive {
public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
/**
* Returns iterator over collection elements
@ -41,12 +44,23 @@ public interface RTimeSeriesReactive<V> extends RExpirableReactive {
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @param timestamp object timestamp
* @param object object itself
* @return void
*/
Mono<Void> add(long timestamp, V object);
/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
* @return void
*/
Mono<Void> add(long timestamp, V object, L label);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
@ -56,6 +70,14 @@ public interface RTimeSeriesReactive<V> extends RExpirableReactive {
*/
Mono<Void> addAll(Map<Long, V> objects);
/**
* Adds all entries collection to this time-series collection.
*
* @param entries collection of time series entries
* @return void
*/
Mono<Void> addAll(Collection<TimeSeriesEntry<V, L>> entries);
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
@ -68,6 +90,18 @@ public interface RTimeSeriesReactive<V> extends RExpirableReactive {
*/
Mono<Void> add(long timestamp, V object, long timeToLive, TimeUnit timeUnit);
/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
* @param timeToLive time to live interval
* @return void
*/
Mono<Void> add(long timestamp, V object, L label, Duration timeToLive);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
@ -79,6 +113,16 @@ public interface RTimeSeriesReactive<V> extends RExpirableReactive {
*/
Mono<Void> addAll(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit);
/**
* Adds all time series entries collection to this time-series collection.
* Specified time to live interval applied to all entries defined in collection.
*
* @param entries collection of time series entries
* @param timeToLive time to live interval
* @return void
*/
Mono<Void> addAll(Collection<TimeSeriesEntry<V, L>> entries, Duration timeToLive);
/**
* Returns size of this set.
*
@ -94,6 +138,14 @@ public interface RTimeSeriesReactive<V> extends RExpirableReactive {
*/
Mono<V> get(long timestamp);
/**
* Returns time series entry by specified <code>timestamp</code> or <code>null</code> if it doesn't exist.
*
* @param timestamp object timestamp
* @return time series entry
*/
Mono<TimeSeriesEntry<V, L>> getEntry(long timestamp);
/**
* Removes object by specified <code>timestamp</code>.
*
@ -212,7 +264,7 @@ public interface RTimeSeriesReactive<V> extends RExpirableReactive {
* @param endTimestamp - end timestamp
* @return elements collection
*/
Mono<Collection<TimeSeriesEntry<V>>> entryRange(long startTimestamp, long endTimestamp);
Mono<Collection<TimeSeriesEntry<V, L>>> entryRange(long startTimestamp, long endTimestamp);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
@ -221,6 +273,6 @@ public interface RTimeSeriesReactive<V> extends RExpirableReactive {
* @param endTimestamp - end timestamp
* @return elements collection
*/
Mono<Collection<TimeSeriesEntry<V>>> entryRangeReversed(long startTimestamp, long endTimestamp);
Mono<Collection<TimeSeriesEntry<V, L>>> entryRangeReversed(long startTimestamp, long endTimestamp);
}

@ -20,6 +20,7 @@ import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -29,8 +30,10 @@ import java.util.concurrent.TimeUnit;
*
* @author Nikita Koksharov
*
* @param <V> value type
* @param <L> label type
*/
public interface RTimeSeriesRx<V> extends RExpirableRx {
public interface RTimeSeriesRx<V, L> extends RExpirableRx {
/**
* Returns iterator over collection elements
@ -43,12 +46,22 @@ public interface RTimeSeriesRx<V> extends RExpirableRx {
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @param object - object itself
* @param timestamp object timestamp
* @param object object itself
* @return void
*/
Completable add(long timestamp, V object);
/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
*/
Completable add(long timestamp, V object, L label);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
@ -58,6 +71,13 @@ public interface RTimeSeriesRx<V> extends RExpirableRx {
*/
Completable addAll(Map<Long, V> objects);
/**
* Adds all entries collection to this time-series collection.
*
* @param entries collection of time series entries
*/
Completable addAll(Collection<TimeSeriesEntry<V, L>> entries);
/**
* Adds element to this time-series collection
* by specified <code>timestamp</code>.
@ -70,6 +90,17 @@ public interface RTimeSeriesRx<V> extends RExpirableRx {
*/
Completable add(long timestamp, V object, long timeToLive, TimeUnit timeUnit);
/**
* Adds element with <code>label</code> to this time-series collection
* by specified <code>timestamp</code>.
*
* @param timestamp object timestamp
* @param object object itself
* @param label object label
* @param timeToLive time to live interval
*/
Completable add(long timestamp, V object, L label, Duration timeToLive);
/**
* Adds all elements contained in the specified map to this time-series collection.
* Map contains of timestamp mapped by object.
@ -81,6 +112,15 @@ public interface RTimeSeriesRx<V> extends RExpirableRx {
*/
Completable addAll(Map<Long, V> objects, long timeToLive, TimeUnit timeUnit);
/**
* Adds all time series entries collection to this time-series collection.
* Specified time to live interval applied to all entries defined in collection.
*
* @param entries collection of time series entries
* @param timeToLive time to live interval
*/
Completable addAll(Collection<TimeSeriesEntry<V, L>> entries, Duration timeToLive);
/**
* Returns size of this set.
*
@ -96,6 +136,14 @@ public interface RTimeSeriesRx<V> extends RExpirableRx {
*/
Maybe<V> get(long timestamp);
/**
* Returns time series entry by specified <code>timestamp</code> or <code>null</code> if it doesn't exist.
*
* @param timestamp object timestamp
* @return time series entry
*/
Maybe<TimeSeriesEntry<V, L>> getEntry(long timestamp);
/**
* Removes object by specified <code>timestamp</code>.
*
@ -214,7 +262,7 @@ public interface RTimeSeriesRx<V> extends RExpirableRx {
* @param endTimestamp - end timestamp
* @return elements collection
*/
Single<Collection<TimeSeriesEntry<V>>> entryRange(long startTimestamp, long endTimestamp);
Single<Collection<TimeSeriesEntry<V, L>>> entryRange(long startTimestamp, long endTimestamp);
/**
* Returns entries of this time-series collection in reverse order within timestamp range. Including boundary values.
@ -223,6 +271,6 @@ public interface RTimeSeriesRx<V> extends RExpirableRx {
* @param endTimestamp - end timestamp
* @return elements collection
*/
Single<Collection<TimeSeriesEntry<V>>> entryRangeReversed(long startTimestamp, long endTimestamp);
Single<Collection<TimeSeriesEntry<V, L>>> entryRangeReversed(long startTimestamp, long endTimestamp);
}

@ -38,22 +38,24 @@ public interface RedissonClient {
/**
* Returns time-series instance by <code>name</code>
*
* @param <V> type of value
* @param <V> value type
* @param <L> label type
* @param name - name of instance
* @return RTimeSeries object
*/
<V> RTimeSeries<V> getTimeSeries(String name);
<V, L> RTimeSeries<V, L> getTimeSeries(String name);
/**
* Returns time-series instance by <code>name</code>
* using provided <code>codec</code> for values.
*
* @param <V> type of value
* @param <V> value type
* @param <L> label type
* @param name - name of instance
* @param codec - codec for values
* @return RTimeSeries object
*/
<V> RTimeSeries<V> getTimeSeries(String name, Codec codec);
<V, L> RTimeSeries<V, L> getTimeSeries(String name, Codec codec);
/**
* Returns stream instance by <code>name</code>

@ -36,22 +36,24 @@ public interface RedissonReactiveClient {
/**
* Returns time-series instance by <code>name</code>
*
* @param <V> type of value
* @param <V> value type
* @param <L> label type
* @param name - name of instance
* @return RTimeSeries object
*/
<V> RTimeSeriesReactive<V> getTimeSeries(String name);
<V, L> RTimeSeriesReactive<V, L> getTimeSeries(String name);
/**
* Returns time-series instance by <code>name</code>
* using provided <code>codec</code> for values.
*
* @param <V> type of value
* @param <V> value type
* @param <L> label type
* @param name - name of instance
* @param codec - codec for values
* @return RTimeSeries object
*/
<V> RTimeSeriesReactive<V> getTimeSeries(String name, Codec codec);
<V, L> RTimeSeriesReactive<V, L> getTimeSeries(String name, Codec codec);
/**
* Returns stream instance by <code>name</code>

@ -35,22 +35,24 @@ public interface RedissonRxClient {
/**
* Returns time-series instance by <code>name</code>
*
* @param <V> type of value
* @param <V> value type
* @param <L> label type
* @param name - name of instance
* @return RTimeSeries object
*/
<V> RTimeSeriesRx<V> getTimeSeries(String name);
<V, L> RTimeSeriesRx<V, L> getTimeSeries(String name);
/**
* Returns time-series instance by <code>name</code>
* using provided <code>codec</code> for values.
*
* @param <V> type of value
* @param <V> value type
* @param <L> label type
* @param name - name of instance
* @param codec - codec for values
* @return RTimeSeries object
*/
<V> RTimeSeriesRx<V> getTimeSeries(String name, Codec codec);
<V, L> RTimeSeriesRx<V, L> getTimeSeries(String name, Codec codec);
/**
* Returns stream instance by <code>name</code>

@ -22,17 +22,27 @@ import java.util.Objects;
*
* @author Nikita Koksharov
*
* @param <V> value type
* @param <L> label type
*/
public class TimeSeriesEntry<V> {
public class TimeSeriesEntry<V, L> {
private long timestamp;
private V value;
private L label;
public TimeSeriesEntry(long timestamp, V value) {
this.timestamp = timestamp;
this.value = value;
}
public TimeSeriesEntry(long timestamp, V value, L label) {
this.timestamp = timestamp;
this.value = value;
this.label = label;
}
public long getTimestamp() {
return timestamp;
}
@ -41,18 +51,21 @@ public class TimeSeriesEntry<V> {
return value;
}
public L getLabel() {
return label;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TimeSeriesEntry<?> entry = (TimeSeriesEntry<?>) o;
return timestamp == entry.timestamp
&& Objects.equals(value, entry.value);
TimeSeriesEntry<?, ?> that = (TimeSeriesEntry<?, ?>) o;
return timestamp == that.timestamp && value.equals(that.value) && Objects.equals(label, that.label);
}
@Override
public int hashCode() {
return Objects.hash(timestamp, value);
return Objects.hash(timestamp, value, label);
}
@Override

@ -28,23 +28,27 @@ import java.util.List;
*
* @author Nikita Koksharov
*
* @param <T> type of element
*/
public class TimeSeriesEntryReplayDecoder<T> implements MultiDecoder<List<TimeSeriesEntry<T>>> {
public class TimeSeriesEntryReplayDecoder implements MultiDecoder<List<TimeSeriesEntry<Object, Object>>> {
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
if (paramNum % 2 != 0) {
if (paramNum % 4 == 2 || paramNum % 4 == 3) {
return LongCodec.INSTANCE.getValueDecoder();
}
return MultiDecoder.super.getDecoder(codec, paramNum, state);
}
@Override
public List<TimeSeriesEntry<T>> decode(List<Object> parts, State state) {
List<TimeSeriesEntry<T>> result = new ArrayList<>();
for (int i = 0; i < parts.size(); i += 2) {
result.add(new TimeSeriesEntry<T>((Long) parts.get(i + 1), (T) parts.get(i)));
public List<TimeSeriesEntry<Object, Object>> decode(List<Object> parts, State state) {
List<TimeSeriesEntry<Object, Object>> result = new ArrayList<>();
for (int i = 0; i < parts.size(); i += 4) {
Long n = (Long) parts.get(i + 2);
Object label = null;
if (n == 3) {
label = parts.get(i + 1);
}
result.add(new TimeSeriesEntry<>((Long) parts.get(i + 3), parts.get(i), label));
}
return result;
}

@ -0,0 +1,51 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import org.redisson.api.TimeSeriesEntry;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public class TimeSeriesSingleEntryReplayDecoder implements MultiDecoder<TimeSeriesEntry<Object, Object>> {
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
if (paramNum == 0 || paramNum == 1) {
return LongCodec.INSTANCE.getValueDecoder();
}
return MultiDecoder.super.getDecoder(codec, paramNum, state);
}
@Override
public TimeSeriesEntry<Object, Object> decode(List<Object> parts, State state) {
Long n = (Long) parts.get(0);
Object label = null;
if (n == 3) {
label = parts.get(3);
}
return new TimeSeriesEntry<>((Long) parts.get(1), parts.get(2), label);
}
}

@ -29,14 +29,15 @@ import reactor.core.publisher.Flux;
*
* @author Nikita Koksharov
*
* @param <V> value
* @param <V> value type
* @param <L> label type
*/
public class RedissonTimeSeriesReactive<V> {
public class RedissonTimeSeriesReactive<V, L> {
private final RTimeSeries<V> instance;
private final RTimeSeries<V, L> instance;
private final RedissonReactiveClient redisson;
public RedissonTimeSeriesReactive(RTimeSeries<V> instance, RedissonReactiveClient redisson) {
public RedissonTimeSeriesReactive(RTimeSeries<V, L> instance, RedissonReactiveClient redisson) {
this.instance = instance;
this.redisson = redisson;
}

@ -28,14 +28,15 @@ import org.redisson.client.RedisClient;
*
* @author Nikita Koksharov
*
* @param <V> value
* @param <V> value type
* @param <L> label type
*/
public class RedissonTimeSeriesRx<V> {
public class RedissonTimeSeriesRx<V, L> {
private final RTimeSeries<V> instance;
private final RTimeSeries<V, L> instance;
private final RedissonRxClient redisson;
public RedissonTimeSeriesRx(RTimeSeries<V> instance, RedissonRxClient redisson) {
public RedissonTimeSeriesRx(RTimeSeries<V, L> instance, RedissonRxClient redisson) {
this.instance = instance;
this.redisson = redisson;
}

@ -1005,7 +1005,7 @@ public class RedissonTest extends BaseTest {
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(4, "40");
t.add(2, "20");
t.add(1, "10", 1, TimeUnit.SECONDS);

@ -18,12 +18,12 @@ public class RedissonTimeSeriesReactiveTest extends BaseReactiveTest {
@Test
public void testOrder() {
RTimeSeriesReactive<String> t = redisson.getTimeSeries("test");
RTimeSeriesReactive<String, Object> t = redisson.getTimeSeries("test");
sync(t.add(4, "40"));
sync(t.add(2, "20"));
sync(t.add(1, "10", 1, TimeUnit.SECONDS));
Collection<TimeSeriesEntry<String>> r11 = sync(t.entryRange(1, 5));
Collection<TimeSeriesEntry<String, Object>> r11 = sync(t.entryRange(1, 5));
assertThat(r11).containsExactly(new TimeSeriesEntry<>(1,"10"),
new TimeSeriesEntry<>(2, "20"),
new TimeSeriesEntry<>(4, "40"));

@ -21,7 +21,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testMultipleValues() {
RTimeSeries<String> ts = redisson.getTimeSeries("test");
RTimeSeries<String, Object> ts = redisson.getTimeSeries("test");
for (int i=0;i < 10000;i++){
ts.add(System.currentTimeMillis(), "my-value",60,TimeUnit.DAYS);
}
@ -30,7 +30,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testPutAll() {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
Map<Long, String> map = new HashMap<>();
map.put(1L, "1");
map.put(2L, "2");
@ -42,20 +42,20 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testOrder() {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(4, "40");
t.add(2, "20");
t.add(2, "20", "label2");
t.add(1, "10", 1, TimeUnit.SECONDS);
Collection<TimeSeriesEntry<String>> r11 = t.entryRange(1, 5);
Collection<TimeSeriesEntry<String, Object>> r11 = t.entryRange(1, 5);
assertThat(r11).containsExactly(new TimeSeriesEntry<>(1,"10"),
new TimeSeriesEntry<>(2, "20"),
new TimeSeriesEntry<>(2, "20", "label2"),
new TimeSeriesEntry<>(4, "40"));
}
@Test
public void testCleanup() throws InterruptedException {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10", 1, TimeUnit.SECONDS);
Thread.sleep(5000);
@ -65,7 +65,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testIterator() {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
for (int i = 0; i < 19; i++) {
t.add(i, "" + i*10);
}
@ -80,7 +80,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testRangeReversed() throws InterruptedException {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "20");
t.add(3, "30");
@ -89,7 +89,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
assertThat(t.rangeReversed(1, 4, 2)).containsExactly("40", "30");
assertThat(t.rangeReversed(1, 4, 0)).containsExactly("40", "30", "20", "10");
RTimeSeries<String> t2 = redisson.getTimeSeries("test2");
RTimeSeries<String, Object> t2 = redisson.getTimeSeries("test2");
t2.add(1, "10");
t2.add(2, "20");
t2.add(3, "30", 1, TimeUnit.SECONDS);
@ -102,7 +102,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testRange() throws InterruptedException {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "10");
t.add(3, "30");
@ -111,7 +111,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
assertThat(t.range(1, 4, 2)).containsExactly("10", "10");
assertThat(t.range(1, 4, 0)).containsExactly("10", "10", "30", "40");
RTimeSeries<String> t2 = redisson.getTimeSeries("test2");
RTimeSeries<String, Object> t2 = redisson.getTimeSeries("test2");
t2.add(1, "10");
t2.add(2, "10", 1, TimeUnit.SECONDS);
t2.add(3, "30");
@ -124,7 +124,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testRemove() {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "10");
t.add(3, "30");
@ -140,15 +140,45 @@ public class RedissonTimeSeriesTest extends BaseTest {
assertThat(t.size()).isEqualTo(1);
}
@Test
public void testGetEntry() {
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "10");
t.add(3, "30");
t.add(4, "40");
assertThat(t.size()).isEqualTo(4);
assertThat(t.get(3)).isEqualTo("30");
assertThat(t.getEntry(3).getValue()).isEqualTo("30");
}
@Test
public void testLabel() {
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "20", "label2");
t.add(3, "30", "label3");
TimeSeriesEntry<String, Object> ee = t.getEntry(2);
assertThat(ee.getTimestamp()).isEqualTo(2);
assertThat(ee.getValue()).isEqualTo("20");
assertThat(ee.getLabel()).isEqualTo("label2");
}
@Test
public void test() {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "10");
t.add(3, "30");
t.add(4, "40");
assertThat(t.size()).isEqualTo(4);
assertThat(t.get(3)).isEqualTo("30");
TimeSeriesEntry<String, Object> ee = t.getEntry(2);
assertThat(ee.getTimestamp()).isEqualTo(2);
assertThat(ee.getValue()).isEqualTo("10");
assertThat(ee.getLabel()).isNull();
assertThat(t.first()).isEqualTo("10");
assertThat(t.first(2)).containsExactly("10", "10");
@ -158,16 +188,15 @@ public class RedissonTimeSeriesTest extends BaseTest {
Collection<String> r = t.range(1, 3);
assertThat(r).containsExactly("10", "10", "30");
Collection<TimeSeriesEntry<String>> r11 = t.entryRange(1, 3);
Collection<TimeSeriesEntry<String, Object>> r11 = t.entryRange(1, 3);
assertThat(r11).containsExactly(new TimeSeriesEntry<>(1,"10"),
new TimeSeriesEntry<>(2, "10"),
new TimeSeriesEntry<>(3, "30"));
Collection<TimeSeriesEntry<String>> r12 = t.entryRangeReversed(1, 3);
Collection<TimeSeriesEntry<String, Object>> r12 = t.entryRangeReversed(1, 3);
assertThat(r12).containsExactly(new TimeSeriesEntry<>(3, "30"),
new TimeSeriesEntry<>(2, "10"),
new TimeSeriesEntry<>(1,"10"));
Collection<String> r1 = t.range(1, 3);
assertThat(r1).containsExactly("10", "10", "30");
@ -180,7 +209,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testTTLLast() throws InterruptedException {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "10");
t.add(3, "30");
@ -209,7 +238,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testTTLFirst() throws InterruptedException {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10", 1, TimeUnit.SECONDS);
t.add(2, "10");
t.add(3, "30");
@ -238,7 +267,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testPoll() throws InterruptedException {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "20");
t.add(3, "30");
@ -251,7 +280,7 @@ public class RedissonTimeSeriesTest extends BaseTest {
@Test
public void testPollList() {
RTimeSeries<String> t = redisson.getTimeSeries("test");
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "20");
t.add(3, "30");

@ -18,12 +18,12 @@ public class RedissonTimeSeriesRxTest extends BaseRxTest {
@Test
public void testOrder() {
RTimeSeriesRx<String> t = redisson.getTimeSeries("test");
RTimeSeriesRx<String, Object> t = redisson.getTimeSeries("test");
sync(t.add(4, "40"));
sync(t.add(2, "20"));
sync(t.add(1, "10", 1, TimeUnit.SECONDS));
Collection<TimeSeriesEntry<String>> r11 = sync(t.entryRange(1, 5));
Collection<TimeSeriesEntry<String, Object>> r11 = sync(t.entryRange(1, 5));
assertThat(r11).containsExactly(new TimeSeriesEntry<>(1,"10"),
new TimeSeriesEntry<>(2, "20"),
new TimeSeriesEntry<>(4, "40"));

Loading…
Cancel
Save