diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java index b9c3b6d97..49a833ad4 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -228,7 +229,7 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement byte[] keyBuf = toByteArray(command.getKey()); Flux> flux = Flux.create(new MapReactiveIterator>(null, null, 0) { @Override - public RFuture> scanIterator(RedisClient client, long nextIterPos) { + public RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java index 544996b13..65211c271 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -267,7 +268,7 @@ public class RedissonReactiveSetCommands extends RedissonBaseReactive implements byte[] keyBuf = toByteArray(command.getKey()); Flux flux = Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java index 2435a3fdd..46302c250 100644 --- a/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java +++ b/redisson-spring-data/redisson-spring-data-21/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -266,7 +267,7 @@ public class RedissonReactiveZSetCommands extends RedissonBaseReactive implement byte[] keyBuf = toByteArray(command.getKey()); Flux flux = Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java index b9c3b6d97..49a833ad4 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -228,7 +229,7 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement byte[] keyBuf = toByteArray(command.getKey()); Flux> flux = Flux.create(new MapReactiveIterator>(null, null, 0) { @Override - public RFuture> scanIterator(RedisClient client, long nextIterPos) { + public RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java index 544996b13..65211c271 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -267,7 +268,7 @@ public class RedissonReactiveSetCommands extends RedissonBaseReactive implements byte[] keyBuf = toByteArray(command.getKey()); Flux flux = Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java index 2435a3fdd..46302c250 100644 --- a/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java +++ b/redisson-spring-data/redisson-spring-data-22/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -266,7 +267,7 @@ public class RedissonReactiveZSetCommands extends RedissonBaseReactive implement byte[] keyBuf = toByteArray(command.getKey()); Flux flux = Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java index b9c3b6d97..49a833ad4 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -228,7 +229,7 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement byte[] keyBuf = toByteArray(command.getKey()); Flux> flux = Flux.create(new MapReactiveIterator>(null, null, 0) { @Override - public RFuture> scanIterator(RedisClient client, long nextIterPos) { + public RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java index 544996b13..65211c271 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -267,7 +268,7 @@ public class RedissonReactiveSetCommands extends RedissonBaseReactive implements byte[] keyBuf = toByteArray(command.getKey()); Flux flux = Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java index 2435a3fdd..46302c250 100644 --- a/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java +++ b/redisson-spring-data/redisson-spring-data-23/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -266,7 +267,7 @@ public class RedissonReactiveZSetCommands extends RedissonBaseReactive implement byte[] keyBuf = toByteArray(command.getKey()); Flux flux = Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java index b9c3b6d97..49a833ad4 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java +++ b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveHashCommands.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -228,7 +229,7 @@ public class RedissonReactiveHashCommands extends RedissonBaseReactive implement byte[] keyBuf = toByteArray(command.getKey()); Flux> flux = Flux.create(new MapReactiveIterator>(null, null, 0) { @Override - public RFuture> scanIterator(RedisClient client, long nextIterPos) { + public RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.HSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java index 544996b13..65211c271 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java +++ b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveSetCommands.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -267,7 +268,7 @@ public class RedissonReactiveSetCommands extends RedissonBaseReactive implements byte[] keyBuf = toByteArray(command.getKey()); Flux flux = Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, RedisCommands.SSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java index 6acab6e0e..2ca8aaa67 100644 --- a/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java +++ b/redisson-spring-data/redisson-spring-data-24/src/main/java/org/redisson/spring/data/connection/RedissonReactiveZSetCommands.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.client.codec.ByteArrayCodec; @@ -268,7 +269,7 @@ public class RedissonReactiveZSetCommands extends RedissonBaseReactive implement byte[] keyBuf = toByteArray(command.getKey()); Flux flux = Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { if (command.getOptions().getPattern() == null) { return executorService.readAsync(client, keyBuf, ByteArrayCodec.INSTANCE, ZSCAN, keyBuf, nextIterPos, "COUNT", Optional.ofNullable(command.getOptions().getCount()).orElse(10L)); diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 92922d2b7..512275219 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -15,20 +15,6 @@ */ package org.redisson; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - import org.redisson.api.RFuture; import org.redisson.api.RKeys; import org.redisson.api.RObject; @@ -38,7 +24,6 @@ import org.redisson.client.RedisException; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; import org.redisson.connection.ConnectionManager; @@ -50,6 +35,14 @@ import org.redisson.misc.RedissonPromise; import org.redisson.reactive.CommandReactiveBatchService; import org.redisson.rx.CommandRxBatchService; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + /** * * @author Nikita Koksharov @@ -136,7 +129,7 @@ public class RedissonKeys implements RKeys { return getKeysByPattern(null, count); } - public RFuture> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand command, long startPos, + public RFuture> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand command, long startPos, String pattern, int count) { if (pattern == null) { return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, command, startPos, "COUNT", @@ -146,7 +139,7 @@ public class RedissonKeys implements RKeys { pattern, "COUNT", count); } - public RFuture> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos, + public RFuture> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) { return scanIteratorAsync(client, entry, RedisCommands.SCAN, startPos, pattern, count); } @@ -155,7 +148,7 @@ public class RedissonKeys implements RKeys { return new RedissonBaseIterator() { @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { + protected ScanResult iterator(RedisClient client, long nextIterPos) { return commandExecutor .get(RedissonKeys.this.scanIteratorAsync(client, entry, command, nextIterPos, pattern, count)); } diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index b098286ed..2b49f49f0 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -26,7 +26,6 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.NumberConvertor; -import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapValueDecoder; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; @@ -1491,18 +1490,18 @@ public class RedissonMap extends RedissonExpirable implements RMap { return get(fastRemoveAsync(keys)); } - public MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { - RFuture> f = scanIteratorAsync(name, client, startPos, pattern, count); + public ScanResult> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { + RFuture>> f = scanIteratorAsync(name, client, startPos, pattern, count); return get(f); } - public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) { + public RFuture>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) { if (pattern == null) { - RFuture> f + RFuture>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "COUNT", count); return f; } - RFuture> f + RFuture>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "MATCH", pattern, "COUNT", count); return f; } diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 1b78e2aa5..d143605c2 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -1516,7 +1516,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } @Override - public MapScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { + public ScanResult> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { return get(scanIteratorAsync(name, client, startPos, pattern, count)); } @@ -1526,7 +1526,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac new ObjectMapDecoder(true))); @Override - public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) { + public RFuture>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) { List params = new ArrayList(); params.add(System.currentTimeMillis()); params.add(startPos); @@ -1609,7 +1609,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac } }); - return (RFuture>) (Object) f; + return (RFuture>>) (Object) f; } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 7672cab08..2f37fc0bc 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -19,15 +19,10 @@ import org.redisson.api.*; import org.redisson.api.listener.ScoredSortedSetAddListener; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; -import org.redisson.client.codec.Codec; -import org.redisson.client.codec.DoubleCodec; -import org.redisson.client.codec.IntegerCodec; -import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.StringCodec; +import org.redisson.client.codec.*; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; -import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.iterator.RedissonBaseIterator; import org.redisson.mapreduce.RedissonCollectionMapReduce; @@ -36,15 +31,8 @@ import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Stream; @@ -499,17 +487,17 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getRawName(), codec, RedisCommands.ZRANK_INT, getRawName(), encode(o)); } - private ListScanResult scanIterator(RedisClient client, long startPos, String pattern, int count) { - RFuture> f = scanIteratorAsync(client, startPos, pattern, count); + private ScanResult scanIterator(RedisClient client, long startPos, String pattern, int count) { + RFuture> f = scanIteratorAsync(client, startPos, pattern, count); return get(f); } - public RFuture> scanIteratorAsync(RedisClient client, long startPos, String pattern, int count) { + public RFuture> scanIteratorAsync(RedisClient client, long startPos, String pattern, int count) { if (pattern == null) { - RFuture> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN, getRawName(), startPos, "COUNT", count); + RFuture> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN, getRawName(), startPos, "COUNT", count); return f; } - RFuture> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN, getRawName(), startPos, "MATCH", pattern, "COUNT", count); + RFuture> f = commandExecutor.readAsync(client, getRawName(), codec, RedisCommands.ZSCAN, getRawName(), startPos, "MATCH", pattern, "COUNT", count); return f; } @@ -529,11 +517,11 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc } @Override - public Iterator iterator(final String pattern, final int count) { + public Iterator iterator(String pattern, int count) { return new RedissonBaseIterator() { @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { + protected ScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(client, nextIterPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index 9d758ec50..d62aef93c 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -15,34 +15,19 @@ */ package org.redisson; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.stream.Stream; - -import org.redisson.api.RCountDownLatch; -import org.redisson.api.RFuture; -import org.redisson.api.RLock; -import org.redisson.api.RPermitExpirableSemaphore; -import org.redisson.api.RReadWriteLock; -import org.redisson.api.RSemaphore; -import org.redisson.api.RSet; -import org.redisson.api.RedissonClient; -import org.redisson.api.SortOrder; +import org.redisson.api.*; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.iterator.RedissonBaseIterator; import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.misc.RedissonPromise; +import java.util.*; +import java.util.stream.Stream; + /** * Distributed and concurrent implementation of {@link java.util.Set} * @@ -96,7 +81,7 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt } @Override - public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { + public ScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { return get(scanIteratorAsync(name, client, startPos, pattern, count)); } @@ -115,7 +100,7 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt return new RedissonBaseIterator() { @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { + protected ScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(getRawName(), client, nextIterPos, pattern, count); } @@ -668,7 +653,7 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt } @Override - public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, + public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) { if (pattern == null) { return commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "COUNT", count); diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 6e4413e22..f697c4d9d 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -15,36 +15,21 @@ */ package org.redisson; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -import org.redisson.api.RCountDownLatch; -import org.redisson.api.RFuture; -import org.redisson.api.RLock; -import org.redisson.api.RPermitExpirableSemaphore; -import org.redisson.api.RReadWriteLock; -import org.redisson.api.RSemaphore; -import org.redisson.api.RSetCache; -import org.redisson.api.RedissonClient; +import io.netty.buffer.ByteBuf; +import org.redisson.api.*; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.eviction.EvictionScheduler; import org.redisson.iterator.RedissonBaseIterator; import org.redisson.mapreduce.RedissonCollectionMapReduce; import org.redisson.misc.RedissonPromise; -import io.netty.buffer.ByteBuf; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; /** *

Set-based cache with ability to set TTL for each entry via @@ -130,13 +115,13 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< } @Override - public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { - RFuture> f = scanIteratorAsync(name, client, startPos, pattern, count); + public ScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { + RFuture> f = scanIteratorAsync(name, client, startPos, pattern, count); return get(f); } @Override - public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) { + public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) { List params = new ArrayList(); params.add(startPos); params.add(System.currentTimeMillis()); @@ -179,7 +164,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< return new RedissonBaseIterator() { @Override - protected ListScanResult iterator(RedisClient client, long nextIterPos) { + protected ScanResult iterator(RedisClient client, long nextIterPos) { return scanIterator(getRawName(), client, nextIterPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/ScanIterator.java b/redisson/src/main/java/org/redisson/ScanIterator.java index 0b8d0d72e..1c89a448f 100644 --- a/redisson/src/main/java/org/redisson/ScanIterator.java +++ b/redisson/src/main/java/org/redisson/ScanIterator.java @@ -17,7 +17,6 @@ package org.redisson; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; /** * @@ -26,9 +25,9 @@ import org.redisson.client.protocol.decoder.ListScanResult; */ public interface ScanIterator { - ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count); + ScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count); - RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count); + RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count); boolean remove(Object value); diff --git a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java index 2feaed9d4..a2e93abdc 100644 --- a/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java +++ b/redisson/src/main/java/org/redisson/client/protocol/decoder/ListScanResult.java @@ -57,4 +57,12 @@ public class ListScanResult implements ScanResult { return client; } + @Override + public String toString() { + return "ListScanResult{" + + "pos=" + pos + + ", values=" + values + + ", client=" + client + + '}'; + } } diff --git a/redisson/src/main/java/org/redisson/reactive/IteratorConsumer.java b/redisson/src/main/java/org/redisson/reactive/IteratorConsumer.java new file mode 100644 index 000000000..69b620a01 --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/IteratorConsumer.java @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import org.redisson.ScanResult; +import org.redisson.api.RFuture; +import org.redisson.client.RedisClient; +import reactor.core.publisher.FluxSink; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongConsumer; + +/** + * + * @author Nikita Koksharov + * + */ +public abstract class IteratorConsumer implements LongConsumer { + + private final FluxSink emitter; + + private long nextIterPos; + private RedisClient client; + private AtomicLong elementsRead = new AtomicLong(); + + private boolean finished; + private volatile boolean completed; + private AtomicLong readAmount = new AtomicLong(); + + public IteratorConsumer(FluxSink emitter) { + this.emitter = emitter; + } + + @Override + public void accept(long value) { + readAmount.addAndGet(value); + if (completed || elementsRead.get() == 0) { + nextValues(emitter); + completed = false; + } + } + + protected void nextValues(FluxSink emitter) { + scanIterator(client, nextIterPos).onComplete((res, e) -> { + if (e != null) { + emitter.error(e); + return; + } + + if (finished) { + client = null; + nextIterPos = 0; + return; + } + + client = res.getRedisClient(); + nextIterPos = res.getPos(); + + for (Object val : res.getValues()) { + Object v = transformValue(val); + emitter.next((V) v); + elementsRead.incrementAndGet(); + } + + if (elementsRead.get() >= readAmount.get()) { + emitter.complete(); + elementsRead.set(0); + completed = true; + return; + } + if (res.getPos() == 0 && !tryAgain()) { + finished = true; + emitter.complete(); + } + + if (finished || completed) { + return; + } + nextValues(emitter); + }); + } + + protected Object transformValue(Object value) { + return value; + } + + protected abstract boolean tryAgain(); + + protected abstract RFuture> scanIterator(RedisClient client, long nextIterPos); + +} diff --git a/redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java index 1982d7bab..e512dd762 100644 --- a/redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/MapReactiveIterator.java @@ -15,19 +15,16 @@ */ package org.redisson.reactive; -import java.util.AbstractMap; -import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.function.LongConsumer; - import org.redisson.RedissonMap; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.MapScanResult; - import reactor.core.publisher.FluxSink; +import java.util.AbstractMap; +import java.util.Map.Entry; +import java.util.function.Consumer; + /** * * @author Nikita Koksharov @@ -50,64 +47,21 @@ public class MapReactiveIterator implements Consumer> { @Override public void accept(FluxSink emitter) { - emitter.onRequest(new LongConsumer() { - - private long nextIterPos; - private RedisClient client; - private AtomicLong elementsRead = new AtomicLong(); - - private boolean finished; - private volatile boolean completed; - private AtomicLong readAmount = new AtomicLong(); - + emitter.onRequest(new IteratorConsumer(emitter) { @Override - public void accept(long value) { - readAmount.addAndGet(value); - if (completed || elementsRead.get() == 0) { - nextValues(emitter); - completed = false; - } - }; - - protected void nextValues(FluxSink emitter) { - scanIterator(client, nextIterPos).onComplete((res, e) -> { - if (e != null) { - emitter.error(e); - return; - } + protected boolean tryAgain() { + return MapReactiveIterator.this.tryAgain(); + } - if (finished) { - client = null; - nextIterPos = 0; - return; - } + @Override + protected Object transformValue(Object value) { + return getValue((Entry) value); + } - client = res.getRedisClient(); - nextIterPos = res.getPos(); - - for (Entry entry : res.getMap().entrySet()) { - M val = getValue(entry); - emitter.next(val); - elementsRead.incrementAndGet(); - } - - if (elementsRead.get() >= readAmount.get()) { - emitter.complete(); - elementsRead.set(0); - completed = true; - return; - } - if (res.getPos() == 0 && !tryAgain()) { - finished = true; - emitter.complete(); - } - - if (finished || completed) { - return; - } - nextValues(emitter); - }); - } + @Override + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + return MapReactiveIterator.this.scanIterator(client, nextIterPos); + } }); } @@ -126,8 +80,8 @@ public class MapReactiveIterator implements Consumer> { }; } - public RFuture> scanIterator(RedisClient client, long nextIterPos) { - return map.scanIteratorAsync(map.getRawName(), client, nextIterPos, pattern, count); + public RFuture> scanIterator(RedisClient client, long nextIterPos) { + return (RFuture>) (Object) map.scanIteratorAsync(map.getRawName(), client, nextIterPos, pattern, count); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index 862f10ec6..28d684591 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -15,18 +15,16 @@ */ package org.redisson.reactive; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; -import java.util.function.LongConsumer; - import org.reactivestreams.Publisher; import org.redisson.RedissonKeys; +import org.redisson.ScanResult; +import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.connection.MasterSlaveEntry; - import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; + +import java.util.ArrayList; +import java.util.List; /** * @@ -66,70 +64,18 @@ public class RedissonKeysReactive { } private Flux createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) { - return Flux.create(new Consumer>() { - - @Override - public void accept(FluxSink emitter) { - emitter.onRequest(new LongConsumer() { - - private RedisClient client; - private List firstValues; - private long nextIterPos; - - private long currentIndex; - - @Override - public void accept(long value) { - currentIndex = value; - nextValues(emitter); - } - - protected void nextValues(FluxSink emitter) { - instance.scanIteratorAsync(client, entry, nextIterPos, pattern, count).onComplete((res, e) -> { - if (e != null) { - emitter.error(e); - return; - } - - client = res.getRedisClient(); - long prevIterPos = nextIterPos; - if (nextIterPos == 0 && firstValues == null) { - firstValues = (List) (Object) res.getValues(); - } else if (res.getValues().equals(firstValues)) { - emitter.complete(); - currentIndex = 0; - return; - } - - nextIterPos = res.getPos(); - if (prevIterPos == nextIterPos) { - nextIterPos = -1; - } - for (Object val : res.getValues()) { - emitter.next((String) val); - currentIndex--; - if (currentIndex == 0) { - emitter.complete(); - return; - } - } - if (nextIterPos == -1) { - emitter.complete(); - currentIndex = 0; - } - - if (currentIndex == 0) { - return; - } - nextValues(emitter); - }); - } + return Flux.create(emitter -> emitter.onRequest(new IteratorConsumer(emitter) { - }); + @Override + protected boolean tryAgain() { + return false; } - - }); + @Override + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + return instance.scanIteratorAsync(client, entry, nextIterPos, pattern, count); + } + })); } - } +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java index 2abd037b2..ce6401827 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java @@ -17,11 +17,10 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; import org.redisson.RedissonScoredSortedSet; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.api.RLexSortedSet; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; - import reactor.core.publisher.Flux; @@ -50,7 +49,7 @@ public class RedissonLexSortedSetReactive { private Publisher scanIteratorReactive(final String pattern, final int count) { return Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(final RedisClient client, final long nextIterPos) { + protected RFuture> scanIterator(final RedisClient client, final long nextIterPos) { return ((RedissonScoredSortedSet) instance).scanIteratorAsync(client, nextIterPos, pattern, count); } }); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 2aa774195..907c91987 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -15,17 +15,16 @@ */ package org.redisson.reactive; -import java.util.concurrent.Callable; - import org.redisson.RedissonScoredSortedSet; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.api.RScoredSortedSetAsync; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.decoder.ListScanResult; - import reactor.core.publisher.Flux; +import java.util.concurrent.Callable; + /** * * @author Nikita Koksharov @@ -73,7 +72,7 @@ public class RedissonScoredSortedSetReactive { private Flux scanIteratorReactive(String pattern, int count) { return Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { return ((RedissonScoredSortedSet) instance).scanIteratorAsync(client, nextIterPos, pattern, count); } }); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index accb927c1..89253ed3e 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -18,9 +18,9 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; import org.redisson.RedissonObject; import org.redisson.ScanIterator; +import org.redisson.ScanResult; import org.redisson.api.*; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; import reactor.core.publisher.Flux; /** @@ -42,7 +42,7 @@ public class RedissonSetCacheReactive { public Publisher iterator() { return Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { return ((ScanIterator) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, null, 10); } }); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index caecefe06..f6a95879a 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -18,9 +18,9 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; import org.redisson.RedissonObject; import org.redisson.ScanIterator; +import org.redisson.ScanResult; import org.redisson.api.*; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; import reactor.core.publisher.Flux; /** @@ -60,7 +60,7 @@ public class RedissonSetReactive { public Publisher iterator(String pattern, int count) { return Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { return ((ScanIterator) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, pattern, count); } }); diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTimeSeriesReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTimeSeriesReactive.java index 63d69f4ff..8f25da01d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTimeSeriesReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTimeSeriesReactive.java @@ -18,11 +18,11 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; import org.redisson.RedissonObject; import org.redisson.RedissonTimeSeries; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.api.RTimeSeries; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; import reactor.core.publisher.Flux; /** @@ -44,7 +44,7 @@ public class RedissonTimeSeriesReactive { public Publisher iterator() { return Flux.create(new SetReactiveIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { return ((RedissonTimeSeries) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, 10); } }); diff --git a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java index cf9469223..3ca80af0f 100644 --- a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java @@ -15,16 +15,13 @@ */ package org.redisson.reactive; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import java.util.function.LongConsumer; - +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; - import reactor.core.publisher.FluxSink; +import java.util.function.Consumer; + /** * * @author Nikita Koksharov @@ -35,62 +32,15 @@ public abstract class SetReactiveIterator implements Consumer> { @Override public void accept(FluxSink emitter) { - emitter.onRequest(new LongConsumer() { - - private long nextIterPos; - private RedisClient client; - private AtomicLong elementsRead = new AtomicLong(); - - private boolean finished; - private volatile boolean completed; - private AtomicLong readAmount = new AtomicLong(); - + emitter.onRequest(new IteratorConsumer(emitter) { @Override - public void accept(long value) { - readAmount.addAndGet(value); - if (completed || elementsRead.get() == 0) { - nextValues(emitter); - completed = false; - } + protected boolean tryAgain() { + return SetReactiveIterator.this.tryAgain(); } - - protected void nextValues(FluxSink emitter) { - scanIterator(client, nextIterPos).onComplete((res, e) -> { - if (e != null) { - emitter.error(e); - return; - } - - if (finished) { - client = null; - nextIterPos = 0; - return; - } - - client = res.getRedisClient(); - nextIterPos = res.getPos(); - for (Object val : res.getValues()) { - emitter.next((V) val); - elementsRead.incrementAndGet(); - } - - if (elementsRead.get() >= readAmount.get()) { - emitter.complete(); - elementsRead.set(0); - completed = true; - return; - } - if (res.getPos() == 0 && !tryAgain()) { - finished = true; - emitter.complete(); - } - - if (finished || completed) { - return; - } - nextValues(emitter); - }); + @Override + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + return SetReactiveIterator.this.scanIterator(client, nextIterPos); } }); } @@ -99,6 +49,6 @@ public abstract class SetReactiveIterator implements Consumer> { return false; } - protected abstract RFuture> scanIterator(RedisClient client, long nextIterPos); + protected abstract RFuture> scanIterator(RedisClient client, long nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonLexSortedSetRx.java b/redisson/src/main/java/org/redisson/rx/RedissonLexSortedSetRx.java index be477175a..627ec0b54 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonLexSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonLexSortedSetRx.java @@ -15,15 +15,14 @@ */ package org.redisson.rx; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import org.reactivestreams.Publisher; import org.redisson.RedissonScoredSortedSet; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.api.RLexSortedSet; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; - -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.core.Single; /** * @@ -50,7 +49,7 @@ public class RedissonLexSortedSetRx { private Flowable scanIteratorReactive(String pattern, int count) { return new SetRxIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { return ((RedissonScoredSortedSet) instance).scanIteratorAsync(client, nextIterPos, pattern, count); } }.create(); diff --git a/redisson/src/main/java/org/redisson/rx/RedissonMapRxIterator.java b/redisson/src/main/java/org/redisson/rx/RedissonMapRxIterator.java index 203f8d604..205a03228 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonMapRxIterator.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonMapRxIterator.java @@ -83,7 +83,7 @@ public class RedissonMapRxIterator { client = res.getRedisClient(); nextIterPos = res.getPos(); - for (Entry entry : res.getMap().entrySet()) { + for (Entry entry : res.getValues()) { M val = getValue(entry); p.onNext(val); elementsRead.incrementAndGet(); diff --git a/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java b/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java index efb5af111..0305c4662 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonScoredSortedSetRx.java @@ -15,14 +15,13 @@ */ package org.redisson.rx; +import io.reactivex.rxjava3.core.Flowable; import org.redisson.RedissonScoredSortedSet; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.api.RObject; import org.redisson.api.RScoredSortedSetAsync; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; - -import io.reactivex.rxjava3.core.Flowable; /** * @@ -41,7 +40,7 @@ public class RedissonScoredSortedSetRx { private Flowable scanIteratorReactive(String pattern, int count) { return new SetRxIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { return ((RedissonScoredSortedSet) instance).scanIteratorAsync(client, nextIterPos, pattern, count); } }.create(); diff --git a/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java b/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java index 5612d5d5b..15469ae6f 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java @@ -19,9 +19,9 @@ import io.reactivex.rxjava3.core.Single; import org.reactivestreams.Publisher; import org.redisson.RedissonObject; import org.redisson.ScanIterator; +import org.redisson.ScanResult; import org.redisson.api.*; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; /** * @@ -42,7 +42,7 @@ public class RedissonSetCacheRx { public Publisher iterator() { return new SetRxIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { return ((ScanIterator) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, null, 10); } }.create(); diff --git a/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java b/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java index 1ede59b72..a3487ee8d 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java @@ -20,9 +20,9 @@ import io.reactivex.rxjava3.core.Single; import org.reactivestreams.Publisher; import org.redisson.RedissonObject; import org.redisson.ScanIterator; +import org.redisson.ScanResult; import org.redisson.api.*; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; /** * Distributed and concurrent implementation of {@link java.util.Set} @@ -61,7 +61,7 @@ public class RedissonSetRx { public Flowable iterator(String pattern, int count) { return new SetRxIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { return ((ScanIterator) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, pattern, count); } }.create(); diff --git a/redisson/src/main/java/org/redisson/rx/RedissonTimeSeriesRx.java b/redisson/src/main/java/org/redisson/rx/RedissonTimeSeriesRx.java index 87e494c04..2efb3ac59 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonTimeSeriesRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonTimeSeriesRx.java @@ -18,11 +18,11 @@ package org.redisson.rx; import org.reactivestreams.Publisher; import org.redisson.RedissonObject; import org.redisson.RedissonTimeSeries; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.api.RTimeSeries; import org.redisson.api.RedissonRxClient; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; /** * @@ -43,7 +43,7 @@ public class RedissonTimeSeriesRx { public Publisher iterator() { return new SetRxIterator() { @Override - protected RFuture> scanIterator(RedisClient client, long nextIterPos) { + protected RFuture> scanIterator(RedisClient client, long nextIterPos) { return ((RedissonTimeSeries) instance).scanIteratorAsync(((RedissonObject) instance).getRawName(), client, nextIterPos, 10); } }.create(); diff --git a/redisson/src/main/java/org/redisson/rx/SetRxIterator.java b/redisson/src/main/java/org/redisson/rx/SetRxIterator.java index 7862d7038..81cfe82b4 100644 --- a/redisson/src/main/java/org/redisson/rx/SetRxIterator.java +++ b/redisson/src/main/java/org/redisson/rx/SetRxIterator.java @@ -15,15 +15,14 @@ */ package org.redisson.rx; -import java.util.concurrent.atomic.AtomicLong; - -import org.redisson.api.RFuture; -import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; - import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.functions.LongConsumer; import io.reactivex.rxjava3.processors.ReplayProcessor; +import org.redisson.ScanResult; +import org.redisson.api.RFuture; +import org.redisson.client.RedisClient; + +import java.util.concurrent.atomic.AtomicLong; /** * @@ -99,6 +98,6 @@ public abstract class SetRxIterator { return false; } - protected abstract RFuture> scanIterator(RedisClient client, long nextIterPos); + protected abstract RFuture> scanIterator(RedisClient client, long nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index dc4ea1541..59c15855e 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.redisson.RedissonMap; import org.redisson.RedissonMultiLock; import org.redisson.RedissonObject; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.api.RMap; @@ -177,20 +178,28 @@ public class BaseTransactionalMap { return result; } - protected MapScanResult scanIterator(String name, RedisClient client, + protected ScanResult> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { - MapScanResult res = ((RedissonMap) map).scanIterator(name, client, startPos, pattern, count); + ScanResult> res = ((RedissonMap) map).scanIterator(name, client, startPos, pattern, count); Map newstate = new HashMap(state); - for (Iterator iterator = res.getMap().keySet().iterator(); iterator.hasNext();) { - Object entry = iterator.next(); - MapEntry mapEntry = newstate.remove(toKeyHash(entry)); + Map newres = null; + for (Iterator> iterator = res.getValues().iterator(); iterator.hasNext();) { + Object key = iterator.next(); + MapEntry mapEntry = newstate.remove(toKeyHash(key)); if (mapEntry != null) { if (mapEntry == MapEntry.NULL) { iterator.remove(); continue; } - - res.getMap().put(entry, mapEntry.getValue()); + + if (newres == null) { + newres = new HashMap<>(); + for (Entry e : res.getValues()) { + newres.put(e.getKey(), e.getValue()); + } + } + + newres.put(key, mapEntry.getValue()); } } @@ -199,11 +208,22 @@ public class BaseTransactionalMap { if (entry.getValue() == MapEntry.NULL) { continue; } - - res.getMap().put(entry.getValue().getKey(), entry.getValue().getValue()); + + if (newres == null) { + newres = new HashMap<>(); + for (Entry e : res.getValues()) { + newres.put(e.getKey(), e.getValue()); + } + } + + newres.put(entry.getValue().getKey(), entry.getValue().getValue()); } } + if (newres != null) { + return new MapScanResult<>(res.getPos(), newres); + } + return res; } diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java index 6248ef5c9..fbc22ecae 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalSet.java @@ -19,9 +19,9 @@ import io.netty.buffer.ByteBuf; import org.redisson.RedissonMultiLock; import org.redisson.RedissonObject; import org.redisson.RedissonSet; +import org.redisson.ScanResult; import org.redisson.api.*; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.misc.Hash; import org.redisson.misc.HashValue; @@ -152,13 +152,13 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { return set.containsAsync(value); } - protected abstract ListScanResult scanIteratorSource(String name, RedisClient client, - long startPos, String pattern, int count); + protected abstract ScanResult scanIteratorSource(String name, RedisClient client, + long startPos, String pattern, int count); - protected ListScanResult scanIterator(String name, RedisClient client, + protected ScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { - ListScanResult res = scanIteratorSource(name, client, startPos, pattern, count); - Map newstate = new HashMap(state); + ScanResult res = scanIteratorSource(name, client, startPos, pattern, count); + Map newstate = new HashMap<>(state); for (Iterator iterator = res.getValues().iterator(); iterator.hasNext();) { Object entry = iterator.next(); Object value = newstate.remove(toHash(entry)); @@ -182,7 +182,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { protected abstract RFuture> readAllAsyncSource(); public RFuture> readAllAsync() { - RPromise> result = new RedissonPromise>(); + RPromise> result = new RedissonPromise<>(); RFuture> future = readAllAsyncSource(); future.onComplete((res, e) -> { if (e != null) { @@ -191,7 +191,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { } Set set = future.getNow(); - Map newstate = new HashMap(state); + Map newstate = new HashMap<>(state); for (Iterator iterator = set.iterator(); iterator.hasNext();) { V key = iterator.next(); Object value = newstate.remove(toHash(key)); @@ -220,7 +220,7 @@ public abstract class BaseTransactionalSet extends BaseTransactionalObject { } public RFuture addAsync(V value, TransactionalOperation operation) { - RPromise result = new RedissonPromise(); + RPromise result = new RedissonPromise<>(); executeLocked(result, value, new Runnable() { @Override public void run() { diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java index 8be12f4bb..048154c50 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java @@ -15,30 +15,20 @@ */ package org.redisson.transaction; -import java.time.Instant; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import org.redisson.RedissonMap; -import org.redisson.api.RCountDownLatch; -import org.redisson.api.RFuture; -import org.redisson.api.RLock; -import org.redisson.api.RMap; -import org.redisson.api.RPermitExpirableSemaphore; -import org.redisson.api.RReadWriteLock; -import org.redisson.api.RSemaphore; +import org.redisson.ScanResult; +import org.redisson.api.*; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + /** * * @author Nikita Koksharov @@ -115,8 +105,8 @@ public class RedissonTransactionalMap extends RedissonMap { } @Override - public MapScanResult scanIterator(String name, RedisClient client, - long startPos, String pattern, int count) { + public ScanResult> scanIterator(String name, RedisClient client, + long startPos, String pattern, int count) { checkState(); return transactionalMap.scanIterator(name, client, startPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java index 744cdc109..a9e601f98 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java @@ -15,29 +15,20 @@ */ package org.redisson.transaction; -import java.time.Instant; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import org.redisson.RedissonMapCache; -import org.redisson.api.RCountDownLatch; -import org.redisson.api.RFuture; -import org.redisson.api.RLock; -import org.redisson.api.RPermitExpirableSemaphore; -import org.redisson.api.RReadWriteLock; -import org.redisson.api.RSemaphore; +import org.redisson.ScanResult; +import org.redisson.api.*; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + /** * * @author Nikita Koksharov @@ -164,8 +155,8 @@ public class RedissonTransactionalMapCache extends RedissonMapCache } @Override - public MapScanResult scanIterator(String name, RedisClient client, - long startPos, String pattern, int count) { + public ScanResult> scanIterator(String name, RedisClient client, + long startPos, String pattern, int count) { checkState(); return transactionalMap.scanIterator(name, client, startPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java index 8cf0707f7..a907ebc06 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java @@ -15,6 +15,15 @@ */ package org.redisson.transaction; +import org.redisson.RedissonSet; +import org.redisson.ScanResult; +import org.redisson.api.*; +import org.redisson.api.mapreduce.RCollectionMapReduce; +import org.redisson.client.RedisClient; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.transaction.operation.TransactionalOperation; + import java.time.Instant; import java.util.Collection; import java.util.Date; @@ -23,21 +32,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.redisson.RedissonSet; -import org.redisson.api.RCountDownLatch; -import org.redisson.api.RFuture; -import org.redisson.api.RLock; -import org.redisson.api.RPermitExpirableSemaphore; -import org.redisson.api.RReadWriteLock; -import org.redisson.api.RSemaphore; -import org.redisson.api.SortOrder; -import org.redisson.api.mapreduce.RCollectionMapReduce; -import org.redisson.client.RedisClient; -import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.command.CommandAsyncExecutor; -import org.redisson.transaction.operation.TransactionalOperation; - /** * * @author Nikita Koksharov @@ -106,7 +100,7 @@ public class RedissonTransactionalSet extends RedissonSet { } @Override - public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { + public ScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { checkState(); return transactionalSet.scanIterator(name, client, startPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java index 6e98e394f..c906d11c2 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSetCache.java @@ -15,23 +15,23 @@ */ package org.redisson.transaction; -import java.time.Instant; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import org.redisson.RedissonSetCache; +import org.redisson.ScanResult; import org.redisson.api.RFuture; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; +import java.time.Instant; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + /** * * @author Nikita Koksharov @@ -101,7 +101,7 @@ public class RedissonTransactionalSetCache extends RedissonSetCache { } @Override - public ListScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { + public ScanResult scanIterator(String name, RedisClient client, long startPos, String pattern, int count) { checkState(); return transactionalSet.scanIterator(name, client, startPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java index 81511071f..94655c475 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java @@ -17,12 +17,12 @@ package org.redisson.transaction; import org.redisson.RedissonSet; import org.redisson.ScanIterator; +import org.redisson.ScanResult; import org.redisson.api.RCollectionAsync; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.api.RSet; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.set.AddOperation; @@ -51,8 +51,8 @@ public class TransactionalSet extends BaseTransactionalSet { } @Override - protected ListScanResult scanIteratorSource(String name, RedisClient client, long startPos, - String pattern, int count) { + protected ScanResult scanIteratorSource(String name, RedisClient client, long startPos, + String pattern, int count) { return ((ScanIterator) set).scanIterator(name, client, startPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java index a85376a37..b3185dcea 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java @@ -17,12 +17,12 @@ package org.redisson.transaction; import org.redisson.RedissonSetCache; import org.redisson.ScanIterator; +import org.redisson.ScanResult; import org.redisson.api.RCollectionAsync; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.api.RSetCache; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandAsyncExecutor; import org.redisson.transaction.operation.TransactionalOperation; import org.redisson.transaction.operation.set.AddCacheOperation; @@ -52,8 +52,8 @@ public class TransactionalSetCache extends BaseTransactionalSet { } @Override - protected ListScanResult scanIteratorSource(String name, RedisClient client, long startPos, - String pattern, int count) { + protected ScanResult scanIteratorSource(String name, RedisClient client, long startPos, + String pattern, int count) { return ((ScanIterator) set).scanIterator(name, client, startPos, pattern, count); } diff --git a/redisson/src/test/java/org/redisson/RedissonKeysReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonKeysReactiveTest.java index 50a22a64e..16e37818f 100644 --- a/redisson/src/test/java/org/redisson/RedissonKeysReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonKeysReactiveTest.java @@ -1,5 +1,6 @@ package org.redisson; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.redisson.api.RBucketReactive; @@ -7,12 +8,33 @@ import org.redisson.api.RKeysReactive; import org.redisson.api.RMapReactive; import reactor.core.publisher.Flux; +import java.time.Duration; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; public class RedissonKeysReactiveTest extends BaseReactiveTest { + @Test + public void testKeysByPatternIterator() { + for (int i = 0; i < 100; i++) { + redisson.getBucket("key" + i).set(1).block(); + } + + Flux p = redisson.getKeys().getKeysByPattern(null); + AtomicInteger i = new AtomicInteger(); + p.doOnNext(t -> { + i.incrementAndGet(); + }).doOnSubscribe(s -> { + s.request(100); + }).subscribe(); + + Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> { + assertThat(i.get()).isEqualTo(100); + }); + } + @Test public void testGetKeys() { RKeysReactive keys = redisson.getKeys();