Fixed - RKeysReactive.getKeysByPattern() returns wrong result. #3648

pull/3663/head
Nikita Koksharov 4 years ago
parent 72332bcd20
commit f85a7b434b

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -228,7 +229,7 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement
byte[] keyBuf = toByteArray(command.getKey());
Flux<Entry<Object, Object>> flux = Flux.create(new MapReactiveIterator<Object, Object, Entry<Object, Object>>(null, null, 0) {
@Override
public RFuture<MapScanResult<Object, Object>> scanIterator(RedisClient client, long nextIterPos) {
public RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -267,7 +268,7 @@ public class RedissonReactiveSetCommands extends RedissonBaseReactive implements
byte[] keyBuf = toByteArray(command.getKey());
Flux<byte[]> flux = Flux.create(new SetReactiveIterator<byte[]>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -24,6 +24,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -266,7 +267,7 @@ public class RedissonReactiveZSetCommands extends RedissonBaseReactive implement
byte[] keyBuf = toByteArray(command.getKey());
Flux<Tuple> flux = Flux.create(new SetReactiveIterator<Tuple>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -228,7 +229,7 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement
byte[] keyBuf = toByteArray(command.getKey());
Flux<Entry<Object, Object>> flux = Flux.create(new MapReactiveIterator<Object, Object, Entry<Object, Object>>(null, null, 0) {
@Override
public RFuture<MapScanResult<Object, Object>> scanIterator(RedisClient client, long nextIterPos) {
public RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -267,7 +268,7 @@ public class RedissonReactiveSetCommands extends RedissonBaseReactive implements
byte[] keyBuf = toByteArray(command.getKey());
Flux<byte[]> flux = Flux.create(new SetReactiveIterator<byte[]>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -24,6 +24,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -266,7 +267,7 @@ public class RedissonReactiveZSetCommands extends RedissonBaseReactive implement
byte[] keyBuf = toByteArray(command.getKey());
Flux<Tuple> flux = Flux.create(new SetReactiveIterator<Tuple>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -228,7 +229,7 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement
byte[] keyBuf = toByteArray(command.getKey());
Flux<Entry<Object, Object>> flux = Flux.create(new MapReactiveIterator<Object, Object, Entry<Object, Object>>(null, null, 0) {
@Override
public RFuture<MapScanResult<Object, Object>> scanIterator(RedisClient client, long nextIterPos) {
public RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -267,7 +268,7 @@ public class RedissonReactiveSetCommands extends RedissonBaseReactive implements
byte[] keyBuf = toByteArray(command.getKey());
Flux<byte[]> flux = Flux.create(new SetReactiveIterator<byte[]>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -24,6 +24,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -266,7 +267,7 @@ public class RedissonReactiveZSetCommands extends RedissonBaseReactive implement
byte[] keyBuf = toByteArray(command.getKey());
Flux<Tuple> flux = Flux.create(new SetReactiveIterator<Tuple>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -228,7 +229,7 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement
byte[] keyBuf = toByteArray(command.getKey());
Flux<Entry<Object, Object>> flux = Flux.create(new MapReactiveIterator<Object, Object, Entry<Object, Object>>(null, null, 0) {
@Override
public RFuture<MapScanResult<Object, Object>> scanIterator(RedisClient client, long nextIterPos) {
public RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -23,6 +23,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -267,7 +268,7 @@ public class RedissonReactiveSetCommands extends RedissonBaseReactive implements
byte[] keyBuf = toByteArray(command.getKey());
Flux<byte[]> flux = Flux.create(new SetReactiveIterator<byte[]>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -24,6 +24,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.ByteArrayCodec;
@ -268,7 +269,7 @@ public class RedissonReactiveZSetCommands extends RedissonBaseReactive implement
byte[] keyBuf = toByteArray(command.getKey());
Flux<Tuple> flux = Flux.create(new SetReactiveIterator<Tuple>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
if (command.getOptions().getPattern() == null) {
return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN,
keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L));

@ -15,20 +15,6 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.redisson.api.RFuture;
import org.redisson.api.RKeys;
import org.redisson.api.RObject;
@ -38,7 +24,6 @@ import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
@ -50,6 +35,14 @@ import org.redisson.misc.RedissonPromise;
import org.redisson.reactive.CommandReactiveBatchService;
import org.redisson.rx.CommandRxBatchService;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
*
* @author Nikita Koksharov
@ -136,7 +129,7 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null, count);
}
public RFuture<ListScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand<?> command, long startPos,
public RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand<?> command, long startPos,
String pattern, int count) {
if (pattern == null) {
return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, command, startPos, "COUNT",
@ -146,7 +139,7 @@ public class RedissonKeys implements RKeys {
pattern, "COUNT", count);
}
public RFuture<ListScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos,
public RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos,
String pattern, int count) {
return scanIteratorAsync(client, entry, RedisCommands.SCAN, startPos, pattern, count);
}
@ -155,7 +148,7 @@ public class RedissonKeys implements RKeys {
return new RedissonBaseIterator<T>() {
@Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return commandExecutor
.get(RedissonKeys.this.scanIteratorAsync(client, entry, command, nextIterPos, pattern, count));
}

@ -26,7 +26,6 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapValueDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
@ -1491,18 +1490,18 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return get(fastRemoveAsync(keys));
}
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
RFuture<MapScanResult<Object, Object>> f = scanIteratorAsync(name, client, startPos, pattern, count);
public ScanResult<Map.Entry<Object, Object>> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
RFuture<ScanResult<Map.Entry<Object, Object>>> f = scanIteratorAsync(name, client, startPos, pattern, count);
return get(f);
}
public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
public RFuture<ScanResult<Map.Entry<Object, Object>>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<MapScanResult<Object, Object>> f
RFuture<ScanResult<Map.Entry<Object, Object>>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "COUNT", count);
return f;
}
RFuture<MapScanResult<Object, Object>> f
RFuture<ScanResult<Map.Entry<Object, Object>>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "MATCH", pattern, "COUNT", count);
return f;
}

@ -1516,7 +1516,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
@Override
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
public ScanResult<Map.Entry<Object, Object>> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
return get(scanIteratorAsync(name, client, startPos, pattern, count));
}
@ -1526,7 +1526,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
new ObjectMapDecoder(true)));
@Override
public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
public RFuture<ScanResult<Map.Entry<Object, Object>>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis());
params.add(startPos);
@ -1609,7 +1609,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
}
});
return (RFuture<MapScanResult<Object, Object>>) (Object) f;
return (RFuture<ScanResult<Map.Entry<Object, Object>>>) (Object) f;
}
@Override

@ -19,15 +19,10 @@ import org.redisson.api.*;
import org.redisson.api.listener.ScoredSortedSetAddListener;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.codec.*;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
@ -36,15 +31,8 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
@ -499,17 +487,17 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getRawName(), codec, RedisCommands.ZRANK_INT, getRawName(), encode(o));
}
private ListScanResult<Object> scanIterator(RedisClient client, long startPos, String pattern, int count) {
RFuture<ListScanResult<Object>> f = scanIteratorAsync(client, startPos, pattern, count);
private ScanResult<Object> scanIterator(RedisClient client, long startPos, String pattern, int count) {
RFuture<ScanResult<Object>> f = scanIteratorAsync(client, startPos, pattern, count);
return get(f);
}
public RFuture<ListScanResult<Object>> scanIteratorAsync(RedisClient client, long startPos, String pattern, int count) {
public RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN, getRawName(), startPos, "COUNT", count);
RFuture<ScanResult<Object>> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN, getRawName(), startPos, "COUNT", count);
return f;
}
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN, getRawName(), startPos, "MATCH", pattern, "COUNT", count);
RFuture<ScanResult<Object>> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN, getRawName(), startPos, "MATCH", pattern, "COUNT", count);
return f;
}
@ -529,11 +517,11 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
}
@Override
public Iterator<V> iterator(final String pattern, final int count) {
public Iterator<V> iterator(String pattern, int count) {
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos, pattern, count);
}

