Feature - RScoredSortedSet.entryIterator(...) methods added. #4958

pull/5079/head
Nikita Koksharov 2 years ago
parent 6477a63d73
commit 75602344df

@ -824,6 +824,51 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
};
}
public Iterator<ScoredEntry<V>> entryIterator() {
return entryIterator(null, 10);
}
@Override
public Iterator<ScoredEntry<V>> entryIterator(String pattern) {
return entryIterator(pattern, 10);
}
@Override
public Iterator<ScoredEntry<V>> entryIterator(int count) {
return entryIterator(null, count);
}
@Override
public Iterator<ScoredEntry<V>> entryIterator(String pattern, int count) {
return new RedissonBaseIterator<ScoredEntry<V>>() {
@Override
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return entryScanIterator(client, nextIterPos, pattern, count);
}
@Override
protected void remove(Object value) {
RedissonScoredSortedSet.this.remove(value);
}
};
}
private ScanResult<Object> entryScanIterator(RedisClient client, long startPos, String pattern, int count) {
RFuture<ScanResult<Object>> f = entryScanIteratorAsync(client, startPos, pattern, count);
return get(f);
}
public RFuture<ScanResult<Object>> entryScanIteratorAsync(RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ScanResult<Object>> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN_ENTRY, getRawName(), startPos, "COUNT", count);
return f;
}
RFuture<ScanResult<Object>> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN_ENTRY, getRawName(), startPos, "MATCH", pattern, "COUNT", count);
return f;
}
@Override
public Iterator<V> distributedIterator(String pattern) {
String iteratorName = "__redisson_scored_sorted_set_cursor_{" + getRawName() + "}";

@ -672,6 +672,42 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
Iterator<V> iterator(String pattern, int count);
/**
* Returns an iterator over entries (value and its score) in this set.
*
* @return iterator
*/
Iterator<ScoredEntry<V>> entryIterator();
/**
* Returns an iterator over entries (value and its score) in this set.
* If <code>pattern</code> is not null then only entries match this pattern are loaded.
*
* @param pattern search pattern
* @return iterator
*/
Iterator<ScoredEntry<V>> entryIterator(String pattern);
/**
* Returns an iterator over entries (value and its score) in this set.
* Entries are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count size of elements batch
* @return iterator
*/
Iterator<ScoredEntry<V>> entryIterator(int count);
/**
* Returns an iterator over entries (value and its score) in this set.
* Entries are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only entries match this pattern are loaded.
*
* @param pattern search pattern
* @param count size of entries batch
* @return iterator
*/
Iterator<ScoredEntry<V>> entryIterator(String pattern, int count);
/**
* Returns element iterator that can be shared across multiple applications.
* Creating multiple iterators on the same object with this method will result in a single shared iterator.

@ -22,10 +22,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@ -338,9 +335,50 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive, RSortab
* @return iterator
*/
Flux<V> iterator(String pattern, int count);
/**
* Returns an iterator over elements in this set.
*
* @return iterator
*/
Flux<V> iterator();
/**
* Returns an iterator over entries (value and its score) in this set.
*
* @return iterator
*/
Flux<ScoredEntry<V>> entryIterator();
/**
* Returns an iterator over entries (value and its score) in this set.
* If <code>pattern</code> is not null then only entries match this pattern are loaded.
*
* @param pattern - search pattern
* @return iterator
*/
Flux<ScoredEntry<V>> entryIterator(String pattern);
/**
* Returns an iterator over entries (value and its score) in this set.
* Entries are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count - size of elements batch
* @return iterator
*/
Flux<ScoredEntry<V>> entryIterator(int count);
/**
* Returns an iterator over entries (value and its score) in this set.
* Entries are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only entries match this pattern are loaded.
*
* @param pattern search pattern
* @param count size of entries batch
* @return iterator
*/
Flux<ScoredEntry<V>> entryIterator(String pattern, int count);
/**
* Removes values by score range.
*

@ -23,10 +23,7 @@ import org.redisson.client.protocol.RankedEntry;
import org.redisson.client.protocol.ScoredEntry;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@ -340,9 +337,50 @@ public interface RScoredSortedSetRx<V> extends RExpirableRx, RSortableRx<Set<V>>
* @return iterator
*/
Flowable<V> iterator(String pattern, int count);
/**
* Returns an iterator over elements in this set.
*
* @return iterator
*/
Flowable<V> iterator();
/**
* Returns an iterator over entries (value and its score) in this set.
*
* @return iterator
*/
Flowable<ScoredEntry<V>> entryIterator();
/**
* Returns an iterator over entries (value and its score) in this set.
* If <code>pattern</code> is not null then only entries match this pattern are loaded.
*
* @param pattern search pattern
* @return iterator
*/
Flowable<ScoredEntry<V>> entryIterator(String pattern);
/**
* Returns an iterator over entries (value and its score) in this set.
* Entries are loaded in batch. Batch size is defined by <code>count</code> param.
*
* @param count size of elements batch
* @return iterator
*/
Flowable<ScoredEntry<V>> entryIterator(int count);
/**
* Returns an iterator over entries (value and its score) in this set.
* Entries are loaded in batch. Batch size is defined by <code>count</code> param.
* If pattern is not null then only entries match this pattern are loaded.
*
* @param pattern search pattern
* @param count size of entries batch
* @return iterator
*/
Flowable<ScoredEntry<V>> entryIterator(String pattern, int count);
/**
* Removes values by score range.
*

@ -180,6 +180,7 @@ public interface RedisCommands {
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new ListMultiDecoder2(new ScoredSortedSetScanReplayDecoder(), new ScoredSortedSetScanDecoder<Object>()));
RedisCommand<ListScanResult<Object>> ZSCAN_ENTRY = new RedisCommand<ListScanResult<Object>>("ZSCAN", new ListMultiDecoder2(new ScoredEntryScanDecoder<>(), new ScoredSortedSetScanDecoder<>()));
RedisStrictCommand<Double> ZINCRBY = new RedisStrictCommand<Double>("ZINCRBY", new DoubleNullSafeReplayConvertor());
RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder2(new ListScanResultReplayDecoder(), new ObjectListReplayDecoder<String>()));

@ -0,0 +1,50 @@
/**
* Copyright (c) 2013-2022 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.ScoredEntry;
import java.util.LinkedList;
import java.util.List;
/**
*
* @author Nikita Koksharov
*
* @param <T> type
*/
public class ScoredEntryScanDecoder<T> implements MultiDecoder<ListScanResult<ScoredEntry<T>>> {
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
return LongCodec.INSTANCE.getValueDecoder();
}
@Override
public ListScanResult<ScoredEntry<T>> decode(List<Object> parts, State state) {
List<ScoredEntry<T>> result = new LinkedList<>();
List<Object> values = (List<Object>) parts.get(1);
for (int i = 0; i < values.size(); i += 2) {
result.add(new ScoredEntry<T>(((Number) values.get(i+1)).doubleValue(), (T) values.get(i)));
}
return new ListScanResult<>((Long) parts.get(0), result);
}
}

