From 7a0583b145a776cdc927bc10f3c26761258793d8 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 3 Aug 2021 14:24:03 +0300 Subject: [PATCH] Fixed - RListReactive iterator with filter returns non-deterministic result. #3734 --- .../reactive/RedissonListReactive.java | 9 ++++++++- .../redisson/RedissonListReactiveTest.java | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index 74371cf95..42e7ed20e 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -74,13 +74,20 @@ public class RedissonListReactive { emitter.onRequest(new LongConsumer() { int currentIndex = startIndex; + volatile boolean maxAccepted; @Override public void accept(long value) { + if (Long.MAX_VALUE == value) { + maxAccepted = true; + } + if (maxAccepted && value != Long.MAX_VALUE) { + return; + } onRequest(forward, emitter, value); } - protected void onRequest(boolean forward, FluxSink emitter, long n) { + private void onRequest(boolean forward, FluxSink emitter, long n) { instance.getAsync(currentIndex).onComplete((value, e) -> { if (e != null) { emitter.error(e); diff --git a/redisson/src/test/java/org/redisson/RedissonListReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonListReactiveTest.java index 15ca802b2..f436e4743 100644 --- a/redisson/src/test/java/org/redisson/RedissonListReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonListReactiveTest.java @@ -5,16 +5,35 @@ import org.junit.jupiter.api.Test; import org.redisson.api.RListReactive; import org.redisson.client.RedisException; import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; import static org.assertj.core.api.Assertions.assertThat; public class RedissonListReactiveTest extends BaseReactiveTest { + @Test + public void testIteratorFilter() { + for (int i = 0; i < 10; i++) { + RListReactive testList = redisson.getList("list"); + testList.delete().block(); + testList.add("1").block(); + testList.add("2").block(); + testList.add("1").block(); + testList.add("2").block(); + Predicate lambdaPredicate = (String s) -> s.equals("2"); + Flux ret = testList.iterator().filter(lambdaPredicate); + List s = ret.collectList().block(); + assertThat(s).containsExactly("2", "2"); + } + } + @Test public void testHashCode() throws InterruptedException { RListReactive list = redisson.getList("list");