@ -15,34 +15,19 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder;
import org.redisson.api.*;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RedissonPromise;
import java.util.*;
import java.util.stream.Stream;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
*
@ -96,7 +81,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
}
@Override
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
public ScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
return get(scanIteratorAsync(name, client, startPos, pattern, count));
}
@ -115,7 +100,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getRawName(), client, nextIterPos, pattern, count);
}
@ -668,7 +653,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
}
@Override
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos,
public RFuture<ScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos,
String pattern, int count) {
if (pattern == null) {
return commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "COUNT", count);

@ -15,36 +15,21 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RSetCache;
import org.redisson.api.RedissonClient;
import io.netty.buffer.ByteBuf;
import org.redisson.api.*;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.RedissonPromise;
import io.netty.buffer.ByteBuf;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
* <p>Set-based cache with ability to set TTL for each entry via
@ -130,13 +115,13 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
}
@Override
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
RFuture<ListScanResult<Object>> f = scanIteratorAsync(name, client, startPos, pattern, count);
public ScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
RFuture<ScanResult<Object>> f = scanIteratorAsync(name, client, startPos, pattern, count);
return get(f);
}
@Override
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
public RFuture<ScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
List<Object> params = new ArrayList<Object>();
params.add(startPos);
params.add(System.currentTimeMillis());
@ -179,7 +164,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return new RedissonBaseIterator<V>() {
@Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
protected ScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getRawName(), client, nextIterPos, pattern, count);
}

