Feature - added RScoredSortedSet.pollFirst() pollLast() methods with timeout and count #4137

pull/4162/head
Nikita Koksharov 3 years ago
parent b65cdcf903
commit 0014642ac3

@ -30,6 +30,7 @@ import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedissonPromise;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
@ -170,6 +171,28 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZPOPMAX_VALUE, getRawName(), toSeconds(timeout, unit));
}
@Override
public List<V> pollFirst(Duration duration, int count) {
return get(pollFirstAsync(duration, count));
}
@Override
public RFuture<List<V>> pollFirstAsync(Duration duration, int count) {
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_SINGLE_LIST,
duration.getSeconds(), 1, getRawName(), "MIN", "COUNT", count);
}
@Override
public List<V> pollLast(Duration duration, int count) {
return get(pollLastAsync(duration, count));
}
@Override
public RFuture<List<V>> pollLastAsync(Duration duration, int count) {
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_SINGLE_LIST,
duration.getSeconds(), 1, getRawName(), "MAX", "COUNT", count);
}
@Override
public V random() {
return get(randomAsync());

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@ -124,6 +125,8 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
/**
* Removes and returns the head element or {@code null} if this sorted set is empty.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
@ -134,8 +137,20 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
V pollFirst(long timeout, TimeUnit unit);
/**
* Removes and returns the head elements.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @return the head elements
*/
List<V> pollFirst(Duration duration, int count);
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
@ -144,7 +159,17 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* @return the tail element or {@code null} if this sorted set is empty
*/
V pollLast(long timeout, TimeUnit unit);
/**
* Removes and returns the tail elements.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @return the tail elements
*/
List<V> pollLast(Duration duration, int count);
/**
* Removes and returns the head elements of this sorted set.
*

@ -15,6 +15,7 @@
*/
package org.redisson.api;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -79,6 +80,17 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
*/
RFuture<V> pollFirstAsync(long timeout, TimeUnit unit);
/**
* Removes and returns the head elements.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @return the head elements
*/
RFuture<List<V>> pollFirstAsync(Duration duration, int count);
/**
* Removes and returns the head element waiting if necessary for an element to become available.
*
@ -105,7 +117,18 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
* @return the tail element or {@code null} if this sorted set is empty
*/
RFuture<V> pollLastAsync(long timeout, TimeUnit unit);
/**
* Removes and returns the tail elements.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @return the tail elements
*/
RFuture<List<V>> pollLastAsync(Duration duration, int count);
/**
* Removes and returns the head elements of this sorted set.
*

@ -20,6 +20,7 @@ import org.redisson.client.protocol.ScoredEntry;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -82,6 +83,16 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
*/
Mono<V> pollFirst(long timeout, TimeUnit unit);
/**
* Removes and returns the head elements.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @return the head elements
*/
Mono<List<V>> pollFirst(Duration duration, int count);
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
* <p>
@ -94,7 +105,17 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
* @return the tail element or {@code null} if this sorted set is empty
*/
Mono<V> pollLast(long timeout, TimeUnit unit);
/**
* Removes and returns the tail elements.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @return the tail elements
*/
Mono<List<V>> pollLast(Duration duration, int count);
/**
* Removes and returns the head elements of this sorted set.
*

@ -21,6 +21,7 @@ import io.reactivex.rxjava3.core.Single;
import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.client.protocol.ScoredEntry;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@ -83,6 +84,17 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
*/
Maybe<V> pollFirst(long timeout, TimeUnit unit);
/**
* Removes and returns the head elements.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count elements amount
* @return the head element
*/
Single<List<V>> pollFirst(Duration duration, int count);
/**
* Removes and returns the tail element or {@code null} if this sorted set is empty.
* <p>
@ -95,7 +107,17 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
* @return the tail element or {@code null} if this sorted set is empty
*/
Maybe<V> pollLast(long timeout, TimeUnit unit);
/**
* Removes and returns the tail elements.
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @return the tail elements
*/
Single<List<V>> pollLast(Duration duration, int count);
/**
* Removes and returns the head elements of this sorted set.
*

@ -20,6 +20,7 @@ import org.redisson.api.RType;
import org.redisson.api.StreamInfo;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.convertor.*;
import org.redisson.client.protocol.decoder.*;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
@ -189,6 +190,18 @@ public interface RedisCommands {
RedisCommand<Object> BRPOPLPUSH = new RedisCommand<Object>("BRPOPLPUSH");
RedisCommand<List<Object>> BLPOP = new RedisCommand<List<Object>>("BLPOP", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> BRPOP = new RedisCommand<List<Object>>("BRPOP", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> BZMPOP_SINGLE_LIST = new RedisCommand<>("BZMPOP", new ListMultiDecoder2(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) {
@Override
public Object decode(List parts, State state) {
if (parts.isEmpty()) {
return parts;
}
return parts.get(1);
}
},
new CodecDecoder(),
new SublistDecoder()));
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new ListObjectDecoder<Object>(1));
RedisCommand<Object> BLMOVE = new RedisCommand<Object>("BLMOVE");
RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new ListObjectDecoder<Object>(1));
@ -197,7 +210,7 @@ public interface RedisCommands {
Set<String> BLOCKING_COMMAND_NAMES = new HashSet<String>(
Arrays.asList(BRPOPLPUSH.getName(), BZPOPMIN_VALUE.getName(), BZPOPMAX_VALUE.getName(),
BLPOP.getName(), BRPOP.getName(), BLMOVE.getName()));
BLPOP.getName(), BRPOP.getName(), BLMOVE.getName(), BZMPOP_SINGLE_LIST.getName()));
RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor());
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");

@ -0,0 +1,52 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
*/
public class SublistDecoder implements MultiDecoder<Object> {
public SublistDecoder() {
super();
}
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
if ((paramNum + 1) % 2 == 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return MultiDecoder.super.getDecoder(codec, paramNum, state);
}
@Override
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
return parts.get(0);
}
}

@ -11,6 +11,7 @@ import org.redisson.client.protocol.ScoredEntry;
import org.redisson.config.Config;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -587,7 +588,49 @@ public class RedissonScoredSortedSetTest extends BaseTest {
assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isEqualTo("a");
assertThat(set).containsExactly("b", "c");
}
@Test
public void testPollFirstTimeoutCount() {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0);
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isNull();
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
set.add(0.4, "d");
set.add(0.5, "e");
set.add(0.6, "f");
assertThat(set.pollFirst(Duration.ofSeconds(2), 2)).containsExactly("a", "b");
assertThat(set).containsExactly("c", "d", "e", "f");
RScoredSortedSet<String> set2 = redisson.getScoredSortedSet("simple2");
assertThat(set2.pollFirst(Duration.ofSeconds(1), 2)).isEmpty();
}
@Test
public void testPollLastTimeoutCount() {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("7.0.0") > 0);
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
assertThat(set.pollFirst(1, TimeUnit.SECONDS)).isNull();
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
set.add(0.4, "d");
set.add(0.5, "e");
set.add(0.6, "f");
assertThat(set.pollLast(Duration.ofSeconds(2), 2)).containsExactly("f", "e");
assertThat(set).containsExactly("a", "b", "c", "d");
RScoredSortedSet<String> set2 = redisson.getScoredSortedSet("simple2");
assertThat(set2.pollLast(Duration.ofSeconds(1), 2)).isEmpty();
}
@Test
public void testPollFistAmount() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");

Loading…
Cancel
Save