Fixed - getKeysByPattern isn't giving all entries if downstream consumer is slow. #4427

pull/4597/merge
Nikita Koksharov 2 years ago
parent a36f9df3cc
commit 637750c63d

@ -24,21 +24,16 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
/**
*
* @author Nikita Koksharov
*
*/
public abstract class IteratorConsumer<V> implements LongConsumer {
private final FluxSink<V> emitter;
private long nextIterPos;
private long nextIterPos = 0;
private RedisClient client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;
private volatile boolean completed;
private AtomicLong readAmount = new AtomicLong();
private final AtomicLong requested = new AtomicLong();
public IteratorConsumer(FluxSink<V> emitter) {
this.emitter = emitter;
@ -46,50 +41,33 @@ public abstract class IteratorConsumer<V> implements LongConsumer {
@Override
public void accept(long value) {
readAmount.addAndGet(value);
if (completed || elementsRead.get() == 0) {
nextValues(emitter);
completed = false;
if (requested.addAndGet(value) == value) {
nextValues();
}
}
protected void nextValues(FluxSink<V> emitter) {
protected void nextValues() {
scanIterator(client, nextIterPos).whenComplete((res, e) -> {
if (e != null) {
emitter.error(e);
return;
}
if (finished) {
client = null;
nextIterPos = 0;
return;
}
client = res.getRedisClient();
nextIterPos = res.getPos();
for (Object val : res.getValues()) {
Object v = transformValue(val);
emitter.next((V) v);
elementsRead.incrementAndGet();
requested.decrementAndGet();
}
if (elementsRead.get() >= readAmount.get()) {
if (nextIterPos == 0 && !tryAgain()) {
emitter.complete();
elementsRead.set(0);
completed = true;
return;
}
if (res.getPos() == 0 && !tryAgain()) {
finished = true;
emitter.complete();
}
if (finished || completed) {
return;
}
nextValues(emitter);
nextValues();
});
}

@ -3,10 +3,13 @@ package org.redisson;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RKeysReactive;
import org.redisson.api.RMapReactive;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Iterator;
@ -35,6 +38,52 @@ public class RedissonKeysReactiveTest extends BaseReactiveTest {
});
}
@Test
public void testKeysByPatternIteratorWithSlowConsumer() throws InterruptedException {
int noOfKeys = 1200;
for (int i = 0; i < noOfKeys; i++) {
redisson.getBucket("key" + i).set(1).block();
}
Flux<String> p = redisson.getKeys().getKeysByPattern(null)
.flatMap(string -> Mono.just(string)
.delayElement(Duration.ofMillis(10)));
AtomicInteger i = new AtomicInteger();
p.doOnNext(t -> {
i.incrementAndGet();
}).blockLast();
assertThat(i.get()).isEqualTo(noOfKeys);
i.set(0);
Flux<String> pp = redisson.getKeys().getKeysByPattern(null);
pp.doOnNext(t -> {
i.incrementAndGet();
}).subscribeWith(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(10);
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(100);
assertThat(i.get()).isEqualTo(10);
}
@Test
public void testGetKeys() {
RKeysReactive keys = redisson.getKeys();

Loading…
Cancel
Save