diff --git a/redisson/src/main/java/org/redisson/reactive/IteratorConsumer.java b/redisson/src/main/java/org/redisson/reactive/IteratorConsumer.java index 701ff316e..ccd628545 100644 --- a/redisson/src/main/java/org/redisson/reactive/IteratorConsumer.java +++ b/redisson/src/main/java/org/redisson/reactive/IteratorConsumer.java @@ -24,21 +24,16 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongConsumer; /** - * * @author Nikita Koksharov - * */ public abstract class IteratorConsumer implements LongConsumer { private final FluxSink emitter; - private long nextIterPos; + private long nextIterPos = 0; private RedisClient client; - private AtomicLong elementsRead = new AtomicLong(); - private boolean finished; - private volatile boolean completed; - private AtomicLong readAmount = new AtomicLong(); + private final AtomicLong requested = new AtomicLong(); public IteratorConsumer(FluxSink emitter) { this.emitter = emitter; @@ -46,50 +41,33 @@ public abstract class IteratorConsumer implements LongConsumer { @Override public void accept(long value) { - readAmount.addAndGet(value); - if (completed || elementsRead.get() == 0) { - nextValues(emitter); - completed = false; + if (requested.addAndGet(value) == value) { + nextValues(); } } - protected void nextValues(FluxSink emitter) { + protected void nextValues() { scanIterator(client, nextIterPos).whenComplete((res, e) -> { if (e != null) { emitter.error(e); return; } - if (finished) { - client = null; - nextIterPos = 0; - return; - } - client = res.getRedisClient(); nextIterPos = res.getPos(); for (Object val : res.getValues()) { Object v = transformValue(val); emitter.next((V) v); - elementsRead.incrementAndGet(); + requested.decrementAndGet(); } - if (elementsRead.get() >= readAmount.get()) { + if (nextIterPos == 0 && !tryAgain()) { emitter.complete(); - elementsRead.set(0); - completed = true; return; } - if (res.getPos() == 0 && !tryAgain()) { - finished = true; - emitter.complete(); - } - if (finished || completed) { - return; - } - nextValues(emitter); + nextValues(); }); } diff --git a/redisson/src/test/java/org/redisson/RedissonKeysReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonKeysReactiveTest.java index 16e37818f..243549af8 100644 --- a/redisson/src/test/java/org/redisson/RedissonKeysReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonKeysReactiveTest.java @@ -3,10 +3,13 @@ package org.redisson; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.redisson.api.RBucketReactive; import org.redisson.api.RKeysReactive; import org.redisson.api.RMapReactive; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.time.Duration; import java.util.Iterator; @@ -35,6 +38,52 @@ public class RedissonKeysReactiveTest extends BaseReactiveTest { }); } + @Test + public void testKeysByPatternIteratorWithSlowConsumer() throws InterruptedException { + int noOfKeys = 1200; + for (int i = 0; i < noOfKeys; i++) { + redisson.getBucket("key" + i).set(1).block(); + } + + Flux p = redisson.getKeys().getKeysByPattern(null) + .flatMap(string -> Mono.just(string) + .delayElement(Duration.ofMillis(10))); + AtomicInteger i = new AtomicInteger(); + p.doOnNext(t -> { + i.incrementAndGet(); + }).blockLast(); + assertThat(i.get()).isEqualTo(noOfKeys); + i.set(0); + + Flux pp = redisson.getKeys().getKeysByPattern(null); + + pp.doOnNext(t -> { + i.incrementAndGet(); + }).subscribeWith(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(10); + } + + @Override + public void onNext(Object o) { + + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + } + }); + Thread.sleep(100); + assertThat(i.get()).isEqualTo(10); + } + @Test public void testGetKeys() { RKeysReactive keys = redisson.getKeys();