From be4ffed6ef0875cd59bb47f89fb442bea5858440 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 31 May 2023 14:41:10 +0300 Subject: [PATCH] Feature - pollFirstEntries(...) and pollLastEntries(...) methods added to RScoredSortedSet object. #4897 --- .../org/redisson/RedissonScoredSortedSet.java | 80 +++++++++++++++++++ .../org/redisson/api/RScoredSortedSet.java | 55 +++++++++++++ .../redisson/api/RScoredSortedSetAsync.java | 53 ++++++++++++ .../api/RScoredSortedSetReactive.java | 60 +++++++++++++- .../org/redisson/api/RScoredSortedSetRx.java | 59 +++++++++++++- .../client/protocol/RedisCommands.java | 28 +++++++ .../decoder/ListFirstObjectDecoder.java | 22 ++++- .../redisson/RedissonScoredSortedSetTest.java | 33 ++++++++ 8 files changed, 386 insertions(+), 4 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index fb3e5f78e..7fc6a60c0 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -132,6 +132,86 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc Collections.singletonList(getRawName()), from, to); } + @Override + public ScoredEntry pollFirstEntry() { + return get(pollFirstEntryAsync()); + } + + @Override + public ScoredEntry pollLastEntry() { + return get(pollLastEntryAsync()); + } + + @Override + public RFuture> pollFirstEntryAsync() { + return pollEntry(0, 0, RedisCommands.EVAL_FIRST_LIST_ENTRY); + } + + @Override + public RFuture> pollLastEntryAsync() { + return pollEntry(-1, -1, RedisCommands.EVAL_FIRST_LIST_ENTRY); + } + + @Override + public List> pollFirstEntries(int count) { + return get(pollFirstEntriesAsync(count)); + } + + @Override + public List> pollLastEntries(int count) { + return get(pollLastEntriesAsync(count)); + } + + @Override + public RFuture>> pollFirstEntriesAsync(int count) { + if (count <= 0) { + return new CompletableFutureWrapper<>(Collections.emptyList()); + } + + return poll(0, count-1, RedisCommands.EVAL_LIST_ENTRY); + } + + @Override + public RFuture>> pollLastEntriesAsync(int count) { + if (count <= 0) { + return new CompletableFutureWrapper<>(Collections.emptyList()); + } + return poll(-count, -1, RedisCommands.EVAL_LIST_ENTRY); + } + + private RFuture pollEntry(int from, int to, RedisCommand command) { + return commandExecutor.evalWriteAsync(getRawName(), codec, command, + "local v = redis.call('zrange', KEYS[1], ARGV[1], ARGV[2], 'withscores'); " + + "if #v > 0 then " + + "redis.call('zremrangebyrank', KEYS[1], ARGV[1], ARGV[2]); " + + "return v; " + + "end " + + "return v;", + Collections.singletonList(getRawName()), from, to); + } + + @Override + public List> pollFirstEntries(Duration duration, int count) { + return get(pollFirstEntriesAsync(duration, count)); + } + + @Override + public List> pollLastEntries(Duration duration, int count) { + return get(pollLastEntriesAsync(duration, count)); + } + + @Override + public RFuture>> pollFirstEntriesAsync(Duration duration, int count) { + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_ENTRIES, + duration.getSeconds(), 1, getRawName(), "MIN", "COUNT", count); + } + + @Override + public RFuture>> pollLastEntriesAsync(Duration duration, int count) { + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_ENTRIES, + duration.getSeconds(), 1, getRawName(), "MAX", "COUNT", count); + } + @Override public V pollFirst(long timeout, TimeUnit unit) { return get(pollFirstAsync(timeout, unit)); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index b45cff06d..1f6bcf31d 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -244,6 +244,7 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< * Requires Redis 7.0.0 and higher. * * @param duration how long to wait before giving up + * @param count entries amount * @return the head elements */ List pollFirst(Duration duration, int count); @@ -295,6 +296,34 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ V pollFirst(); + /** + * Removes and returns the head entry (value and its score) or {@code null} if this sorted set is empty. + * + * @return the head entry, + * or {@code null} if this sorted set is empty + */ + ScoredEntry pollFirstEntry(); + + /** + * Removes and returns the head entries (value and its score) of this sorted set. + * + * @param count entries amount + * @return the head entries of this sorted set + */ + List> pollFirstEntries(int count); + + /** + * Removes and returns the head entries (value and its score). + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count entries amount + * @return the head entries + */ + List> pollFirstEntries(Duration duration, int count); + + /** * Removes and returns the tail element or {@code null} if this sorted set is empty. * @@ -302,6 +331,32 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ V pollLast(); + /** + * Removes and returns the tail entry (value and its score) or {@code null} if this sorted set is empty. + * + * @return the tail entry or {@code null} if this sorted set is empty + */ + ScoredEntry pollLastEntry(); + + /** + * Removes and returns the tail entries (value and its score) of this sorted set. + * + * @param count entries amount + * @return the tail entries of this sorted set + */ + List> pollLastEntries(int count); + + /** + * Removes and returns the head entries (value and its score). + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count entries amount + * @return the tail entries + */ + List> pollLastEntries(Duration duration, int count); + /** * Returns the head element or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index ffb6faffe..96fefbe43 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -258,6 +258,33 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn */ RFuture pollFirstAsync(); + /** + * Removes and returns the head entry (value and its score) or {@code null} if this sorted set is empty. + * + * @return the head entry, + * or {@code null} if this sorted set is empty + */ + RFuture> pollFirstEntryAsync(); + + /** + * Removes and returns the head entries (value and its score) of this sorted set. + * + * @param count entries amount + * @return the head entries of this sorted set + */ + RFuture>> pollFirstEntriesAsync(int count); + + /** + * Removes and returns the head entries (value and its score). + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count entries amount + * @return the head entries + */ + RFuture>> pollFirstEntriesAsync(Duration duration, int count); + /** * Removes and returns the tail element or {@code null} if this sorted set is empty. * @@ -265,6 +292,32 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn */ RFuture pollLastAsync(); + /** + * Removes and returns the tail entry (value and its score) or {@code null} if this sorted set is empty. + * + * @return the tail entry or {@code null} if this sorted set is empty + */ + RFuture> pollLastEntryAsync(); + + /** + * Removes and returns the tail entries (value and its score) of this sorted set. + * + * @param count entries amount + * @return the tail entries of this sorted set + */ + RFuture>> pollLastEntriesAsync(int count); + + /** + * Removes and returns the head entries (value and its score). + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count entries amount + * @return the tail entries + */ + RFuture>> pollLastEntriesAsync(Duration duration, int count); + /** * Returns the head element or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java index 7633feac7..14ebb5c6c 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java @@ -15,6 +15,8 @@ */ package org.redisson.api; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Single; import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.client.protocol.RankedEntry; import org.redisson.client.protocol.ScoredEntry; @@ -22,7 +24,10 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -242,6 +247,33 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab */ Mono pollFirst(); + /** + * Removes and returns the head entry (value and its score) or {@code null} if this sorted set is empty. + * + * @return the head entry, + * or {@code null} if this sorted set is empty + */ + Maybe> pollFirstEntry(); + + /** + * Removes and returns the head entries (value and its score) of this sorted set. + * + * @param count entries amount + * @return the head entries of this sorted set + */ + Single>> pollFirstEntries(int count); + + /** + * Removes and returns the head entries (value and its score). + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count entries amount + * @return the head entries + */ + Single>> pollFirstEntries(Duration duration, int count); + /** * Removes and returns the tail element or {@code null} if this sorted set is empty. * @@ -249,6 +281,32 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab */ Mono pollLast(); + /** + * Removes and returns the tail entry (value and its score) or {@code null} if this sorted set is empty. + * + * @return the tail entry or {@code null} if this sorted set is empty + */ + Maybe> pollLastEntry(); + + /** + * Removes and returns the tail entries (value and its score) of this sorted set. + * + * @param count entries amount + * @return the tail entries of this sorted set + */ + Single>> pollLastEntries(int count); + + /** + * Removes and returns the head entries (value and its score). + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count entries amount + * @return the tail entries + */ + Single>> pollLastEntries(Duration duration, int count); + /** * Returns the head element or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java index 95c1b4bc7..105def573 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java @@ -23,7 +23,10 @@ import org.redisson.client.protocol.RankedEntry; import org.redisson.client.protocol.ScoredEntry; import java.time.Duration; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -244,6 +247,34 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> */ Maybe pollFirst(); + + /** + * Removes and returns the head entry (value and its score) or {@code null} if this sorted set is empty. + * + * @return the head entry, + * or {@code null} if this sorted set is empty + */ + Maybe> pollFirstEntry(); + + /** + * Removes and returns the head entries (value and its score) of this sorted set. + * + * @param count entries amount + * @return the head entries of this sorted set + */ + Single>> pollFirstEntries(int count); + + /** + * Removes and returns the head entries (value and its score). + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count entries amount + * @return the head entries + */ + Single>> pollFirstEntries(Duration duration, int count); + /** * Removes and returns the tail element or {@code null} if this sorted set is empty. * @@ -251,6 +282,32 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> */ Maybe pollLast(); + /** + * Removes and returns the tail entry (value and its score) or {@code null} if this sorted set is empty. + * + * @return the tail entry or {@code null} if this sorted set is empty + */ + Maybe> pollLastEntry(); + + /** + * Removes and returns the tail entries (value and its score) of this sorted set. + * + * @param count entries amount + * @return the tail entries of this sorted set + */ + Single>> pollLastEntries(int count); + + /** + * Removes and returns the head entries (value and its score). + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count entries amount + * @return the tail entries + */ + Single>> pollLastEntries(Duration duration, int count); + /** * Returns the head element or {@code null} if this sorted set is empty. * diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 2c9e7cee2..1f624bc31 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -117,6 +117,32 @@ public interface RedisCommands { RedisCommand> ZPOPMIN = new RedisCommand>("ZPOPMIN", new ObjectListReplayDecoder()); RedisCommand> ZPOPMAX = new RedisCommand>("ZPOPMAX", new ObjectListReplayDecoder()); + RedisCommand> BZMPOP_ENTRIES = new RedisCommand<>("BZMPOP", + new ListMultiDecoder2( + new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) { + @Override + public Object decode(List parts, State state) { + for (int i = 0; i < parts.size(); i+= 2) { + List> entries = (List>) parts.get(i + 1); + List map = new ArrayList<>(); + for (List entry : entries) { + map.add(new ScoredEntry((Double) entry.get(1), entry.get(0))); + } + return map; + } + return Collections.emptyList(); + } + }, + new CodecDecoder(), + new CodecDecoder() { + @Override + public Decoder getDecoder(Codec codec, int paramNum, State state) { + if ((paramNum + 1) % 2 == 0) { + return DoubleCodec.INSTANCE.getValueDecoder(); + } + return codec.getValueDecoder(); + } + })); RedisCommand>> ZMPOP = new RedisCommand<>("ZMPOP", new ListMultiDecoder2( new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) { @@ -403,7 +429,9 @@ public interface RedisCommands { RedisStrictCommand EVAL_LONG_SAFE = new RedisStrictCommand("EVAL", new LongReplayConvertor()); RedisStrictCommand EVAL_VOID = new RedisStrictCommand("EVAL", new VoidReplayConvertor()); RedisCommand EVAL_FIRST_LIST = new RedisCommand("EVAL", new ListFirstObjectDecoder()); + RedisCommand EVAL_FIRST_LIST_ENTRY = new RedisCommand("EVAL", new ListFirstObjectDecoder(new ScoredSortedSetReplayDecoder())); RedisCommand> EVAL_LIST = new RedisCommand>("EVAL", new ObjectListReplayDecoder()); + RedisCommand> EVAL_LIST_ENTRY = new RedisCommand<>("EVAL", new ScoredSortedSetReplayDecoder()); RedisCommand> EVAL_LIST_REVERSE = new RedisCommand>("EVAL", new ObjectListReplayDecoder<>(true)); RedisCommand> EVAL_INT_LIST = new RedisCommand("EVAL", new ObjectListReplayDecoder(), new IntegerReplayConvertor()); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListFirstObjectDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListFirstObjectDecoder.java index 321eb0bf8..eab2a7a5e 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListFirstObjectDecoder.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListFirstObjectDecoder.java @@ -15,7 +15,9 @@ */ package org.redisson.client.protocol.decoder; +import org.redisson.client.codec.Codec; import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.convertor.Convertor; import java.util.List; @@ -27,18 +29,34 @@ import java.util.List; */ public class ListFirstObjectDecoder implements MultiDecoder { - private final Convertor convertor; + private MultiDecoder inner; + private Convertor convertor; public ListFirstObjectDecoder() { - this(null); + this((Convertor) null); } public ListFirstObjectDecoder(Convertor convertor) { this.convertor = convertor; } + public ListFirstObjectDecoder(MultiDecoder inner) { + this.inner = inner; + } + + @Override + public Decoder getDecoder(Codec codec, int paramNum, State state) { + if (inner != null) { + return inner.getDecoder(codec, paramNum, state); + } + return MultiDecoder.super.getDecoder(codec, paramNum, state); + } + @Override public Object decode(List parts, State state) { + if (inner != null) { + parts = (List) inner.decode(parts, state); + } if (!parts.isEmpty()) { return parts.get(0); } diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index 132996c29..0ddbbbdf3 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -27,6 +27,39 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class RedissonScoredSortedSetTest extends BaseTest { + @Test + public void testPollEntryDuration() { + RScoredSortedSet set = redisson.getScoredSortedSet("test"); + set.add(1.1, "v1"); + set.add(1.2, "v2"); + set.add(1.3, "v3"); + set.add(1.4, "v4"); + set.add(1.5, "v5"); + + List> v1 = set.pollFirstEntries(Duration.ofSeconds(1), 2); + assertThat(v1).containsOnly(new ScoredEntry<>(1.1, "v1"), new ScoredEntry<>(1.2, "v2")); + + List> v2 = set.pollLastEntries(Duration.ofSeconds(1), 2); + assertThat(v2).containsOnly(new ScoredEntry<>(1.4, "v4"), new ScoredEntry<>(1.5, "v5")); + + assertThat(set.size()).isEqualTo(1); + } + @Test + public void testPollEntry() { + RScoredSortedSet set = redisson.getScoredSortedSet("test"); + set.add(1.1, "v1"); + set.add(1.2, "v2"); + set.add(1.3, "v3"); + + ScoredEntry e = set.pollFirstEntry(); + assertThat(e).isEqualTo(new ScoredEntry<>(1.1, "v1")); + + ScoredEntry e2 = set.pollLastEntry(); + assertThat(e2).isEqualTo(new ScoredEntry<>(1.3, "v3")); + + assertThat(set.size()).isEqualTo(1); + } + @Test public void testEntryScanIterator() { RScoredSortedSet set = redisson.getScoredSortedSet("test");