@ -21,6 +21,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.ScoredEntry;
import reactor.core.publisher.Flux;
import java.util.concurrent.Callable;
@ -98,4 +99,29 @@ public class RedissonScoredSortedSetReactive<V> {
return scanIteratorReactive(pattern, count);
}
private Flux<ScoredEntry<V>> entryScanIteratorReactive(String pattern, int count) {
return Flux.create(new SetReactiveIterator<ScoredEntry<V>>() {
@Override
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonScoredSortedSet<V>) instance).entryScanIteratorAsync(client, nextIterPos, pattern, count);
}
});
}
public Flux<ScoredEntry<V>> entryIterator() {
return entryScanIteratorReactive(null, 10);
}
public Flux<ScoredEntry<V>> entryIterator(String pattern) {
return entryScanIteratorReactive(pattern, 10);
}
public Flux<ScoredEntry<V>> entryIterator(int count) {
return entryScanIteratorReactive(null, count);
}
public Flux<ScoredEntry<V>> entryIterator(String pattern, int count) {
return entryScanIteratorReactive(pattern, count);
}
}

@ -22,6 +22,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RObject;
import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.ScoredEntry;
/**
*
@ -74,4 +75,29 @@ public class RedissonScoredSortedSetRx<V> {
return scanIteratorReactive(pattern, count);
}
private Flowable<ScoredEntry<V>> entryScanIteratorReactive(String pattern, int count) {
return new SetRxIterator<ScoredEntry<V>>() {
@Override
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonScoredSortedSet<V>) instance).entryScanIteratorAsync(client, nextIterPos, pattern, count);
}
}.create();
}
public Flowable<ScoredEntry<V>> entryIterator() {
return entryScanIteratorReactive(null, 10);
}
public Flowable<ScoredEntry<V>> entryIterator(String pattern) {
return entryScanIteratorReactive(pattern, 10);
}
public Flowable<ScoredEntry<V>> entryIterator(int count) {
return entryScanIteratorReactive(null, count);
}
public Flowable<ScoredEntry<V>> entryIterator(String pattern, int count) {
return entryScanIteratorReactive(pattern, count);
}
}

@ -27,6 +27,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class RedissonScoredSortedSetTest extends BaseTest {
@Test
public void testEntryScanIterator() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("test");
set.add(1.1, "v1");
set.add(1.2, "v2");
set.add(1.3, "v3");
Iterator<ScoredEntry<String>> entries = set.entryIterator();
assertThat(entries).toIterable().containsExactly(new ScoredEntry<>(1.1, "v1"),
new ScoredEntry<>(1.2, "v2"), new ScoredEntry<>(1.3, "v3"));
}
@Test
public void testRankEntry() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("test");

@ -18,6 +18,19 @@ import org.redisson.client.protocol.ScoredEntry;
public class RedissonScoredSortedSetRxTest extends BaseRxTest {
@Test
public void testEntryIterator() {
RScoredSortedSetRx<String> set = redisson.getScoredSortedSet("simple");
sync(set.add(1.1, "v1"));
sync(set.add(1.2, "v2"));
sync(set.add(1.3, "v3"));
Iterator<ScoredEntry<String>> iter = toIterator(set.entryIterator());
assertThat(iter).toIterable().containsExactly(new ScoredEntry<>(1.1, "v1"),
new ScoredEntry<>(1.2, "v2"), new ScoredEntry<>(1.3, "v3"));
}
@Test
public void testFirstLast() {
RScoredSortedSetRx<String> set = redisson.getScoredSortedSet("simple");

Loading…
Cancel
Save