@ -17,7 +17,6 @@ package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
/**
*
@ -26,9 +25,9 @@ import org.redisson.client.protocol.decoder.ListScanResult;
*/
public interface ScanIterator {
ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count);
ScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count);
RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count);
RFuture<ScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count);
boolean remove(Object value);

@ -57,4 +57,12 @@ public class ListScanResult<V> implements ScanResult<V> {
return client;
}
@Override
public String toString() {
return "ListScanResult{" +
"pos=" + pos +
", values=" + values +
", client=" + client +
'}';
}
}

@ -0,0 +1,104 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
/**
*
* @author Nikita Koksharov
*
*/
public abstract class IteratorConsumer<V> implements LongConsumer {
private final FluxSink<V> emitter;
private long nextIterPos;
private RedisClient client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;
private volatile boolean completed;
private AtomicLong readAmount = new AtomicLong();
public IteratorConsumer(FluxSink<V> emitter) {
this.emitter = emitter;
}
@Override
public void accept(long value) {
readAmount.addAndGet(value);
if (completed || elementsRead.get() == 0) {
nextValues(emitter);
completed = false;
}
}
protected void nextValues(FluxSink<V> emitter) {
scanIterator(client, nextIterPos).onComplete((res, e) -> {
if (e != null) {
emitter.error(e);
return;
}
if (finished) {
client = null;
nextIterPos = 0;
return;
}
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Object val : res.getValues()) {
Object v = transformValue(val);
emitter.next((V) v);
elementsRead.incrementAndGet();
}
if (elementsRead.get() >= readAmount.get()) {
emitter.complete();
elementsRead.set(0);
completed = true;
return;
}
if (res.getPos() == 0 && !tryAgain()) {
finished = true;
emitter.complete();
}
if (finished || completed) {
return;
}
nextValues(emitter);
});
}
protected Object transformValue(Object value) {
return value;
}
protected abstract boolean tryAgain();
protected abstract RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos);
}

