Feature - labels support for RTimeSeries object. #4553

pull/4623/head
Nikita Koksharov 2 years ago
parent 8dbae74267
commit f053131438

@ -23,9 +23,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.TimeSeriesEntryReplayDecoder;
import org.redisson.client.protocol.decoder.TimeSeriesSingleEntryReplayDecoder;
import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.iterator.RedissonBaseIterator;
@ -286,7 +284,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
@Override
public RFuture<TimeSeriesEntry<V, L>> getEntryAsync(long timestamp) {
return commandExecutor.evalReadAsync(getRawName(), codec, ENTRY,
return commandExecutor.evalReadAsync(getRawName(), codec, EVAL_ENTRY,
"local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[2]);" +
"if #values == 0 then " +
"return nil;" +
@ -329,6 +327,59 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
System.currentTimeMillis(), timestamp);
}
@Override
public V getAndRemove(long timestamp) {
return get(getAndRemoveAsync(timestamp));
}
@Override
public RFuture<V> getAndRemoveAsync(long timestamp) {
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT,
"local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[2]);" +
"if #values == 0 then " +
"return nil;" +
"end;" +
"local expirationDate = redis.call('zscore', KEYS[2], values[1]); " +
"if expirationDate ~= false and tonumber(expirationDate) <= tonumber(ARGV[1]) then " +
"return nil;" +
"end;" +
"redis.call('zrem', KEYS[2], values[1]); " +
"redis.call('zrem', KEYS[1], values[1]); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[1]); " +
"return val;",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), timestamp);
}
@Override
public TimeSeriesEntry<V, L> getAndRemoveEntry(long timestamp) {
return get(getAndRemoveEntryAsync(timestamp));
}
@Override
public RFuture<TimeSeriesEntry<V, L>> getAndRemoveEntryAsync(long timestamp) {
return commandExecutor.evalWriteAsync(getRawName(), codec, EVAL_ENTRY,
"local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[2]);" +
"if #values == 0 then " +
"return nil;" +
"end;" +
"local expirationDate = redis.call('zscore', KEYS[2], values[1]); " +
"if expirationDate ~= false and tonumber(expirationDate) <= tonumber(ARGV[1]) then " +
"return nil;" +
"end;" +
"redis.call('zrem', KEYS[2], values[1]); " +
"redis.call('zrem', KEYS[1], values[1]); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[1]); " +
"if n == 2 then " +
"return {n, ARGV[2], val};" +
"end;" +
"return {n, ARGV[2], val, label};",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), timestamp);
}
@Override
public V last() {
return get(lastAsync());
@ -339,9 +390,19 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
return listAsync(-1, 1, RedisCommands.EVAL_FIRST_LIST);
}
@Override
public TimeSeriesEntry<V, L> lastEntry() {
return get(lastEntryAsync());
}
@Override
public RFuture<TimeSeriesEntry<V, L>> lastEntryAsync() {
return listEntriesAsync(-1, 1, EVAL_FIRST_ENTRY);
}
@Override
public RFuture<Collection<V>> lastAsync(int count) {
return listAsync(-1, count, RedisCommands.EVAL_LIST);
return listAsync(-1, count, RedisCommands.EVAL_LIST_REVERSE);
}
@Override
@ -354,6 +415,16 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
return listAsync(0, 1, RedisCommands.EVAL_FIRST_LIST);
}
@Override
public TimeSeriesEntry<V, L> firstEntry() {
return get(firstEntryAsync());
}
@Override
public RFuture<TimeSeriesEntry<V, L>> firstEntryAsync() {
return listEntriesAsync(0, 1, EVAL_FIRST_ENTRY);
}
@Override
public RFuture<Collection<V>> firstAsync(int count) {
return listAsync(0, count, RedisCommands.EVAL_LIST);
@ -364,9 +435,29 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
return get(listAsync(0, count, RedisCommands.EVAL_LIST));
}
@Override
public Collection<TimeSeriesEntry<V, L>> firstEntries(int count) {
return get(firstEntriesAsync(count));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V, L>>> firstEntriesAsync(int count) {
return listEntriesAsync(0, count, EVAL_ENTRIES);
}
@Override
public Collection<V> last(int count) {
return get(listAsync(-1, count, RedisCommands.EVAL_LIST_REVERSE));
return get(lastAsync(count));
}
@Override
public Collection<TimeSeriesEntry<V, L>> lastEntries(int count) {
return get(lastEntriesAsync(count));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V, L>>> lastEntriesAsync(int count) {
return listEntriesAsync(-2, count, EVAL_ENTRIES_REVERSE);
}
@Override
@ -427,6 +518,33 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
System.currentTimeMillis(), startScore, limit);
}
private <T> RFuture<T> listEntriesAsync(int startScore, int limit, RedisCommand<?> evalCommandType) {
return commandExecutor.evalReadAsync(getRawName(), codec, evalCommandType,
"local values;" +
"if ARGV[2] == '0' then " +
"values = redis.call('zrangebyscore', KEYS[2], ARGV[1], '+inf', 'withscores', 'limit', 0, ARGV[3]);" +
"else " +
"values = redis.call('zrevrangebyscore', KEYS[2], '+inf', ARGV[1], 'withscores', 'limit', 0, ARGV[3]);" +
"end; " +
"local result = {}; " +
"for i=1, #values, 2 do " +
"local score = redis.call('zscore', KEYS[1], values[i]); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[i]); " +
"table.insert(result, val);" +
"if n == 2 then " +
"label = 0; " +
"end; " +
"table.insert(result, label);" +
"table.insert(result, n);" +
"table.insert(result, score);" +
"end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), startScore, limit);
}
@Override
public int removeRange(long startTimestamp, long endTimestamp) {
return get(removeRangeAsync(startTimestamp, endTimestamp));
@ -475,10 +593,15 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
return entryRangeAsync(true, startTimestamp, endTimestamp, 0);
}
private static final RedisCommand<List<TimeSeriesEntry<Object, Object>>> ENTRIES =
private static final RedisCommand<Object> EVAL_FIRST_ENTRY = new RedisCommand<>("EVAL", new TimeSeriesFirstEntryReplayDecoder() {});
private static final RedisCommand<List<TimeSeriesEntry<Object, Object>>> EVAL_ENTRIES =
new RedisCommand<>("EVAL", new TimeSeriesEntryReplayDecoder());
private static final RedisCommand<TimeSeriesEntry<Object, Object>> ENTRY =
private static final RedisCommand<List<TimeSeriesEntry<Object, Object>>> EVAL_ENTRIES_REVERSE =
new RedisCommand<>("EVAL", new TimeSeriesEntryReplayDecoder(true));
private static final RedisCommand<TimeSeriesEntry<Object, Object>> EVAL_ENTRY =
new RedisCommand<>("EVAL", new TimeSeriesSingleEntryReplayDecoder());
@Override
@ -487,7 +610,7 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
}
private RFuture<Collection<TimeSeriesEntry<V, L>>> entryRangeAsync(boolean reverse, long startTimestamp, long endTimestamp, int limit) {
return commandExecutor.evalReadAsync(getRawName(), codec, ENTRIES,
return commandExecutor.evalReadAsync(getRawName(), codec, EVAL_ENTRIES,
"local result = {}; " +
"local from = ARGV[2]; " +
"local to = ARGV[3]; " +
@ -649,6 +772,33 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
return pollAsync(-1, count, RedisCommands.EVAL_LIST_REVERSE);
}
@Override
public Collection<TimeSeriesEntry<V, L>> pollFirstEntries(int count) {
return get(pollFirstEntriesAsync(count));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V, L>>> pollFirstEntriesAsync(int count) {
if (count <= 0) {
return new CompletableFutureWrapper<>(Collections.emptyList());
}
return pollEntriesAsync(0, count, EVAL_ENTRIES);
}
@Override
public Collection<TimeSeriesEntry<V, L>> pollLastEntries(int count) {
return get(pollLastEntriesAsync(count));
}
@Override
public RFuture<Collection<TimeSeriesEntry<V, L>>> pollLastEntriesAsync(int count) {
if (count <= 0) {
return new CompletableFutureWrapper<>(Collections.emptyList());
}
return pollEntriesAsync(-1, count, EVAL_ENTRIES_REVERSE);
}
@Override
public V pollFirst() {
return get(pollFirstAsync());
@ -669,6 +819,26 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
return pollAsync(-1, 1, RedisCommands.EVAL_FIRST_LIST);
}
@Override
public TimeSeriesEntry<V, L> pollFirstEntry() {
return get(pollFirstEntryAsync());
}
@Override
public RFuture<TimeSeriesEntry<V, L>> pollFirstEntryAsync() {
return pollEntriesAsync(0, 1, EVAL_FIRST_ENTRY);
}
@Override
public TimeSeriesEntry<V, L> pollLastEntry() {
return get(pollLastEntryAsync());
}
@Override
public RFuture<TimeSeriesEntry<V, L>> pollLastEntryAsync() {
return pollEntriesAsync(-1, 1, EVAL_FIRST_ENTRY);
}
private <T> RFuture<T> pollAsync(int startScore, int limit, RedisCommand<?> command) {
return commandExecutor.evalWriteAsync(getRawName(), codec, command,
"local values;" +
@ -690,6 +860,35 @@ public class RedissonTimeSeries<V, L> extends RedissonExpirable implements RTime
System.currentTimeMillis(), startScore, limit);
}
private <T> RFuture<T> pollEntriesAsync(int startScore, int limit, RedisCommand<?> command) {
return commandExecutor.evalWriteAsync(getRawName(), codec, command,
"local values;" +
"if ARGV[2] == '0' then " +
"values = redis.call('zrangebyscore', KEYS[2], ARGV[1], '+inf', 'withscores', 'limit', 0, ARGV[3]);" +
"else " +
"values = redis.call('zrevrangebyscore', KEYS[2], '+inf', ARGV[1], 'withscores', 'limit', 0, ARGV[3]);" +
"end; " +
"local result = {}; " +
"for i=1, #values, 2 do " +
"local score = redis.call('zscore', KEYS[1], values[i]); " +
"redis.call('zrem', KEYS[2], values[i]); " +
"redis.call('zrem', KEYS[1], values[i]); " +
"local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[i]); " +
"table.insert(result, val);" +
"if n == 2 then " +
"label = 0; " +
"end; " +
"table.insert(result, label);" +
"table.insert(result, n);" +
"table.insert(result, score);" +
"end;" +
"return result;",
Arrays.asList(getRawName(), getTimeoutSetName()),
System.currentTimeMillis(), startScore, limit);
}
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, int count) {
RFuture<ListScanResult<Object>> f = scanIteratorAsync(name, client, startPos, count);
return get(f);

@ -139,14 +139,37 @@ public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesA
boolean remove(long timestamp);
/**
* Removes and returns the head elements or {@code null} if this time-series collection is empty.
* Removes and returns object by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @return object or <code>null</code> if it doesn't exist
*/
V getAndRemove(long timestamp);
/**
* Removes and returns entry by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @return entry or <code>null</code> if it doesn't exist
*/
TimeSeriesEntry<V, L> getAndRemoveEntry(long timestamp);
/**
* Removes and returns the head elements
*
* @param count - elements amount
* @return the head element,
* or {@code null} if this time-series collection is empty
* @return collection of head elements
*/
Collection<V> pollFirst(int count);
/**
* Removes and returns head entries
*
* @param count - entries amount
* @return collection of head entries
*/
Collection<TimeSeriesEntry<V, L>> pollFirstEntries(int count);
/**
* Removes and returns the tail elements or {@code null} if this time-series collection is empty.
*
@ -155,6 +178,14 @@ public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesA
*/
Collection<V> pollLast(int count);
/**
* Removes and returns tail entries
*
* @param count - entries amount
* @return collection of tail entries
*/
Collection<TimeSeriesEntry<V, L>> pollLastEntries(int count);
/**
* Removes and returns the head element or {@code null} if this time-series collection is empty.
*
@ -163,6 +194,14 @@ public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesA
*/
V pollFirst();
/**
* Removes and returns head entry or {@code null} if this time-series collection is empty.
*
* @return the head entry,
* or {@code null} if this time-series collection is empty
*/
TimeSeriesEntry<V, L> pollFirstEntry();
/**
* Removes and returns the tail element or {@code null} if this time-series collection is empty.
*
@ -170,6 +209,13 @@ public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesA
*/
V pollLast();
/**
* Removes and returns the tail entry or {@code null} if this time-series collection is empty.
*
* @return the tail entry or {@code null} if this time-series collection is empty
*/
TimeSeriesEntry<V, L> pollLastEntry();
/**
* Returns the tail element or {@code null} if this time-series collection is empty.
*
@ -177,6 +223,13 @@ public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesA
*/
V last();
/**
* Returns the tail entry or {@code null} if this time-series collection is empty.
*
* @return the tail entry or {@code null} if this time-series collection is empty
*/
TimeSeriesEntry<V, L> lastEntry();
/**
* Returns the head element or {@code null} if this time-series collection is empty.
*
@ -184,6 +237,13 @@ public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesA
*/
V first();
/**
* Returns the head entry or {@code null} if this time-series collection is empty.
*
* @return the head entry or {@code null} if this time-series collection is empty
*/
TimeSeriesEntry<V, L> firstEntry();
/**
* Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty.
*
@ -206,6 +266,14 @@ public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesA
*/
Collection<V> last(int count);
/**
* Returns the tail entries of this time-series collection.
*
* @param count - entries amount
* @return the tail entries
*/
Collection<TimeSeriesEntry<V, L>> lastEntries(int count);
/**
* Returns the head elements of this time-series collection.
*
@ -214,6 +282,14 @@ public interface RTimeSeries<V, L> extends RExpirable, Iterable<V>, RTimeSeriesA
*/
Collection<V> first(int count);
/**
* Returns the head entries of this time-series collection.
*
* @param count - entries amount
* @return the head entries
*/
Collection<TimeSeriesEntry<V, L>> firstEntries(int count);
/**
* Removes values within timestamp range. Including boundary values.
*

@ -145,14 +145,37 @@ public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {
RFuture<Boolean> removeAsync(long timestamp);
/**
* Removes and returns the head elements or {@code null} if this time-series collection is empty.
* Removes and returns object by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @return object or <code>null</code> if it doesn't exist
*/
RFuture<V> getAndRemoveAsync(long timestamp);
/**
* Removes and returns entry by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @return entry or <code>null</code> if it doesn't exist
*/
RFuture<TimeSeriesEntry<V, L>> getAndRemoveEntryAsync(long timestamp);
/**
* Removes and returns the head elements
*
* @param count - elements amount
* @return the head element,
* or {@code null} if this time-series collection is empty
* @return collection of head elements
*/
RFuture<Collection<V>> pollFirstAsync(int count);
/**
* Removes and returns head entries
*
* @param count - entries amount
* @return collection of head entries
*/
RFuture<Collection<TimeSeriesEntry<V, L>>> pollFirstEntriesAsync(int count);
/**
* Removes and returns the tail elements or {@code null} if this time-series collection is empty.
*
@ -161,6 +184,14 @@ public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {
*/
RFuture<Collection<V>> pollLastAsync(int count);
/**
* Removes and returns tail entries
*
* @param count - entries amount
* @return collection of tail entries
*/
RFuture<Collection<TimeSeriesEntry<V, L>>> pollLastEntriesAsync(int count);
/**
* Removes and returns the head element or {@code null} if this time-series collection is empty.
*
@ -169,6 +200,14 @@ public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {
*/
RFuture<V> pollFirstAsync();
/**
* Removes and returns head entry or {@code null} if this time-series collection is empty.
*
* @return the head entry,
* or {@code null} if this time-series collection is empty
*/
RFuture<TimeSeriesEntry<V, L>> pollFirstEntryAsync();
/**
* Removes and returns the tail element or {@code null} if this time-series collection is empty.
*
@ -176,6 +215,13 @@ public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {
*/
RFuture<V> pollLastAsync();
/**
* Removes and returns the tail entry or {@code null} if this time-series collection is empty.
*
* @return the tail entry or {@code null} if this time-series collection is empty
*/
RFuture<TimeSeriesEntry<V, L>> pollLastEntryAsync();
/**
* Returns the tail element or {@code null} if this time-series collection is empty.
*
@ -183,6 +229,13 @@ public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {
*/
RFuture<V> lastAsync();
/**
* Returns the tail entry or {@code null} if this time-series collection is empty.
*
* @return the tail entry or {@code null} if this time-series collection is empty
*/
RFuture<TimeSeriesEntry<V, L>> lastEntryAsync();
/**
* Returns the head element or {@code null} if this time-series collection is empty.
*
@ -190,6 +243,13 @@ public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {
*/
RFuture<V> firstAsync();
/**
* Returns the head entry or {@code null} if this time-series collection is empty.
*
* @return the head entry or {@code null} if this time-series collection is empty
*/
RFuture<TimeSeriesEntry<V, L>> firstEntryAsync();
/**
* Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty.
*
@ -212,6 +272,14 @@ public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {
*/
RFuture<Collection<V>> lastAsync(int count);
/**
* Returns the tail entries of this time-series collection.
*
* @param count - entries amount
* @return the tail entries
*/
RFuture<Collection<TimeSeriesEntry<V, L>>> lastEntriesAsync(int count);
/**
* Returns the head elements of this time-series collection.
*
@ -220,6 +288,14 @@ public interface RTimeSeriesAsync<V, L> extends RExpirableAsync {
*/
RFuture<Collection<V>> firstAsync(int count);
/**
* Returns the head entries of this time-series collection.
*
* @param count - entries amount
* @return the head entries
*/
RFuture<Collection<TimeSeriesEntry<V, L>>> firstEntriesAsync(int count);
/**
* Removes values within timestamp range. Including boundary values.
*

@ -154,6 +154,22 @@ public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
*/
Mono<Boolean> remove(long timestamp);
/**
* Removes and returns object by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @return object or <code>null</code> if it doesn't exist
*/
Mono<V> getAndRemove(long timestamp);
/**
* Removes and returns entry by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @return entry or <code>null</code> if it doesn't exist
*/
Mono<TimeSeriesEntry<V, L>> getAndRemoveEntry(long timestamp);
/**
* Removes and returns the head elements or {@code null} if this time-series collection is empty.
*
@ -163,6 +179,14 @@ public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
*/
Mono<Collection<V>> pollFirst(int count);
/**
* Removes and returns head entries
*
* @param count - entries amount
* @return collection of head entries
*/
Mono<Collection<TimeSeriesEntry<V, L>>> pollFirstEntries(int count);
/**
* Removes and returns the tail elements or {@code null} if this time-series collection is empty.
*
@ -171,6 +195,14 @@ public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
*/
Mono<Collection<V>> pollLast(int count);
/**
* Removes and returns tail entries
*
* @param count - entries amount
* @return collection of tail entries
*/
Mono<Collection<TimeSeriesEntry<V, L>>> pollLastEntries(int count);
/**
* Removes and returns the head element or {@code null} if this time-series collection is empty.
*
@ -179,6 +211,14 @@ public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
*/
Mono<V> pollFirst();
/**
* Removes and returns head entry or {@code null} if this time-series collection is empty.
*
* @return the head entry,
* or {@code null} if this time-series collection is empty
*/
Mono<TimeSeriesEntry<V, L>> pollFirstEntry();
/**
* Removes and returns the tail element or {@code null} if this time-series collection is empty.
*
@ -186,6 +226,13 @@ public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
*/
Mono<V> pollLast();
/**
* Removes and returns the tail entry or {@code null} if this time-series collection is empty.
*
* @return the tail entry or {@code null} if this time-series collection is empty
*/
Mono<TimeSeriesEntry<V, L>> pollLastEntry();
/**
* Returns the tail element or {@code null} if this time-series collection is empty.
*
@ -193,6 +240,13 @@ public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
*/
Mono<V> last();
/**
* Returns the tail entry or {@code null} if this time-series collection is empty.
*
* @return the tail entry or {@code null} if this time-series collection is empty
*/
Mono<TimeSeriesEntry<V, L>> lastEntry();
/**
* Returns the head element or {@code null} if this time-series collection is empty.
*
@ -200,6 +254,13 @@ public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
*/
Mono<V> first();
/**
* Returns the head entry or {@code null} if this time-series collection is empty.
*
* @return the head entry or {@code null} if this time-series collection is empty
*/
Mono<TimeSeriesEntry<V, L>> firstEntry();
/**
* Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty.
*
@ -222,6 +283,14 @@ public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
*/
Mono<Collection<V>> last(int count);
/**
* Returns the tail entries of this time-series collection.
*
* @param count - entries amount
* @return the tail entries
*/
Mono<Collection<TimeSeriesEntry<V, L>>> lastEntries(int count);
/**
* Returns the head elements of this time-series collection.
*
@ -230,6 +299,14 @@ public interface RTimeSeriesReactive<V, L> extends RExpirableReactive {
*/
Mono<Collection<V>> first(int count);
/**
* Returns the head entries of this time-series collection.
*
* @param count - entries amount
* @return the head entries
*/
Mono<Collection<TimeSeriesEntry<V, L>>> firstEntries(int count);
/**
* Removes values within timestamp range. Including boundary values.
*

@ -152,6 +152,22 @@ public interface RTimeSeriesRx<V, L> extends RExpirableRx {
*/
Single<Boolean> remove(long timestamp);
/**
* Removes and returns object by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @return object or <code>null</code> if it doesn't exist
*/
Maybe<V> getAndRemove(long timestamp);
/**
* Removes and returns entry by specified <code>timestamp</code>.
*
* @param timestamp - object timestamp
* @return entry or <code>null</code> if it doesn't exist
*/
Maybe<TimeSeriesEntry<V, L>> getAndRemoveEntry(long timestamp);
/**
* Removes and returns the head elements or {@code null} if this time-series collection is empty.
*
@ -161,6 +177,14 @@ public interface RTimeSeriesRx<V, L> extends RExpirableRx {
*/
Single<Collection<V>> pollFirst(int count);
/**
* Removes and returns head entries
*
* @param count - entries amount
* @return collection of head entries
*/
Single<Collection<TimeSeriesEntry<V, L>>> pollFirstEntries(int count);
/**
* Removes and returns the tail elements or {@code null} if this time-series collection is empty.
*
@ -169,6 +193,14 @@ public interface RTimeSeriesRx<V, L> extends RExpirableRx {
*/
Single<Collection<V>> pollLast(int count);
/**
* Removes and returns tail entries
*
* @param count - entries amount
* @return collection of tail entries
*/
Single<Collection<TimeSeriesEntry<V, L>>> pollLastEntries(int count);
/**
* Removes and returns the head element or {@code null} if this time-series collection is empty.
*
@ -177,6 +209,14 @@ public interface RTimeSeriesRx<V, L> extends RExpirableRx {
*/
Maybe<V> pollFirst();
/**
* Removes and returns head entry or {@code null} if this time-series collection is empty.
*
* @return the head entry,
* or {@code null} if this time-series collection is empty
*/
Maybe<TimeSeriesEntry<V, L>> pollFirstEntry();
/**
* Removes and returns the tail element or {@code null} if this time-series collection is empty.
*
@ -184,6 +224,13 @@ public interface RTimeSeriesRx<V, L> extends RExpirableRx {
*/
Maybe<V> pollLast();
/**
* Removes and returns the tail entry or {@code null} if this time-series collection is empty.
*
* @return the tail entry or {@code null} if this time-series collection is empty
*/
Maybe<TimeSeriesEntry<V, L>> pollLastEntry();
/**
* Returns the tail element or {@code null} if this time-series collection is empty.
*
@ -191,6 +238,13 @@ public interface RTimeSeriesRx<V, L> extends RExpirableRx {
*/
Maybe<V> last();
/**
* Returns the tail entry or {@code null} if this time-series collection is empty.
*
* @return the tail entry or {@code null} if this time-series collection is empty
*/
Maybe<TimeSeriesEntry<V, L>> lastEntry();
/**
* Returns the head element or {@code null} if this time-series collection is empty.
*
@ -198,6 +252,13 @@ public interface RTimeSeriesRx<V, L> extends RExpirableRx {
*/
Maybe<V> first();
/**
* Returns the head entry or {@code null} if this time-series collection is empty.
*
* @return the head entry or {@code null} if this time-series collection is empty
*/
Maybe<TimeSeriesEntry<V, L>> firstEntry();
/**
* Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty.
*
@ -220,6 +281,14 @@ public interface RTimeSeriesRx<V, L> extends RExpirableRx {
*/
Single<Collection<V>> last(int count);
/**
* Returns the tail entries of this time-series collection.
*
* @param count - entries amount
* @return the tail entries
*/
Single<Collection<TimeSeriesEntry<V, L>>> lastEntries(int count);
/**
* Returns the head elements of this time-series collection.
*
@ -228,6 +297,14 @@ public interface RTimeSeriesRx<V, L> extends RExpirableRx {
*/
Single<Collection<V>> first(int count);
/**
* Returns the head entries of this time-series collection.
*
* @param count - entries amount
* @return the head entries
*/
Single<Collection<TimeSeriesEntry<V, L>>> firstEntries(int count);
/**
* Removes values within timestamp range. Including boundary values.
*

@ -22,6 +22,7 @@ import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
@ -31,6 +32,16 @@ import java.util.List;
*/
public class TimeSeriesEntryReplayDecoder implements MultiDecoder<List<TimeSeriesEntry<Object, Object>>> {
private boolean reverse;
public TimeSeriesEntryReplayDecoder() {
this(false);
}
public TimeSeriesEntryReplayDecoder(boolean reverse) {
this.reverse = reverse;
}
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
if (paramNum % 4 == 2 || paramNum % 4 == 3) {
@ -50,6 +61,9 @@ public class TimeSeriesEntryReplayDecoder implements MultiDecoder<List<TimeSerie
}
result.add(new TimeSeriesEntry<>((Long) parts.get(i + 3), parts.get(i), label));
}
if (reverse) {
Collections.reverse(result);
}
return result;
}

@ -0,0 +1,48 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import org.redisson.api.TimeSeriesEntry;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public class TimeSeriesFirstEntryReplayDecoder implements MultiDecoder<Object> {
private final TimeSeriesEntryReplayDecoder decoder = new TimeSeriesEntryReplayDecoder();
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
return decoder.getDecoder(codec, paramNum, state);
}
@Override
public Object decode(List<Object> parts, State state) {
List<TimeSeriesEntry<Object, Object>> list = decoder.decode(parts, state);
if (!list.isEmpty()) {
return list.get(0);
}
return null;
}
}

@ -4,6 +4,7 @@ import org.junit.jupiter.api.Test;
import org.redisson.api.RTimeSeries;
import org.redisson.api.TimeSeriesEntry;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@ -11,6 +12,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS.s;
/**
*
@ -166,6 +168,53 @@ public class RedissonTimeSeriesTest extends BaseTest {
assertThat(ee.getLabel()).isEqualTo("label2");
}
@Test
public void testGetAndRemoveEntry() {
RTimeSeries<String, String> t = redisson.getTimeSeries("test");
t.add(1, "10", "100");
t.add(2, "20");
t.add(3, "30", "300", Duration.ofSeconds(2));
t.add(4, "40");
TimeSeriesEntry<String, String> e1 = t.getAndRemoveEntry(1);
assertThat(e1.getValue()).isEqualTo("10");
assertThat(e1.getTimestamp()).isEqualTo(1);
assertThat(e1.getLabel()).isEqualTo("100");
TimeSeriesEntry<String, String> e2 = t.getAndRemoveEntry(2);
assertThat(e2.getValue()).isEqualTo("20");
assertThat(e2.getTimestamp()).isEqualTo(2);
assertThat(e2.getLabel()).isNull();
TimeSeriesEntry<String, String> e3 = t.getAndRemoveEntry(3);
assertThat(e3.getValue()).isEqualTo("30");
assertThat(e3.getTimestamp()).isEqualTo(3);
assertThat(e3.getLabel()).isEqualTo("300");
TimeSeriesEntry<String, String> e4 = t.getAndRemoveEntry(4);
assertThat(e4.getValue()).isEqualTo("40");
assertThat(e4.getTimestamp()).isEqualTo(4);
assertThat(e4.getLabel()).isNull();
}
@Test
public void testGetAndRemove() {
RTimeSeries<String, String> t = redisson.getTimeSeries("test");
t.add(1, "10", "100");
t.add(2, "20");
t.add(3, "30", "300", Duration.ofSeconds(2));
t.add(4, "40");
String s1 = t.getAndRemove(1);
assertThat(s1).isEqualTo("10");
String s2 = t.getAndRemove(2);
assertThat(s2).isEqualTo("20");
String s3 = t.getAndRemove(3);
assertThat(s3).isEqualTo("30");
assertThat(t.size()).isEqualTo(1);
}
@Test
public void test() {
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
@ -265,6 +314,34 @@ public class RedissonTimeSeriesTest extends BaseTest {
assertThat(r2).containsExactly("30", "10");
}
@Test
public void testPollLastEntries() {
RTimeSeries<String, String> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "20", "200");
t.add(3, "30");
Collection<TimeSeriesEntry<String, String>> s = t.pollLastEntries(2);
assertThat(s).containsExactly(new TimeSeriesEntry<>(2, "20", "200"),
new TimeSeriesEntry<>(3, "30"));
assertThat(t.size()).isEqualTo(1);
}
@Test
public void testPollFirstEntries() {
RTimeSeries<String, String> t = redisson.getTimeSeries("test");
t.add(1, "10", "100");
t.add(2, "20");
t.add(3, "30");
Collection<TimeSeriesEntry<String, String>> s = t.pollFirstEntries(2);
assertThat(s).containsExactly(new TimeSeriesEntry<>(1, "10", "100"),
new TimeSeriesEntry<>(2, "20"));
assertThat(t.size()).isEqualTo(1);
}
@Test
public void testPoll() throws InterruptedException {
RTimeSeries<String, Object> t = redisson.getTimeSeries("test");
@ -294,4 +371,64 @@ public class RedissonTimeSeriesTest extends BaseTest {
assertThat(t.size()).isEqualTo(2);
}
@Test
public void testPollFirstEntry() {
RTimeSeries<String, String> t = redisson.getTimeSeries("test");
t.add(1, "10", "100");
t.add(2, "20");
t.add(3, "30");
TimeSeriesEntry<String, String> e = t.pollFirstEntry();
assertThat(e).isEqualTo(new TimeSeriesEntry<>(1, "10", "100"));
assertThat(t.size()).isEqualTo(2);
TimeSeriesEntry<String, String> ee = t.firstEntry();
assertThat(ee).isEqualTo(new TimeSeriesEntry<>(2, "20"));
}
@Test
public void testPollLastEntry() {
RTimeSeries<String, String> t = redisson.getTimeSeries("test");
t.add(1, "10", "100");
t.add(2, "20");
t.add(3, "30");
TimeSeriesEntry<String, String> e = t.pollLastEntry();
assertThat(e).isEqualTo(new TimeSeriesEntry<>(3, "30"));
assertThat(t.size()).isEqualTo(2);
TimeSeriesEntry<String, String> ee = t.lastEntry();
assertThat(ee).isEqualTo(new TimeSeriesEntry<>(2, "20"));
}
@Test
public void testLastEntries() {
RTimeSeries<String, String> t = redisson.getTimeSeries("test");
t.add(1, "10");
t.add(2, "20", "200");
t.add(3, "30");
Collection<TimeSeriesEntry<String, String>> s = t.lastEntries(2);
assertThat(s).containsExactly(new TimeSeriesEntry<>(2, "20", "200"),
new TimeSeriesEntry<>(3, "30"));
assertThat(t.size()).isEqualTo(3);
}
@Test
public void testFirstEntries() {
RTimeSeries<String, String> t = redisson.getTimeSeries("test");
t.add(1, "10", "100");
t.add(2, "20");
t.add(3, "30");
Collection<TimeSeriesEntry<String, String>> s = t.firstEntries(2);
assertThat(s).containsExactly(new TimeSeriesEntry<>(1, "10", "100"),
new TimeSeriesEntry<>(2, "20"));
assertThat(t.size()).isEqualTo(3);
}
}

Loading…
Cancel
Save