Feature - pollFirstEntries(...) and pollLastEntries(...) methods added to RScoredSortedSet object. #4897

pull/5079/head
Nikita Koksharov 2 years ago
parent 62a94887a0
commit be4ffed6ef

@ -132,6 +132,86 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
Collections.<Object>singletonList(getRawName()), from, to); Collections.<Object>singletonList(getRawName()), from, to);
} }
@Override
public ScoredEntry<V> pollFirstEntry() {
return get(pollFirstEntryAsync());
}
@Override
public ScoredEntry<V> pollLastEntry() {
return get(pollLastEntryAsync());
}
@Override
public RFuture<ScoredEntry<V>> pollFirstEntryAsync() {
return pollEntry(0, 0, RedisCommands.EVAL_FIRST_LIST_ENTRY);
}
@Override
public RFuture<ScoredEntry<V>> pollLastEntryAsync() {
return pollEntry(-1, -1, RedisCommands.EVAL_FIRST_LIST_ENTRY);
}
@Override
public List<ScoredEntry<V>> pollFirstEntries(int count) {
return get(pollFirstEntriesAsync(count));
}
@Override
public List<ScoredEntry<V>> pollLastEntries(int count) {
return get(pollLastEntriesAsync(count));
}
@Override
public RFuture<List<ScoredEntry<V>>> pollFirstEntriesAsync(int count) {
if (count <= 0) {
return new CompletableFutureWrapper<>(Collections.emptyList());
}
return poll(0, count-1, RedisCommands.EVAL_LIST_ENTRY);
}
@Override
public RFuture<List<ScoredEntry<V>>> pollLastEntriesAsync(int count) {
if (count <= 0) {
return new CompletableFutureWrapper<>(Collections.emptyList());
}
return poll(-count, -1, RedisCommands.EVAL_LIST_ENTRY);
}
private <T> RFuture<T> pollEntry(int from, int to, RedisCommand<?> command) {
return commandExecutor.evalWriteAsync(getRawName(), codec, command,
"local v = redis.call('zrange', KEYS[1], ARGV[1], ARGV[2], 'withscores'); "
+ "if #v > 0 then "
+ "redis.call('zremrangebyrank', KEYS[1], ARGV[1], ARGV[2]); "
+ "return v; "
+ "end "
+ "return v;",
Collections.singletonList(getRawName()), from, to);
}
@Override
public List<ScoredEntry<V>> pollFirstEntries(Duration duration, int count) {
return get(pollFirstEntriesAsync(duration, count));
}
@Override
public List<ScoredEntry<V>> pollLastEntries(Duration duration, int count) {
return get(pollLastEntriesAsync(duration, count));
}
@Override
public RFuture<List<ScoredEntry<V>>> pollFirstEntriesAsync(Duration duration, int count) {
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_ENTRIES,
duration.getSeconds(), 1, getRawName(), "MIN", "COUNT", count);
}
@Override
public RFuture<List<ScoredEntry<V>>> pollLastEntriesAsync(Duration duration, int count) {
return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BZMPOP_ENTRIES,
duration.getSeconds(), 1, getRawName(), "MAX", "COUNT", count);
}
@Override @Override
public V pollFirst(long timeout, TimeUnit unit) { public V pollFirst(long timeout, TimeUnit unit) {
return get(pollFirstAsync(timeout, unit)); return get(pollFirstAsync(timeout, unit));

@ -244,6 +244,7 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* Requires <b>Redis 7.0.0 and higher.</b> * Requires <b>Redis 7.0.0 and higher.</b>
* *
* @param duration how long to wait before giving up * @param duration how long to wait before giving up
* @param count entries amount
* @return the head elements * @return the head elements
*/ */
List<V> pollFirst(Duration duration, int count); List<V> pollFirst(Duration duration, int count);
@ -295,6 +296,34 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/ */
V pollFirst(); V pollFirst();
/**
* Removes and returns the head entry (value and its score) or {@code null} if this sorted set is empty.
*
* @return the head entry,
* or {@code null} if this sorted set is empty
*/
ScoredEntry<V> pollFirstEntry();
/**
* Removes and returns the head entries (value and its score) of this sorted set.
*
* @param count entries amount
* @return the head entries of this sorted set
*/
List<ScoredEntry<V>> pollFirstEntries(int count);
/**
* Removes and returns the head entries (value and its score).
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count entries amount
* @return the head entries
*/
List<ScoredEntry<V>> pollFirstEntries(Duration duration, int count);
/** /**
* Removes and returns the tail element or {@code null} if this sorted set is empty. * Removes and returns the tail element or {@code null} if this sorted set is empty.
* *
@ -302,6 +331,32 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/ */
V pollLast(); V pollLast();
/**
* Removes and returns the tail entry (value and its score) or {@code null} if this sorted set is empty.
*
* @return the tail entry or {@code null} if this sorted set is empty
*/
ScoredEntry<V> pollLastEntry();
/**
* Removes and returns the tail entries (value and its score) of this sorted set.
*
* @param count entries amount
* @return the tail entries of this sorted set
*/
List<ScoredEntry<V>> pollLastEntries(int count);
/**
* Removes and returns the head entries (value and its score).
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count entries amount
* @return the tail entries
*/
List<ScoredEntry<V>> pollLastEntries(Duration duration, int count);
/** /**
* Returns the head element or {@code null} if this sorted set is empty. * Returns the head element or {@code null} if this sorted set is empty.
* *

@ -258,6 +258,33 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
*/ */
RFuture<V> pollFirstAsync(); RFuture<V> pollFirstAsync();
/**
* Removes and returns the head entry (value and its score) or {@code null} if this sorted set is empty.
*
* @return the head entry,
* or {@code null} if this sorted set is empty
*/
RFuture<ScoredEntry<V>> pollFirstEntryAsync();
/**
* Removes and returns the head entries (value and its score) of this sorted set.
*
* @param count entries amount
* @return the head entries of this sorted set
*/
RFuture<List<ScoredEntry<V>>> pollFirstEntriesAsync(int count);
/**
* Removes and returns the head entries (value and its score).
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count entries amount
* @return the head entries
*/
RFuture<List<ScoredEntry<V>>> pollFirstEntriesAsync(Duration duration, int count);
/** /**
* Removes and returns the tail element or {@code null} if this sorted set is empty. * Removes and returns the tail element or {@code null} if this sorted set is empty.
* *
@ -265,6 +292,32 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
*/ */
RFuture<V> pollLastAsync(); RFuture<V> pollLastAsync();
/**
* Removes and returns the tail entry (value and its score) or {@code null} if this sorted set is empty.
*
* @return the tail entry or {@code null} if this sorted set is empty
*/
RFuture<ScoredEntry<V>> pollLastEntryAsync();
/**
* Removes and returns the tail entries (value and its score) of this sorted set.
*
* @param count entries amount
* @return the tail entries of this sorted set
*/
RFuture<List<ScoredEntry<V>>> pollLastEntriesAsync(int count);
/**
* Removes and returns the head entries (value and its score).
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count entries amount
* @return the tail entries
*/
RFuture<List<ScoredEntry<V>>> pollLastEntriesAsync(Duration duration, int count);
/** /**
* Returns the head element or {@code null} if this sorted set is empty. * Returns the head element or {@code null} if this sorted set is empty.
* *

@ -15,6 +15,8 @@
*/ */
package org.redisson.api; package org.redisson.api;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.client.protocol.RankedEntry; import org.redisson.client.protocol.RankedEntry;
import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.ScoredEntry;
@ -22,7 +24,10 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -242,6 +247,33 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
*/ */
Mono<V> pollFirst(); Mono<V> pollFirst();
/**
* Removes and returns the head entry (value and its score) or {@code null} if this sorted set is empty.
*
* @return the head entry,
* or {@code null} if this sorted set is empty
*/
Maybe<ScoredEntry<V>> pollFirstEntry();
/**
* Removes and returns the head entries (value and its score) of this sorted set.
*
* @param count entries amount
* @return the head entries of this sorted set
*/
Single<List<ScoredEntry<V>>> pollFirstEntries(int count);
/**
* Removes and returns the head entries (value and its score).
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count entries amount
* @return the head entries
*/
Single<List<ScoredEntry<V>>> pollFirstEntries(Duration duration, int count);
/** /**
* Removes and returns the tail element or {@code null} if this sorted set is empty. * Removes and returns the tail element or {@code null} if this sorted set is empty.
* *
@ -249,6 +281,32 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
*/ */
Mono<V> pollLast(); Mono<V> pollLast();
/**
* Removes and returns the tail entry (value and its score) or {@code null} if this sorted set is empty.
*
* @return the tail entry or {@code null} if this sorted set is empty
*/
Maybe<ScoredEntry<V>> pollLastEntry();
/**
* Removes and returns the tail entries (value and its score) of this sorted set.
*
* @param count entries amount
* @return the tail entries of this sorted set
*/
Single<List<ScoredEntry<V>>> pollLastEntries(int count);
/**
* Removes and returns the head entries (value and its score).
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count entries amount
* @return the tail entries
*/
Single<List<ScoredEntry<V>>> pollLastEntries(Duration duration, int count);
/** /**
* Returns the head element or {@code null} if this sorted set is empty. * Returns the head element or {@code null} if this sorted set is empty.
* *

@ -23,7 +23,10 @@ import org.redisson.client.protocol.RankedEntry;
import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.ScoredEntry;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -244,6 +247,34 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
*/ */
Maybe<V> pollFirst(); Maybe<V> pollFirst();
/**
* Removes and returns the head entry (value and its score) or {@code null} if this sorted set is empty.
*
* @return the head entry,
* or {@code null} if this sorted set is empty
*/
Maybe<ScoredEntry<V>> pollFirstEntry();
/**
* Removes and returns the head entries (value and its score) of this sorted set.
*
* @param count entries amount
* @return the head entries of this sorted set
*/
Single<List<ScoredEntry<V>>> pollFirstEntries(int count);
/**
* Removes and returns the head entries (value and its score).
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count entries amount
* @return the head entries
*/
Single<List<ScoredEntry<V>>> pollFirstEntries(Duration duration, int count);
/** /**
* Removes and returns the tail element or {@code null} if this sorted set is empty. * Removes and returns the tail element or {@code null} if this sorted set is empty.
* *
@ -251,6 +282,32 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
*/ */
Maybe<V> pollLast(); Maybe<V> pollLast();
/**
* Removes and returns the tail entry (value and its score) or {@code null} if this sorted set is empty.
*
* @return the tail entry or {@code null} if this sorted set is empty
*/
Maybe<ScoredEntry<V>> pollLastEntry();
/**
* Removes and returns the tail entries (value and its score) of this sorted set.
*
* @param count entries amount
* @return the tail entries of this sorted set
*/
Single<List<ScoredEntry<V>>> pollLastEntries(int count);
/**
* Removes and returns the head entries (value and its score).
* <p>
* Requires <b>Redis 7.0.0 and higher.</b>
*
* @param duration how long to wait before giving up
* @param count entries amount
* @return the tail entries
*/
Single<List<ScoredEntry<V>>> pollLastEntries(Duration duration, int count);
/** /**
* Returns the head element or {@code null} if this sorted set is empty. * Returns the head element or {@code null} if this sorted set is empty.
* *

@ -117,6 +117,32 @@ public interface RedisCommands {
RedisCommand<List<Object>> ZPOPMIN = new RedisCommand<List<Object>>("ZPOPMIN", new ObjectListReplayDecoder<Object>()); RedisCommand<List<Object>> ZPOPMIN = new RedisCommand<List<Object>>("ZPOPMIN", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZPOPMAX = new RedisCommand<List<Object>>("ZPOPMAX", new ObjectListReplayDecoder<Object>()); RedisCommand<List<Object>> ZPOPMAX = new RedisCommand<List<Object>>("ZPOPMAX", new ObjectListReplayDecoder<Object>());
RedisCommand<List<ScoredEntry>> BZMPOP_ENTRIES = new RedisCommand<>("BZMPOP",
new ListMultiDecoder2(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) {
@Override
public Object decode(List parts, State state) {
for (int i = 0; i < parts.size(); i+= 2) {
List<List<Object>> entries = (List<List<Object>>) parts.get(i + 1);
List<ScoredEntry> map = new ArrayList<>();
for (List<Object> entry : entries) {
map.add(new ScoredEntry((Double) entry.get(1), entry.get(0)));
}
return map;
}
return Collections.emptyList();
}
},
new CodecDecoder(),
new CodecDecoder() {
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
if ((paramNum + 1) % 2 == 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return codec.getValueDecoder();
}
}));
RedisCommand<Map<String, Map<Object, Double>>> ZMPOP = new RedisCommand<>("ZMPOP", RedisCommand<Map<String, Map<Object, Double>>> ZMPOP = new RedisCommand<>("ZMPOP",
new ListMultiDecoder2( new ListMultiDecoder2(
new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) { new ObjectDecoder(StringCodec.INSTANCE.getValueDecoder()) {
@ -403,7 +429,9 @@ public interface RedisCommands {
RedisStrictCommand<Long> EVAL_LONG_SAFE = new RedisStrictCommand<Long>("EVAL", new LongReplayConvertor()); RedisStrictCommand<Long> EVAL_LONG_SAFE = new RedisStrictCommand<Long>("EVAL", new LongReplayConvertor());
RedisStrictCommand<Void> EVAL_VOID = new RedisStrictCommand<Void>("EVAL", new VoidReplayConvertor()); RedisStrictCommand<Void> EVAL_VOID = new RedisStrictCommand<Void>("EVAL", new VoidReplayConvertor());
RedisCommand<Object> EVAL_FIRST_LIST = new RedisCommand<Object>("EVAL", new ListFirstObjectDecoder()); RedisCommand<Object> EVAL_FIRST_LIST = new RedisCommand<Object>("EVAL", new ListFirstObjectDecoder());
RedisCommand<Object> EVAL_FIRST_LIST_ENTRY = new RedisCommand<Object>("EVAL", new ListFirstObjectDecoder(new ScoredSortedSetReplayDecoder()));
RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>()); RedisCommand<List<Object>> EVAL_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> EVAL_LIST_ENTRY = new RedisCommand<>("EVAL", new ScoredSortedSetReplayDecoder());
RedisCommand<List<Object>> EVAL_LIST_REVERSE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<>(true)); RedisCommand<List<Object>> EVAL_LIST_REVERSE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<>(true));
RedisCommand<List<Integer>> EVAL_INT_LIST = new RedisCommand("EVAL", new ObjectListReplayDecoder<Integer>(), new IntegerReplayConvertor()); RedisCommand<List<Integer>> EVAL_INT_LIST = new RedisCommand("EVAL", new ObjectListReplayDecoder<Integer>(), new IntegerReplayConvertor());

@ -15,7 +15,9 @@
*/ */
package org.redisson.client.protocol.decoder; package org.redisson.client.protocol.decoder;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State; import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.convertor.Convertor; import org.redisson.client.protocol.convertor.Convertor;
import java.util.List; import java.util.List;
@ -27,18 +29,34 @@ import java.util.List;
*/ */
public class ListFirstObjectDecoder implements MultiDecoder<Object> { public class ListFirstObjectDecoder implements MultiDecoder<Object> {
private final Convertor<?> convertor; private MultiDecoder<Object> inner;
private Convertor<?> convertor;
public ListFirstObjectDecoder() { public ListFirstObjectDecoder() {
this(null); this((Convertor<?>) null);
} }
public ListFirstObjectDecoder(Convertor<?> convertor) { public ListFirstObjectDecoder(Convertor<?> convertor) {
this.convertor = convertor; this.convertor = convertor;
} }
public ListFirstObjectDecoder(MultiDecoder<Object> inner) {
this.inner = inner;
}
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
if (inner != null) {
return inner.getDecoder(codec, paramNum, state);
}
return MultiDecoder.super.getDecoder(codec, paramNum, state);
}
@Override @Override
public Object decode(List<Object> parts, State state) { public Object decode(List<Object> parts, State state) {
if (inner != null) {
parts = (List) inner.decode(parts, state);
}
if (!parts.isEmpty()) { if (!parts.isEmpty()) {
return parts.get(0); return parts.get(0);
} }

@ -27,6 +27,39 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class RedissonScoredSortedSetTest extends BaseTest { public class RedissonScoredSortedSetTest extends BaseTest {
@Test
public void testPollEntryDuration() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("test");
set.add(1.1, "v1");
set.add(1.2, "v2");
set.add(1.3, "v3");
set.add(1.4, "v4");
set.add(1.5, "v5");
List<ScoredEntry<String>> v1 = set.pollFirstEntries(Duration.ofSeconds(1), 2);
assertThat(v1).containsOnly(new ScoredEntry<>(1.1, "v1"), new ScoredEntry<>(1.2, "v2"));
List<ScoredEntry<String>> v2 = set.pollLastEntries(Duration.ofSeconds(1), 2);
assertThat(v2).containsOnly(new ScoredEntry<>(1.4, "v4"), new ScoredEntry<>(1.5, "v5"));
assertThat(set.size()).isEqualTo(1);
}
@Test
public void testPollEntry() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("test");
set.add(1.1, "v1");
set.add(1.2, "v2");
set.add(1.3, "v3");
ScoredEntry<String> e = set.pollFirstEntry();
assertThat(e).isEqualTo(new ScoredEntry<>(1.1, "v1"));
ScoredEntry<String> e2 = set.pollLastEntry();
assertThat(e2).isEqualTo(new ScoredEntry<>(1.3, "v3"));
assertThat(set.size()).isEqualTo(1);
}
@Test @Test
public void testEntryScanIterator() { public void testEntryScanIterator() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("test"); RScoredSortedSet<String> set = redisson.getScoredSortedSet("test");

Loading…
Cancel
Save