@ -15,19 +15,16 @@
*/
package org.redisson.reactive;
import java.util.AbstractMap;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.redisson.RedissonMap;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;
import reactor.core.publisher.FluxSink;
import java.util.AbstractMap;
import java.util.Map.Entry;
import java.util.function.Consumer;
/**
*
* @author Nikita Koksharov
@ -50,64 +47,21 @@ public class MapReactiveIterator<K, V, M> implements Consumer<FluxSink<M>> {
@Override
public void accept(FluxSink<M> emitter) {
emitter.onRequest(new LongConsumer() {
private long nextIterPos;
private RedisClient client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;
private volatile boolean completed;
private AtomicLong readAmount = new AtomicLong();
emitter.onRequest(new IteratorConsumer<M>(emitter) {
@Override
public void accept(long value) {
readAmount.addAndGet(value);
if (completed || elementsRead.get() == 0) {
nextValues(emitter);
completed = false;
}
};
protected void nextValues(FluxSink<M> emitter) {
scanIterator(client, nextIterPos).onComplete((res, e) -> {
if (e != null) {
emitter.error(e);
return;
}
protected boolean tryAgain() {
return MapReactiveIterator.this.tryAgain();
}
if (finished) {
client = null;
nextIterPos = 0;
return;
}
@Override
protected Object transformValue(Object value) {
return getValue((Entry<Object, Object>) value);
}
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Entry<Object, Object> entry : res.getMap().entrySet()) {
M val = getValue(entry);
emitter.next(val);
elementsRead.incrementAndGet();
}
if (elementsRead.get() >= readAmount.get()) {
emitter.complete();
elementsRead.set(0);
completed = true;
return;
}
if (res.getPos() == 0 && !tryAgain()) {
finished = true;
emitter.complete();
}
if (finished || completed) {
return;
}
nextValues(emitter);
});
}
@Override
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return MapReactiveIterator.this.scanIterator(client, nextIterPos);
}
});
}
@ -126,8 +80,8 @@ public class MapReactiveIterator<K, V, M> implements Consumer<FluxSink<M>> {
};
}
public RFuture<MapScanResult<Object, Object>> scanIterator(RedisClient client, long nextIterPos) {
return map.scanIteratorAsync(map.getRawName(), client, nextIterPos, pattern, count);
public RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return (RFuture<ScanResult<Object>>) (Object) map.scanIteratorAsync(map.getRawName(), client, nextIterPos, pattern, count);
}
}

@ -15,18 +15,16 @@
*/
package org.redisson.reactive;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.reactivestreams.Publisher;
import org.redisson.RedissonKeys;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.connection.MasterSlaveEntry;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.ArrayList;
import java.util.List;
/**
*
@ -66,70 +64,18 @@ public class RedissonKeysReactive {
}
private Flux<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) {
return Flux.create(new Consumer<FluxSink<String>>() {
@Override
public void accept(FluxSink<String> emitter) {
emitter.onRequest(new LongConsumer() {
private RedisClient client;
private List<String> firstValues;
private long nextIterPos;
private long currentIndex;
@Override
public void accept(long value) {
currentIndex = value;
nextValues(emitter);
}
protected void nextValues(FluxSink<String> emitter) {
instance.scanIteratorAsync(client, entry, nextIterPos, pattern, count).onComplete((res, e) -> {
if (e != null) {
emitter.error(e);
return;
}
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = (List<String>) (Object) res.getValues();
} else if (res.getValues().equals(firstValues)) {
emitter.complete();
currentIndex = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
for (Object val : res.getValues()) {
emitter.next((String) val);
currentIndex--;
if (currentIndex == 0) {
emitter.complete();
return;
}
}
if (nextIterPos == -1) {
emitter.complete();
currentIndex = 0;
}
if (currentIndex == 0) {
return;
}
nextValues(emitter);
});
}
return Flux.create(emitter -> emitter.onRequest(new IteratorConsumer<String>(emitter) {
});
@Override
protected boolean tryAgain() {
return false;
}
});
@Override
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return instance.scanIteratorAsync(client, entry, nextIterPos, pattern, count);
}
}));
}
}
}

@ -17,11 +17,10 @@ package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.api.RLexSortedSet;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.publisher.Flux;
@ -50,7 +49,7 @@ public class RedissonLexSortedSetReactive {
private Publisher<String> scanIteratorReactive(final String pattern, final int count) {
return Flux.create(new SetReactiveIterator<String>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(final RedisClient client, final long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(final RedisClient client, final long nextIterPos) {
return ((RedissonScoredSortedSet<String>) instance).scanIteratorAsync(client, nextIterPos, pattern, count);
}
});

@ -15,17 +15,16 @@
*/
package org.redisson.reactive;
import java.util.concurrent.Callable;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.publisher.Flux;
import java.util.concurrent.Callable;
/**
*
* @author Nikita Koksharov
@ -73,7 +72,7 @@ public class RedissonScoredSortedSetReactive<V> {
private Flux<V> scanIteratorReactive(String pattern, int count) {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonScoredSortedSet<V>) instance).scanIteratorAsync(client, nextIterPos, pattern, count);
}
});

@ -18,9 +18,9 @@ package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.RedissonObject;
import org.redisson.ScanIterator;
import org.redisson.ScanResult;
import org.redisson.api.*;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.publisher.Flux;
/**
@ -42,7 +42,7 @@ public class RedissonSetCacheReactive<V> {
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((ScanIterator) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, null, 10);
}
});

@ -18,9 +18,9 @@ package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.RedissonObject;
import org.redisson.ScanIterator;
import org.redisson.ScanResult;
import org.redisson.api.*;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.publisher.Flux;
/**
@ -60,7 +60,7 @@ public class RedissonSetReactive<V> {
public Publisher<V> iterator(String pattern, int count) {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((ScanIterator) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, pattern, count);
}
});

@ -18,11 +18,11 @@ package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.RedissonObject;
import org.redisson.RedissonTimeSeries;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.api.RTimeSeries;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.publisher.Flux;
/**
@ -44,7 +44,7 @@ public class RedissonTimeSeriesReactive<V> {
public Publisher<V> iterator() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonTimeSeries) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, 10);
}
});

@ -15,16 +15,13 @@
*/
package org.redisson.reactive;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import reactor.core.publisher.FluxSink;
import java.util.function.Consumer;
/**
*
* @author Nikita Koksharov
@ -35,62 +32,15 @@ public abstract class SetReactiveIterator<V> implements Consumer<FluxSink<V>> {
@Override
public void accept(FluxSink<V> emitter) {
emitter.onRequest(new LongConsumer() {
private long nextIterPos;
private RedisClient client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;
private volatile boolean completed;
private AtomicLong readAmount = new AtomicLong();
emitter.onRequest(new IteratorConsumer<V>(emitter) {
@Override
public void accept(long value) {
readAmount.addAndGet(value);
if (completed || elementsRead.get() == 0) {
nextValues(emitter);
completed = false;
}
protected boolean tryAgain() {
return SetReactiveIterator.this.tryAgain();
}
protected void nextValues(FluxSink<V> emitter) {
scanIterator(client, nextIterPos).onComplete((res, e) -> {
if (e != null) {
emitter.error(e);
return;
}
if (finished) {
client = null;
nextIterPos = 0;
return;
}
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Object val : res.getValues()) {
emitter.next((V) val);
elementsRead.incrementAndGet();
}
if (elementsRead.get() >= readAmount.get()) {
emitter.complete();
elementsRead.set(0);
completed = true;
return;
}
if (res.getPos() == 0 && !tryAgain()) {
finished = true;
emitter.complete();
}
if (finished || completed) {
return;
}
nextValues(emitter);
});
@Override
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return SetReactiveIterator.this.scanIterator(client, nextIterPos);
}
});
}
@ -99,6 +49,6 @@ public abstract class SetReactiveIterator<V> implements Consumer<FluxSink<V>> {
return false;
}
protected abstract RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos);
protected abstract RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos);
}

@ -15,15 +15,14 @@
*/
package org.redisson.rx;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import org.reactivestreams.Publisher;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.api.RLexSortedSet;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
/**
*
@ -50,7 +49,7 @@ public class RedissonLexSortedSetRx {
private Flowable<String> scanIteratorReactive(String pattern, int count) {
return new SetRxIterator<String>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonScoredSortedSet<String>) instance).scanIteratorAsync(client, nextIterPos, pattern, count);
}
}.create();

@ -83,7 +83,7 @@ public class RedissonMapRxIterator<K, V, M> {
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Entry<Object, Object> entry : res.getMap().entrySet()) {
for (Entry<Object, Object> entry : res.getValues()) {
M val = getValue(entry);
p.onNext(val);
elementsRead.incrementAndGet();

@ -15,14 +15,13 @@
*/
package org.redisson.rx;
import io.reactivex.rxjava3.core.Flowable;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.api.RObject;
import org.redisson.api.RScoredSortedSetAsync;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import io.reactivex.rxjava3.core.Flowable;
/**
*
@ -41,7 +40,7 @@ public class RedissonScoredSortedSetRx<V> {
private Flowable<V> scanIteratorReactive(String pattern, int count) {
return new SetRxIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonScoredSortedSet<V>) instance).scanIteratorAsync(client, nextIterPos, pattern, count);
}
}.create();

@ -19,9 +19,9 @@ import io.reactivex.rxjava3.core.Single;
import org.reactivestreams.Publisher;
import org.redisson.RedissonObject;
import org.redisson.ScanIterator;
import org.redisson.ScanResult;
import org.redisson.api.*;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
/**
*
@ -42,7 +42,7 @@ public class RedissonSetCacheRx<V> {
public Publisher<V> iterator() {
return new SetRxIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((ScanIterator) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, null, 10);
}
}.create();

@ -20,9 +20,9 @@ import io.reactivex.rxjava3.core.Single;
import org.reactivestreams.Publisher;
import org.redisson.RedissonObject;
import org.redisson.ScanIterator;
import org.redisson.ScanResult;
import org.redisson.api.*;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
@ -61,7 +61,7 @@ public class RedissonSetRx<V> {
public Flowable<V> iterator(String pattern, int count) {
return new SetRxIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((ScanIterator) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, pattern, count);
}
}.create();

@ -18,11 +18,11 @@ package org.redisson.rx;
import org.reactivestreams.Publisher;
import org.redisson.RedissonObject;
import org.redisson.RedissonTimeSeries;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.api.RTimeSeries;
import org.redisson.api.RedissonRxClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
/**
*
@ -43,7 +43,7 @@ public class RedissonTimeSeriesRx<V> {
public Publisher<V> iterator() {
return new SetRxIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
protected RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonTimeSeries) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, 10);
}
}.create();

@ -15,15 +15,14 @@
*/
package org.redisson.rx;
import java.util.concurrent.atomic.AtomicLong;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.LongConsumer;
import io.reactivex.rxjava3.processors.ReplayProcessor;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import java.util.concurrent.atomic.AtomicLong;
/**
*
@ -99,6 +98,6 @@ public abstract class SetRxIterator<V> {
return false;
}
protected abstract RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos);
protected abstract RFuture<ScanResult<Object>> scanIterator(RedisClient client, long nextIterPos);
}

@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.redisson.RedissonMap;
import org.redisson.RedissonMultiLock;
import org.redisson.RedissonObject;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
@ -177,20 +178,28 @@ public class BaseTransactionalMap<K, V> {
return result;
}
protected MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
protected ScanResult<Map.Entry<Object, Object>> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
MapScanResult<Object, Object> res = ((RedissonMap<?, ?>) map).scanIterator(name, client, startPos, pattern, count);
ScanResult<Map.Entry<Object, Object>> res = ((RedissonMap<?, ?>) map).scanIterator(name, client, startPos, pattern, count);
Map<HashValue, MapEntry> newstate = new HashMap<HashValue, MapEntry>(state);
for (Iterator<Object> iterator = res.getMap().keySet().iterator(); iterator.hasNext();) {
Object entry = iterator.next();
MapEntry mapEntry = newstate.remove(toKeyHash(entry));
Map<Object, Object> newres = null;
for (Iterator<Map.Entry<Object, Object>> iterator = res.getValues().iterator(); iterator.hasNext();) {
Object key = iterator.next();
MapEntry mapEntry = newstate.remove(toKeyHash(key));
if (mapEntry != null) {
if (mapEntry == MapEntry.NULL) {
iterator.remove();
continue;
}
res.getMap().put(entry, mapEntry.getValue());
if (newres == null) {
newres = new HashMap<>();
for (Entry<Object, Object> e : res.getValues()) {
newres.put(e.getKey(), e.getValue());
}
}
newres.put(key, mapEntry.getValue());
}
}
@ -199,11 +208,22 @@ public class BaseTransactionalMap<K, V> {
if (entry.getValue() == MapEntry.NULL) {
continue;
}
res.getMap().put(entry.getValue().getKey(), entry.getValue().getValue());
if (newres == null) {
newres = new HashMap<>();
for (Entry<Object, Object> e : res.getValues()) {
newres.put(e.getKey(), e.getValue());
}
}
newres.put(entry.getValue().getKey(), entry.getValue().getValue());
}
}
if (newres != null) {
return new MapScanResult<>(res.getPos(), newres);
}
return res;
}

@ -19,9 +19,9 @@ import io.netty.buffer.ByteBuf;
import org.redisson.RedissonMultiLock;
import org.redisson.RedissonObject;
import org.redisson.RedissonSet;
import org.redisson.ScanResult;
import org.redisson.api.*;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.Hash;
import org.redisson.misc.HashValue;
@ -152,13 +152,13 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
return set.containsAsync(value);
}
protected abstract ListScanResult<Object> scanIteratorSource(String name, RedisClient client,
long startPos, String pattern, int count);
protected abstract ScanResult<Object> scanIteratorSource(String name, RedisClient client,
long startPos, String pattern, int count);
protected ListScanResult<Object> scanIterator(String name, RedisClient client,
protected ScanResult<Object> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
ListScanResult<Object> res = scanIteratorSource(name, client, startPos, pattern, count);
Map<HashValue, Object> newstate = new HashMap<HashValue, Object>(state);
ScanResult<Object> res = scanIteratorSource(name, client, startPos, pattern, count);
Map<HashValue, Object> newstate = new HashMap<>(state);
for (Iterator<Object> iterator = res.getValues().iterator(); iterator.hasNext();) {
Object entry = iterator.next();
Object value = newstate.remove(toHash(entry));
@ -182,7 +182,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
protected abstract RFuture<Set<V>> readAllAsyncSource();
public RFuture<Set<V>> readAllAsync() {
RPromise<Set<V>> result = new RedissonPromise<Set<V>>();
RPromise<Set<V>> result = new RedissonPromise<>();
RFuture<Set<V>> future = readAllAsyncSource();
future.onComplete((res, e) -> {
if (e != null) {
@ -191,7 +191,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
}
Set<V> set = future.getNow();
Map<HashValue, Object> newstate = new HashMap<HashValue, Object>(state);
Map<HashValue, Object> newstate = new HashMap<>(state);
for (Iterator<V> iterator = set.iterator(); iterator.hasNext();) {
V key = iterator.next();
Object value = newstate.remove(toHash(key));
@ -220,7 +220,7 @@ public abstract class BaseTransactionalSet<V> extends BaseTransactionalObject {
}
public RFuture<Boolean> addAsync(V value, TransactionalOperation operation) {
RPromise<Boolean> result = new RedissonPromise<Boolean>();
RPromise<Boolean> result = new RedissonPromise<>();
executeLocked(result, value, new Runnable() {
@Override
public void run() {

@ -15,30 +15,20 @@
*/
package org.redisson.transaction;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.RedissonMap;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.ScanResult;
import org.redisson.api.*;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
* @author Nikita Koksharov
@ -115,8 +105,8 @@ public class RedissonTransactionalMap<K, V> extends RedissonMap<K, V> {
}
@Override
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
public ScanResult<Map.Entry<Object, Object>> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
checkState();
return transactionalMap.scanIterator(name, client, startPos, pattern, count);
}

