diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index ec0f243c1..df9a6371a 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -30,6 +30,7 @@ import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.RedissonPromise; import java.math.BigDecimal; +import java.time.Duration; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; @@ -170,6 +171,28 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZPOPMAX_VALUE, getRawName(), toSeconds(timeout, unit)); } + @Override + public List pollFirst(Duration duration, int count) { + return get(pollFirstAsync(duration, count)); + } + + @Override + public RFuture> pollFirstAsync(Duration duration, int count) { + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_SINGLE_LIST, + duration.getSeconds(), 1, getRawName(), "MIN", "COUNT", count); + } + + @Override + public List pollLast(Duration duration, int count) { + return get(pollLastAsync(duration, count)); + } + + @Override + public RFuture> pollLastAsync(Duration duration, int count) { + return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_SINGLE_LIST, + duration.getSeconds(), 1, getRawName(), "MAX", "COUNT", count); + } + @Override public V random() { return get(randomAsync()); diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java index aedcdf2a0..3fde6036e 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSet.java @@ -15,6 +15,7 @@ */ package org.redisson.api; +import java.time.Duration; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -124,6 +125,8 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< /** * Removes and returns the head element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. * * @param timeout how long to wait before giving up, in units of * {@code unit} @@ -134,8 +137,20 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< */ V pollFirst(long timeout, TimeUnit unit); + /** + * Removes and returns the head elements. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @return the head elements + */ + List pollFirst(Duration duration, int count); + /** * Removes and returns the tail element or {@code null} if this sorted set is empty. + *

+ * Requires Redis 5.0.0 and higher. * * @param timeout how long to wait before giving up, in units of * {@code unit} @@ -144,7 +159,17 @@ public interface RScoredSortedSet extends RScoredSortedSetAsync, Iterable< * @return the tail element or {@code null} if this sorted set is empty */ V pollLast(long timeout, TimeUnit unit); - + + /** + * Removes and returns the tail elements. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @return the tail elements + */ + List pollLast(Duration duration, int count); + /** * Removes and returns the head elements of this sorted set. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java index d38479c96..7d7ddad1e 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetAsync.java @@ -15,6 +15,7 @@ */ package org.redisson.api; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; @@ -79,6 +80,17 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn */ RFuture pollFirstAsync(long timeout, TimeUnit unit); + /** + * Removes and returns the head elements. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @return the head elements + */ + RFuture> pollFirstAsync(Duration duration, int count); + /** * Removes and returns the head element waiting if necessary for an element to become available. * @@ -105,7 +117,18 @@ public interface RScoredSortedSetAsync extends RExpirableAsync, RSortableAsyn * @return the tail element or {@code null} if this sorted set is empty */ RFuture pollLastAsync(long timeout, TimeUnit unit); - + + /** + * Removes and returns the tail elements. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @return the tail elements + */ + RFuture> pollLastAsync(Duration duration, int count); + /** * Removes and returns the head elements of this sorted set. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java index f12fafe79..78f29eacf 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java @@ -20,6 +20,7 @@ import org.redisson.client.protocol.ScoredEntry; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; @@ -82,6 +83,16 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab */ Mono pollFirst(long timeout, TimeUnit unit); + /** + * Removes and returns the head elements. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @return the head elements + */ + Mono> pollFirst(Duration duration, int count); + /** * Removes and returns the tail element or {@code null} if this sorted set is empty. *

@@ -94,7 +105,17 @@ public interface RScoredSortedSetReactive extends RExpirableReactive, RSortab * @return the tail element or {@code null} if this sorted set is empty */ Mono pollLast(long timeout, TimeUnit unit); - + + /** + * Removes and returns the tail elements. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @return the tail elements + */ + Mono> pollLast(Duration duration, int count); + /** * Removes and returns the head elements of this sorted set. * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java index b161b1605..748f4c931 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetRx.java @@ -21,6 +21,7 @@ import io.reactivex.rxjava3.core.Single; import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.client.protocol.ScoredEntry; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; @@ -83,6 +84,17 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> */ Maybe pollFirst(long timeout, TimeUnit unit); + /** + * Removes and returns the head elements. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @param count elements amount + * @return the head element + */ + Single> pollFirst(Duration duration, int count); + /** * Removes and returns the tail element or {@code null} if this sorted set is empty. *

@@ -95,7 +107,17 @@ public interface RScoredSortedSetRx extends RExpirableRx, RSortableRx> * @return the tail element or {@code null} if this sorted set is empty */ Maybe pollLast(long timeout, TimeUnit unit); - + + /** + * Removes and returns the tail elements. + *

+ * Requires Redis 7.0.0 and higher. + * + * @param duration how long to wait before giving up + * @return the tail elements + */ + Single> pollLast(Duration duration, int count); + /** * Removes and returns the head elements of this sorted set. * diff --git a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java index 659594a95..6df4b0179 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java +++ b/redisson/src/main/java/org/redisson/client/protocol/RedisCommands.java @@ -20,6 +20,7 @@ import org.redisson.api.RType; import org.redisson.api.StreamInfo; import org.redisson.api.StreamMessageId; import org.redisson.client.codec.StringCodec; +import org.redisson.client.handler.State; import org.redisson.client.protocol.convertor.*; import org.redisson.client.protocol.decoder.*; import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; @@ -189,6 +190,18 @@ public interface RedisCommands { RedisCommand BRPOPLPUSH = new RedisCommand("BRPOPLPUSH"); RedisCommand> BLPOP = new RedisCommand>("BLPOP", new ObjectListReplayDecoder()); RedisCommand> BRPOP = new RedisCommand>("BRPOP", new ObjectListReplayDecoder()); + RedisCommand> BZMPOP_SINGLE_LIST = new RedisCommand<>("BZMPOP", new ListMultiDecoder2( + new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) { + @Override + public Object decode(List parts, State state) { + if (parts.isEmpty()) { + return parts; + } + return parts.get(1); + } + }, + new CodecDecoder(), + new SublistDecoder())); RedisCommand BLPOP_VALUE = new RedisCommand("BLPOP", new ListObjectDecoder(1)); RedisCommand BLMOVE = new RedisCommand("BLMOVE"); RedisCommand BRPOP_VALUE = new RedisCommand("BRPOP", new ListObjectDecoder(1)); @@ -197,7 +210,7 @@ public interface RedisCommands { Set BLOCKING_COMMAND_NAMES = new HashSet( Arrays.asList(BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName(), - BLPOP.getName(), BRPOP.getName(), BLMOVE.getName())); + BLPOP.getName(), BRPOP.getName(), BLMOVE.getName(), BZMPOP_SINGLE_LIST.getName())); RedisCommand PFADD = new RedisCommand("PFADD", new BooleanReplayConvertor()); RedisStrictCommand PFCOUNT = new RedisStrictCommand("PFCOUNT"); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/SublistDecoder.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/SublistDecoder.java new file mode 100644 index 000000000..6b19827ae --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/SublistDecoder.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.protocol.decoder; + +import org.redisson.client.codec.Codec; +import org.redisson.client.codec.DoubleCodec; +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; + +import java.util.List; + +/** + * + * @author Nikita Koksharov + * + */ +public class SublistDecoder implements MultiDecoder { + + public SublistDecoder() { + super(); + } + + @Override + public Decoder getDecoder(Codec codec, int paramNum, State state) { + if ((paramNum + 1) % 2 == 0) { + return DoubleCodec.INSTANCE.getValueDecoder(); + } + return MultiDecoder.super.getDecoder(codec, paramNum, state); + } + + @Override + public Object decode(List parts, State state) { + if (parts.isEmpty()) { + return null; + } + return parts.get(0); + } + +} diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java index c98773537..41bcc3660 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetTest.java @@ -11,6 +11,7 @@ import org.redisson.client.protocol.ScoredEntry; import org.redisson.config.Config; import java.io.IOException; +import java.time.Duration; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -587,7 +588,49 @@ public class RedissonScoredSortedSetTest extends BaseTest { assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isEqualTo("a"); assertThat(set).containsExactly("b", "c"); } - + + @Test + public void testPollFirstTimeoutCount() { + Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0); + + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isNull(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + set.add(0.4, "d"); + set.add(0.5, "e"); + set.add(0.6, "f"); + + assertThat(set.pollFirst(Duration.ofSeconds(2), 2)).containsExactly("a", "b"); + assertThat(set).containsExactly("c", "d", "e", "f"); + + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + assertThat(set2.pollFirst(Duration.ofSeconds(1), 2)).isEmpty(); + } + + @Test + public void testPollLastTimeoutCount() { + Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0); + + RScoredSortedSet set = redisson.getScoredSortedSet("simple"); + assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isNull(); + + set.add(0.1, "a"); + set.add(0.2, "b"); + set.add(0.3, "c"); + set.add(0.4, "d"); + set.add(0.5, "e"); + set.add(0.6, "f"); + + assertThat(set.pollLast(Duration.ofSeconds(2), 2)).containsExactly("f", "e"); + assertThat(set).containsExactly("a", "b", "c", "d"); + + RScoredSortedSet set2 = redisson.getScoredSortedSet("simple2"); + assertThat(set2.pollLast(Duration.ofSeconds(1), 2)).isEmpty(); + } + @Test public void testPollFistAmount() { RScoredSortedSet set = redisson.getScoredSortedSet("simple");