From f053131438d5fb3f0071dd8cda80c160c1c88c5f Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 21 Oct 2022 12:55:58 +0300 Subject: [PATCH] Feature - labels support for RTimeSeries object. #4553 --- .../java/org/redisson/RedissonTimeSeries.java | 217 +++++++++++++++++- .../java/org/redisson/api/RTimeSeries.java | 82 ++++++- .../org/redisson/api/RTimeSeriesAsync.java | 82 ++++++- .../org/redisson/api/RTimeSeriesReactive.java | 77 +++++++ .../java/org/redisson/api/RTimeSeriesRx.java | 77 +++++++ .../decoder/TimeSeriesEntryReplayDecoder.java | 14 ++ .../TimeSeriesFirstEntryReplayDecoder.java | 48 ++++ .../org/redisson/RedissonTimeSeriesTest.java | 137 +++++++++++ 8 files changed, 719 insertions(+), 15 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/client/protocol/decoder/TimeSeriesFirstEntryReplayDecoder.java diff --git a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java index c29e843a9..3acff7522 100644 --- a/redisson/src/main/java/org/redisson/RedissonTimeSeries.java +++ b/redisson/src/main/java/org/redisson/RedissonTimeSeries.java @@ -23,9 +23,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.client.protocol.decoder.TimeSeriesEntryReplayDecoder; -import org.redisson.client.protocol.decoder.TimeSeriesSingleEntryReplayDecoder; +import org.redisson.client.protocol.decoder.*; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.iterator.RedissonBaseIterator; @@ -286,7 +284,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime @Override public RFuture> getEntryAsync(long timestamp) { - return commandExecutor.evalReadAsync(getRawName(), codec, ENTRY, + return commandExecutor.evalReadAsync(getRawName(), codec, EVAL_ENTRY, "local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[2]);" + "if #values == 0 then " + "return nil;" + @@ -329,6 +327,59 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime System.currentTimeMillis(), timestamp); } + @Override + public V getAndRemove(long timestamp) { + return get(getAndRemoveAsync(timestamp)); + } + + @Override + public RFuture getAndRemoveAsync(long timestamp) { + return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_OBJECT, + "local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[2]);" + + "if #values == 0 then " + + "return nil;" + + "end;" + + + "local expirationDate = redis.call('zscore', KEYS[2], values[1]); " + + "if expirationDate ~= false and tonumber(expirationDate) <= tonumber(ARGV[1]) then " + + "return nil;" + + "end;" + + "redis.call('zrem', KEYS[2], values[1]); " + + "redis.call('zrem', KEYS[1], values[1]); " + + "local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[1]); " + + "return val;", + Arrays.asList(getRawName(), getTimeoutSetName()), + System.currentTimeMillis(), timestamp); + } + + @Override + public TimeSeriesEntry getAndRemoveEntry(long timestamp) { + return get(getAndRemoveEntryAsync(timestamp)); + } + + @Override + public RFuture> getAndRemoveEntryAsync(long timestamp) { + return commandExecutor.evalWriteAsync(getRawName(), codec, EVAL_ENTRY, + "local values = redis.call('zrangebyscore', KEYS[1], ARGV[2], ARGV[2]);" + + "if #values == 0 then " + + "return nil;" + + "end;" + + + "local expirationDate = redis.call('zscore', KEYS[2], values[1]); " + + "if expirationDate ~= false and tonumber(expirationDate) <= tonumber(ARGV[1]) then " + + "return nil;" + + "end;" + + "redis.call('zrem', KEYS[2], values[1]); " + + "redis.call('zrem', KEYS[1], values[1]); " + + "local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[1]); " + + "if n == 2 then " + + "return {n, ARGV[2], val};" + + "end;" + + "return {n, ARGV[2], val, label};", + Arrays.asList(getRawName(), getTimeoutSetName()), + System.currentTimeMillis(), timestamp); + } + @Override public V last() { return get(lastAsync()); @@ -339,9 +390,19 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime return listAsync(-1, 1, RedisCommands.EVAL_FIRST_LIST); } + @Override + public TimeSeriesEntry lastEntry() { + return get(lastEntryAsync()); + } + + @Override + public RFuture> lastEntryAsync() { + return listEntriesAsync(-1, 1, EVAL_FIRST_ENTRY); + } + @Override public RFuture> lastAsync(int count) { - return listAsync(-1, count, RedisCommands.EVAL_LIST); + return listAsync(-1, count, RedisCommands.EVAL_LIST_REVERSE); } @Override @@ -354,6 +415,16 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime return listAsync(0, 1, RedisCommands.EVAL_FIRST_LIST); } + @Override + public TimeSeriesEntry firstEntry() { + return get(firstEntryAsync()); + } + + @Override + public RFuture> firstEntryAsync() { + return listEntriesAsync(0, 1, EVAL_FIRST_ENTRY); + } + @Override public RFuture> firstAsync(int count) { return listAsync(0, count, RedisCommands.EVAL_LIST); @@ -364,9 +435,29 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime return get(listAsync(0, count, RedisCommands.EVAL_LIST)); } + @Override + public Collection> firstEntries(int count) { + return get(firstEntriesAsync(count)); + } + + @Override + public RFuture>> firstEntriesAsync(int count) { + return listEntriesAsync(0, count, EVAL_ENTRIES); + } + @Override public Collection last(int count) { - return get(listAsync(-1, count, RedisCommands.EVAL_LIST_REVERSE)); + return get(lastAsync(count)); + } + + @Override + public Collection> lastEntries(int count) { + return get(lastEntriesAsync(count)); + } + + @Override + public RFuture>> lastEntriesAsync(int count) { + return listEntriesAsync(-2, count, EVAL_ENTRIES_REVERSE); } @Override @@ -427,6 +518,33 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime System.currentTimeMillis(), startScore, limit); } + private RFuture listEntriesAsync(int startScore, int limit, RedisCommand evalCommandType) { + return commandExecutor.evalReadAsync(getRawName(), codec, evalCommandType, + "local values;" + + "if ARGV[2] == '0' then " + + "values = redis.call('zrangebyscore', KEYS[2], ARGV[1], '+inf', 'withscores', 'limit', 0, ARGV[3]);" + + "else " + + "values = redis.call('zrevrangebyscore', KEYS[2], '+inf', ARGV[1], 'withscores', 'limit', 0, ARGV[3]);" + + "end; " + + + "local result = {}; " + + "for i=1, #values, 2 do " + + "local score = redis.call('zscore', KEYS[1], values[i]); " + + "local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[i]); " + + "table.insert(result, val);" + + "if n == 2 then " + + "label = 0; " + + "end; " + + "table.insert(result, label);" + + "table.insert(result, n);" + + "table.insert(result, score);" + + "end;" + + "return result;", + Arrays.asList(getRawName(), getTimeoutSetName()), + System.currentTimeMillis(), startScore, limit); + } + + @Override public int removeRange(long startTimestamp, long endTimestamp) { return get(removeRangeAsync(startTimestamp, endTimestamp)); @@ -475,10 +593,15 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime return entryRangeAsync(true, startTimestamp, endTimestamp, 0); } - private static final RedisCommand>> ENTRIES = + private static final RedisCommand EVAL_FIRST_ENTRY = new RedisCommand<>("EVAL", new TimeSeriesFirstEntryReplayDecoder() {}); + + private static final RedisCommand>> EVAL_ENTRIES = new RedisCommand<>("EVAL", new TimeSeriesEntryReplayDecoder()); - private static final RedisCommand> ENTRY = + private static final RedisCommand>> EVAL_ENTRIES_REVERSE = + new RedisCommand<>("EVAL", new TimeSeriesEntryReplayDecoder(true)); + + private static final RedisCommand> EVAL_ENTRY = new RedisCommand<>("EVAL", new TimeSeriesSingleEntryReplayDecoder()); @Override @@ -487,7 +610,7 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime } private RFuture>> entryRangeAsync(boolean reverse, long startTimestamp, long endTimestamp, int limit) { - return commandExecutor.evalReadAsync(getRawName(), codec, ENTRIES, + return commandExecutor.evalReadAsync(getRawName(), codec, EVAL_ENTRIES, "local result = {}; " + "local from = ARGV[2]; " + "local to = ARGV[3]; " + @@ -649,6 +772,33 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime return pollAsync(-1, count, RedisCommands.EVAL_LIST_REVERSE); } + @Override + public Collection> pollFirstEntries(int count) { + return get(pollFirstEntriesAsync(count)); + } + + @Override + public RFuture>> pollFirstEntriesAsync(int count) { + if (count <= 0) { + return new CompletableFutureWrapper<>(Collections.emptyList()); + } + + return pollEntriesAsync(0, count, EVAL_ENTRIES); + } + + @Override + public Collection> pollLastEntries(int count) { + return get(pollLastEntriesAsync(count)); + } + + @Override + public RFuture>> pollLastEntriesAsync(int count) { + if (count <= 0) { + return new CompletableFutureWrapper<>(Collections.emptyList()); + } + return pollEntriesAsync(-1, count, EVAL_ENTRIES_REVERSE); + } + @Override public V pollFirst() { return get(pollFirstAsync()); @@ -669,6 +819,26 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime return pollAsync(-1, 1, RedisCommands.EVAL_FIRST_LIST); } + @Override + public TimeSeriesEntry pollFirstEntry() { + return get(pollFirstEntryAsync()); + } + + @Override + public RFuture> pollFirstEntryAsync() { + return pollEntriesAsync(0, 1, EVAL_FIRST_ENTRY); + } + + @Override + public TimeSeriesEntry pollLastEntry() { + return get(pollLastEntryAsync()); + } + + @Override + public RFuture> pollLastEntryAsync() { + return pollEntriesAsync(-1, 1, EVAL_FIRST_ENTRY); + } + private RFuture pollAsync(int startScore, int limit, RedisCommand command) { return commandExecutor.evalWriteAsync(getRawName(), codec, command, "local values;" + @@ -690,6 +860,35 @@ public class RedissonTimeSeries extends RedissonExpirable implements RTime System.currentTimeMillis(), startScore, limit); } + private RFuture pollEntriesAsync(int startScore, int limit, RedisCommand command) { + return commandExecutor.evalWriteAsync(getRawName(), codec, command, + "local values;" + + "if ARGV[2] == '0' then " + + "values = redis.call('zrangebyscore', KEYS[2], ARGV[1], '+inf', 'withscores', 'limit', 0, ARGV[3]);" + + "else " + + "values = redis.call('zrevrangebyscore', KEYS[2], '+inf', ARGV[1], 'withscores', 'limit', 0, ARGV[3]);" + + "end; " + + + "local result = {}; " + + "for i=1, #values, 2 do " + + "local score = redis.call('zscore', KEYS[1], values[i]); " + + "redis.call('zrem', KEYS[2], values[i]); " + + "redis.call('zrem', KEYS[1], values[i]); " + + "local n, t, val, label = struct.unpack('BBc0Lc0Lc0', values[i]); " + + "table.insert(result, val);" + + "if n == 2 then " + + "label = 0; " + + "end; " + + "table.insert(result, label);" + + "table.insert(result, n);" + + "table.insert(result, score);" + + "end;" + + "return result;", + Arrays.asList(getRawName(), getTimeoutSetName()), + System.currentTimeMillis(), startScore, limit); + } + + public ListScanResult scanIterator(String name, RedisClient client, long startPos, int count) { RFuture> f = scanIteratorAsync(name, client, startPos, count); return get(f); diff --git a/redisson/src/main/java/org/redisson/api/RTimeSeries.java b/redisson/src/main/java/org/redisson/api/RTimeSeries.java index 41033e419..1d62d42b1 100644 --- a/redisson/src/main/java/org/redisson/api/RTimeSeries.java +++ b/redisson/src/main/java/org/redisson/api/RTimeSeries.java @@ -139,14 +139,37 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesA boolean remove(long timestamp); /** - * Removes and returns the head elements or {@code null} if this time-series collection is empty. + * Removes and returns object by specified timestamp. + * + * @param timestamp - object timestamp + * @return object or null if it doesn't exist + */ + V getAndRemove(long timestamp); + + /** + * Removes and returns entry by specified timestamp. + * + * @param timestamp - object timestamp + * @return entry or null if it doesn't exist + */ + TimeSeriesEntry getAndRemoveEntry(long timestamp); + + /** + * Removes and returns the head elements * * @param count - elements amount - * @return the head element, - * or {@code null} if this time-series collection is empty + * @return collection of head elements */ Collection pollFirst(int count); + /** + * Removes and returns head entries + * + * @param count - entries amount + * @return collection of head entries + */ + Collection> pollFirstEntries(int count); + /** * Removes and returns the tail elements or {@code null} if this time-series collection is empty. * @@ -155,6 +178,14 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesA */ Collection pollLast(int count); + /** + * Removes and returns tail entries + * + * @param count - entries amount + * @return collection of tail entries + */ + Collection> pollLastEntries(int count); + /** * Removes and returns the head element or {@code null} if this time-series collection is empty. * @@ -163,6 +194,14 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesA */ V pollFirst(); + /** + * Removes and returns head entry or {@code null} if this time-series collection is empty. + * + * @return the head entry, + * or {@code null} if this time-series collection is empty + */ + TimeSeriesEntry pollFirstEntry(); + /** * Removes and returns the tail element or {@code null} if this time-series collection is empty. * @@ -170,6 +209,13 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesA */ V pollLast(); + /** + * Removes and returns the tail entry or {@code null} if this time-series collection is empty. + * + * @return the tail entry or {@code null} if this time-series collection is empty + */ + TimeSeriesEntry pollLastEntry(); + /** * Returns the tail element or {@code null} if this time-series collection is empty. * @@ -177,6 +223,13 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesA */ V last(); + /** + * Returns the tail entry or {@code null} if this time-series collection is empty. + * + * @return the tail entry or {@code null} if this time-series collection is empty + */ + TimeSeriesEntry lastEntry(); + /** * Returns the head element or {@code null} if this time-series collection is empty. * @@ -184,6 +237,13 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesA */ V first(); + /** + * Returns the head entry or {@code null} if this time-series collection is empty. + * + * @return the head entry or {@code null} if this time-series collection is empty + */ + TimeSeriesEntry firstEntry(); + /** * Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty. * @@ -206,6 +266,14 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesA */ Collection last(int count); + /** + * Returns the tail entries of this time-series collection. + * + * @param count - entries amount + * @return the tail entries + */ + Collection> lastEntries(int count); + /** * Returns the head elements of this time-series collection. * @@ -214,6 +282,14 @@ public interface RTimeSeries extends RExpirable, Iterable, RTimeSeriesA */ Collection first(int count); + /** + * Returns the head entries of this time-series collection. + * + * @param count - entries amount + * @return the head entries + */ + Collection> firstEntries(int count); + /** * Removes values within timestamp range. Including boundary values. * diff --git a/redisson/src/main/java/org/redisson/api/RTimeSeriesAsync.java b/redisson/src/main/java/org/redisson/api/RTimeSeriesAsync.java index 26cda1663..7e2cc8332 100644 --- a/redisson/src/main/java/org/redisson/api/RTimeSeriesAsync.java +++ b/redisson/src/main/java/org/redisson/api/RTimeSeriesAsync.java @@ -145,14 +145,37 @@ public interface RTimeSeriesAsync extends RExpirableAsync { RFuture removeAsync(long timestamp); /** - * Removes and returns the head elements or {@code null} if this time-series collection is empty. + * Removes and returns object by specified timestamp. + * + * @param timestamp - object timestamp + * @return object or null if it doesn't exist + */ + RFuture getAndRemoveAsync(long timestamp); + + /** + * Removes and returns entry by specified timestamp. + * + * @param timestamp - object timestamp + * @return entry or null if it doesn't exist + */ + RFuture> getAndRemoveEntryAsync(long timestamp); + + /** + * Removes and returns the head elements * * @param count - elements amount - * @return the head element, - * or {@code null} if this time-series collection is empty + * @return collection of head elements */ RFuture> pollFirstAsync(int count); + /** + * Removes and returns head entries + * + * @param count - entries amount + * @return collection of head entries + */ + RFuture>> pollFirstEntriesAsync(int count); + /** * Removes and returns the tail elements or {@code null} if this time-series collection is empty. * @@ -161,6 +184,14 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture> pollLastAsync(int count); + /** + * Removes and returns tail entries + * + * @param count - entries amount + * @return collection of tail entries + */ + RFuture>> pollLastEntriesAsync(int count); + /** * Removes and returns the head element or {@code null} if this time-series collection is empty. * @@ -169,6 +200,14 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture pollFirstAsync(); + /** + * Removes and returns head entry or {@code null} if this time-series collection is empty. + * + * @return the head entry, + * or {@code null} if this time-series collection is empty + */ + RFuture> pollFirstEntryAsync(); + /** * Removes and returns the tail element or {@code null} if this time-series collection is empty. * @@ -176,6 +215,13 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture pollLastAsync(); + /** + * Removes and returns the tail entry or {@code null} if this time-series collection is empty. + * + * @return the tail entry or {@code null} if this time-series collection is empty + */ + RFuture> pollLastEntryAsync(); + /** * Returns the tail element or {@code null} if this time-series collection is empty. * @@ -183,6 +229,13 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture lastAsync(); + /** + * Returns the tail entry or {@code null} if this time-series collection is empty. + * + * @return the tail entry or {@code null} if this time-series collection is empty + */ + RFuture> lastEntryAsync(); + /** * Returns the head element or {@code null} if this time-series collection is empty. * @@ -190,6 +243,13 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture firstAsync(); + /** + * Returns the head entry or {@code null} if this time-series collection is empty. + * + * @return the head entry or {@code null} if this time-series collection is empty + */ + RFuture> firstEntryAsync(); + /** * Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty. * @@ -212,6 +272,14 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture> lastAsync(int count); + /** + * Returns the tail entries of this time-series collection. + * + * @param count - entries amount + * @return the tail entries + */ + RFuture>> lastEntriesAsync(int count); + /** * Returns the head elements of this time-series collection. * @@ -220,6 +288,14 @@ public interface RTimeSeriesAsync extends RExpirableAsync { */ RFuture> firstAsync(int count); + /** + * Returns the head entries of this time-series collection. + * + * @param count - entries amount + * @return the head entries + */ + RFuture>> firstEntriesAsync(int count); + /** * Removes values within timestamp range. Including boundary values. * diff --git a/redisson/src/main/java/org/redisson/api/RTimeSeriesReactive.java b/redisson/src/main/java/org/redisson/api/RTimeSeriesReactive.java index 4c69397cb..3d6ba501c 100644 --- a/redisson/src/main/java/org/redisson/api/RTimeSeriesReactive.java +++ b/redisson/src/main/java/org/redisson/api/RTimeSeriesReactive.java @@ -154,6 +154,22 @@ public interface RTimeSeriesReactive extends RExpirableReactive { */ Mono remove(long timestamp); + /** + * Removes and returns object by specified timestamp. + * + * @param timestamp - object timestamp + * @return object or null if it doesn't exist + */ + Mono getAndRemove(long timestamp); + + /** + * Removes and returns entry by specified timestamp. + * + * @param timestamp - object timestamp + * @return entry or null if it doesn't exist + */ + Mono> getAndRemoveEntry(long timestamp); + /** * Removes and returns the head elements or {@code null} if this time-series collection is empty. * @@ -163,6 +179,14 @@ public interface RTimeSeriesReactive extends RExpirableReactive { */ Mono> pollFirst(int count); + /** + * Removes and returns head entries + * + * @param count - entries amount + * @return collection of head entries + */ + Mono>> pollFirstEntries(int count); + /** * Removes and returns the tail elements or {@code null} if this time-series collection is empty. * @@ -171,6 +195,14 @@ public interface RTimeSeriesReactive extends RExpirableReactive { */ Mono> pollLast(int count); + /** + * Removes and returns tail entries + * + * @param count - entries amount + * @return collection of tail entries + */ + Mono>> pollLastEntries(int count); + /** * Removes and returns the head element or {@code null} if this time-series collection is empty. * @@ -179,6 +211,14 @@ public interface RTimeSeriesReactive extends RExpirableReactive { */ Mono pollFirst(); + /** + * Removes and returns head entry or {@code null} if this time-series collection is empty. + * + * @return the head entry, + * or {@code null} if this time-series collection is empty + */ + Mono> pollFirstEntry(); + /** * Removes and returns the tail element or {@code null} if this time-series collection is empty. * @@ -186,6 +226,13 @@ public interface RTimeSeriesReactive extends RExpirableReactive { */ Mono pollLast(); + /** + * Removes and returns the tail entry or {@code null} if this time-series collection is empty. + * + * @return the tail entry or {@code null} if this time-series collection is empty + */ + Mono> pollLastEntry(); + /** * Returns the tail element or {@code null} if this time-series collection is empty. * @@ -193,6 +240,13 @@ public interface RTimeSeriesReactive extends RExpirableReactive { */ Mono last(); + /** + * Returns the tail entry or {@code null} if this time-series collection is empty. + * + * @return the tail entry or {@code null} if this time-series collection is empty + */ + Mono> lastEntry(); + /** * Returns the head element or {@code null} if this time-series collection is empty. * @@ -200,6 +254,13 @@ public interface RTimeSeriesReactive extends RExpirableReactive { */ Mono first(); + /** + * Returns the head entry or {@code null} if this time-series collection is empty. + * + * @return the head entry or {@code null} if this time-series collection is empty + */ + Mono> firstEntry(); + /** * Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty. * @@ -222,6 +283,14 @@ public interface RTimeSeriesReactive extends RExpirableReactive { */ Mono> last(int count); + /** + * Returns the tail entries of this time-series collection. + * + * @param count - entries amount + * @return the tail entries + */ + Mono>> lastEntries(int count); + /** * Returns the head elements of this time-series collection. * @@ -230,6 +299,14 @@ public interface RTimeSeriesReactive extends RExpirableReactive { */ Mono> first(int count); + /** + * Returns the head entries of this time-series collection. + * + * @param count - entries amount + * @return the head entries + */ + Mono>> firstEntries(int count); + /** * Removes values within timestamp range. Including boundary values. * diff --git a/redisson/src/main/java/org/redisson/api/RTimeSeriesRx.java b/redisson/src/main/java/org/redisson/api/RTimeSeriesRx.java index 59c6f5187..6992eab63 100644 --- a/redisson/src/main/java/org/redisson/api/RTimeSeriesRx.java +++ b/redisson/src/main/java/org/redisson/api/RTimeSeriesRx.java @@ -152,6 +152,22 @@ public interface RTimeSeriesRx extends RExpirableRx { */ Single remove(long timestamp); + /** + * Removes and returns object by specified timestamp. + * + * @param timestamp - object timestamp + * @return object or null if it doesn't exist + */ + Maybe getAndRemove(long timestamp); + + /** + * Removes and returns entry by specified timestamp. + * + * @param timestamp - object timestamp + * @return entry or null if it doesn't exist + */ + Maybe> getAndRemoveEntry(long timestamp); + /** * Removes and returns the head elements or {@code null} if this time-series collection is empty. * @@ -161,6 +177,14 @@ public interface RTimeSeriesRx extends RExpirableRx { */ Single> pollFirst(int count); + /** + * Removes and returns head entries + * + * @param count - entries amount + * @return collection of head entries + */ + Single>> pollFirstEntries(int count); + /** * Removes and returns the tail elements or {@code null} if this time-series collection is empty. * @@ -169,6 +193,14 @@ public interface RTimeSeriesRx extends RExpirableRx { */ Single> pollLast(int count); + /** + * Removes and returns tail entries + * + * @param count - entries amount + * @return collection of tail entries + */ + Single>> pollLastEntries(int count); + /** * Removes and returns the head element or {@code null} if this time-series collection is empty. * @@ -177,6 +209,14 @@ public interface RTimeSeriesRx extends RExpirableRx { */ Maybe pollFirst(); + /** + * Removes and returns head entry or {@code null} if this time-series collection is empty. + * + * @return the head entry, + * or {@code null} if this time-series collection is empty + */ + Maybe> pollFirstEntry(); + /** * Removes and returns the tail element or {@code null} if this time-series collection is empty. * @@ -184,6 +224,13 @@ public interface RTimeSeriesRx extends RExpirableRx { */ Maybe pollLast(); + /** + * Removes and returns the tail entry or {@code null} if this time-series collection is empty. + * + * @return the tail entry or {@code null} if this time-series collection is empty + */ + Maybe> pollLastEntry(); + /** * Returns the tail element or {@code null} if this time-series collection is empty. * @@ -191,6 +238,13 @@ public interface RTimeSeriesRx extends RExpirableRx { */ Maybe last(); + /** + * Returns the tail entry or {@code null} if this time-series collection is empty. + * + * @return the tail entry or {@code null} if this time-series collection is empty + */ + Maybe> lastEntry(); + /** * Returns the head element or {@code null} if this time-series collection is empty. * @@ -198,6 +252,13 @@ public interface RTimeSeriesRx extends RExpirableRx { */ Maybe first(); + /** + * Returns the head entry or {@code null} if this time-series collection is empty. + * + * @return the head entry or {@code null} if this time-series collection is empty + */ + Maybe> firstEntry(); + /** * Returns timestamp of the head timestamp or {@code null} if this time-series collection is empty. * @@ -220,6 +281,14 @@ public interface RTimeSeriesRx extends RExpirableRx { */ Single> last(int count); + /** + * Returns the tail entries of this time-series collection. + * + * @param count - entries amount + * @return the tail entries + */ + Single>> lastEntries(int count); + /** * Returns the head elements of this time-series collection. * @@ -228,6 +297,14 @@ public interface RTimeSeriesRx extends RExpirableRx { */ Single> first(int count); + /** + * Returns the head entries of this time-series collection. + * + * @param count - entries amount + * @return the head entries + */ + Single>> firstEntries(int count); + /** * Removes values within timestamp range. Including boundary values. * diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/TimeSeriesEntryReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/TimeSeriesEntryReplayDecoder.java index 7b8f450df..29e708e8d 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/TimeSeriesEntryReplayDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/TimeSeriesEntryReplayDecoder.java @@ -22,6 +22,7 @@ import org.redisson.client.handler.State; import org.redisson.client.protocol.Decoder; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -31,6 +32,16 @@ import java.util.List; */ public class TimeSeriesEntryReplayDecoder implements MultiDecoder>> { + private boolean reverse; + + public TimeSeriesEntryReplayDecoder() { + this(false); + } + + public TimeSeriesEntryReplayDecoder(boolean reverse) { + this.reverse = reverse; + } + @Override public Decoder getDecoder(Codec codec, int paramNum, State state) { if (paramNum % 4 == 2 || paramNum % 4 == 3) { @@ -50,6 +61,9 @@ public class TimeSeriesEntryReplayDecoder implements MultiDecoder((Long) parts.get(i + 3), parts.get(i), label)); } + if (reverse) { + Collections.reverse(result); + } return result; } diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/TimeSeriesFirstEntryReplayDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/TimeSeriesFirstEntryReplayDecoder.java new file mode 100644 index 000000000..1daf7433f --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/TimeSeriesFirstEntryReplayDecoder.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2013-2022 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import org.redisson.api.TimeSeriesEntry; +import org.redisson.client.codec.Codec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +import java.util.List; + +/** + * + * @author Nikita Koksharov + * + */ +public class TimeSeriesFirstEntryReplayDecoder implements MultiDecoder { + + private final TimeSeriesEntryReplayDecoder decoder = new TimeSeriesEntryReplayDecoder(); + + @Override + public Decoder getDecoder(Codec codec, int paramNum, State state) { + return decoder.getDecoder(codec, paramNum, state); + } + + @Override + public Object decode(List parts, State state) { + List> list = decoder.decode(parts, state); + if (!list.isEmpty()) { + return list.get(0); + } + return null; + } + +} diff --git a/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java b/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java index 5c0c77dcd..8b0af7bb4 100644 --- a/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTimeSeriesTest.java @@ -4,6 +4,7 @@ import org.junit.jupiter.api.Test; import org.redisson.api.RTimeSeries; import org.redisson.api.TimeSeriesEntry; +import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -11,6 +12,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; +import static org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS.s; /** * @@ -166,6 +168,53 @@ public class RedissonTimeSeriesTest extends BaseTest { assertThat(ee.getLabel()).isEqualTo("label2"); } + @Test + public void testGetAndRemoveEntry() { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10", "100"); + t.add(2, "20"); + t.add(3, "30", "300", Duration.ofSeconds(2)); + t.add(4, "40"); + + TimeSeriesEntry e1 = t.getAndRemoveEntry(1); + assertThat(e1.getValue()).isEqualTo("10"); + assertThat(e1.getTimestamp()).isEqualTo(1); + assertThat(e1.getLabel()).isEqualTo("100"); + + TimeSeriesEntry e2 = t.getAndRemoveEntry(2); + assertThat(e2.getValue()).isEqualTo("20"); + assertThat(e2.getTimestamp()).isEqualTo(2); + assertThat(e2.getLabel()).isNull(); + + TimeSeriesEntry e3 = t.getAndRemoveEntry(3); + assertThat(e3.getValue()).isEqualTo("30"); + assertThat(e3.getTimestamp()).isEqualTo(3); + assertThat(e3.getLabel()).isEqualTo("300"); + + TimeSeriesEntry e4 = t.getAndRemoveEntry(4); + assertThat(e4.getValue()).isEqualTo("40"); + assertThat(e4.getTimestamp()).isEqualTo(4); + assertThat(e4.getLabel()).isNull(); + } + + + @Test + public void testGetAndRemove() { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10", "100"); + t.add(2, "20"); + t.add(3, "30", "300", Duration.ofSeconds(2)); + t.add(4, "40"); + + String s1 = t.getAndRemove(1); + assertThat(s1).isEqualTo("10"); + String s2 = t.getAndRemove(2); + assertThat(s2).isEqualTo("20"); + String s3 = t.getAndRemove(3); + assertThat(s3).isEqualTo("30"); + assertThat(t.size()).isEqualTo(1); + } + @Test public void test() { RTimeSeries t = redisson.getTimeSeries("test"); @@ -265,6 +314,34 @@ public class RedissonTimeSeriesTest extends BaseTest { assertThat(r2).containsExactly("30", "10"); } + @Test + public void testPollLastEntries() { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10"); + t.add(2, "20", "200"); + t.add(3, "30"); + + Collection> s = t.pollLastEntries(2); + assertThat(s).containsExactly(new TimeSeriesEntry<>(2, "20", "200"), + new TimeSeriesEntry<>(3, "30")); + + assertThat(t.size()).isEqualTo(1); + } + + @Test + public void testPollFirstEntries() { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10", "100"); + t.add(2, "20"); + t.add(3, "30"); + + Collection> s = t.pollFirstEntries(2); + assertThat(s).containsExactly(new TimeSeriesEntry<>(1, "10", "100"), + new TimeSeriesEntry<>(2, "20")); + + assertThat(t.size()).isEqualTo(1); + } + @Test public void testPoll() throws InterruptedException { RTimeSeries t = redisson.getTimeSeries("test"); @@ -294,4 +371,64 @@ public class RedissonTimeSeriesTest extends BaseTest { assertThat(t.size()).isEqualTo(2); } + @Test + public void testPollFirstEntry() { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10", "100"); + t.add(2, "20"); + t.add(3, "30"); + + TimeSeriesEntry e = t.pollFirstEntry(); + assertThat(e).isEqualTo(new TimeSeriesEntry<>(1, "10", "100")); + + assertThat(t.size()).isEqualTo(2); + + TimeSeriesEntry ee = t.firstEntry(); + assertThat(ee).isEqualTo(new TimeSeriesEntry<>(2, "20")); + } + + @Test + public void testPollLastEntry() { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10", "100"); + t.add(2, "20"); + t.add(3, "30"); + + TimeSeriesEntry e = t.pollLastEntry(); + assertThat(e).isEqualTo(new TimeSeriesEntry<>(3, "30")); + + assertThat(t.size()).isEqualTo(2); + + TimeSeriesEntry ee = t.lastEntry(); + assertThat(ee).isEqualTo(new TimeSeriesEntry<>(2, "20")); + } + + @Test + public void testLastEntries() { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10"); + t.add(2, "20", "200"); + t.add(3, "30"); + + Collection> s = t.lastEntries(2); + assertThat(s).containsExactly(new TimeSeriesEntry<>(2, "20", "200"), + new TimeSeriesEntry<>(3, "30")); + + assertThat(t.size()).isEqualTo(3); + } + + @Test + public void testFirstEntries() { + RTimeSeries t = redisson.getTimeSeries("test"); + t.add(1, "10", "100"); + t.add(2, "20"); + t.add(3, "30"); + + Collection> s = t.firstEntries(2); + assertThat(s).containsExactly(new TimeSeriesEntry<>(1, "10", "100"), + new TimeSeriesEntry<>(2, "20")); + + assertThat(t.size()).isEqualTo(3); + } + }