@ -15,29 +15,20 @@
*/
package org.redisson.transaction;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.RedissonMapCache;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.ScanResult;
import org.redisson.api.*;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
* @author Nikita Koksharov
@ -164,8 +155,8 @@ public class RedissonTransactionalMapCache<K, V> extends RedissonMapCache<K, V>
}
@Override
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
public ScanResult<Entry<Object, Object>> scanIterator(String name, RedisClient client,
long startPos, String pattern, int count) {
checkState();
return transactionalMap.scanIterator(name, client, startPos, pattern, count);
}

@ -15,6 +15,15 @@
*/
package org.redisson.transaction;
import org.redisson.RedissonSet;
import org.redisson.ScanResult;
import org.redisson.api.*;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
@ -23,21 +32,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.RedissonSet;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
/**
*
* @author Nikita Koksharov
@ -106,7 +100,7 @@ public class RedissonTransactionalSet<V> extends RedissonSet<V> {
}
@Override
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
public ScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
checkState();
return transactionalSet.scanIterator(name, client, startPos, pattern, count);
}

@ -15,23 +15,23 @@
*/
package org.redisson.transaction;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.RedissonSetCache;
import org.redisson.ScanResult;
import org.redisson.api.RFuture;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
* @author Nikita Koksharov
@ -101,7 +101,7 @@ public class RedissonTransactionalSetCache<V> extends RedissonSetCache<V> {
}
@Override
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
public ScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
checkState();
return transactionalSet.scanIterator(name, client, startPos, pattern, count);
}

@ -17,12 +17,12 @@ package org.redisson.transaction;
import org.redisson.RedissonSet;
import org.redisson.ScanIterator;
import org.redisson.ScanResult;
import org.redisson.api.RCollectionAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RSet;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.set.AddOperation;
@ -51,8 +51,8 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
}
@Override
protected ListScanResult<Object> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern, int count) {
protected ScanResult<Object> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern, int count) {
return ((ScanIterator) set).scanIterator(name, client, startPos, pattern, count);
}

@ -17,12 +17,12 @@ package org.redisson.transaction;
import org.redisson.RedissonSetCache;
import org.redisson.ScanIterator;
import org.redisson.ScanResult;
import org.redisson.api.RCollectionAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RSetCache;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.transaction.operation.TransactionalOperation;
import org.redisson.transaction.operation.set.AddCacheOperation;
@ -52,8 +52,8 @@ public class TransactionalSetCache<V> extends BaseTransactionalSet<V> {
}
@Override
protected ListScanResult<Object> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern, int count) {
protected ScanResult<Object> scanIteratorSource(String name, RedisClient client, long startPos,
String pattern, int count) {
return ((ScanIterator) set).scanIterator(name, client, startPos, pattern, count);
}

@ -1,5 +1,6 @@
package org.redisson;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.RBucketReactive;
@ -7,12 +8,33 @@ import org.redisson.api.RKeysReactive;
import org.redisson.api.RMapReactive;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonKeysReactiveTest extends BaseReactiveTest {
@Test
public void testKeysByPatternIterator() {
for (int i = 0; i < 100; i++) {
redisson.getBucket("key" + i).set(1).block();
}
Flux<String> p = redisson.getKeys().getKeysByPattern(null);
AtomicInteger i = new AtomicInteger();
p.doOnNext(t -> {
i.incrementAndGet();
}).doOnSubscribe(s -> {
s.request(100);
}).subscribe();
Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> {
assertThat(i.get()).isEqualTo(100);
});
}
@Test
public void testGetKeys() {
RKeysReactive keys = redisson.getKeys();

Loading…
Cancel
Save