Fixed - RListReactive iterator with filter returns non-deterministic result. #3734

pull/3760/head
Nikita Koksharov 4 years ago
parent db8207dd5e
commit 7a0583b145

@ -74,13 +74,20 @@ public class RedissonListReactive<V> {
emitter.onRequest(new LongConsumer() {
int currentIndex = startIndex;
volatile boolean maxAccepted;
@Override
public void accept(long value) {
if (Long.MAX_VALUE == value) {
maxAccepted = true;
}
if (maxAccepted && value != Long.MAX_VALUE) {
return;
}
onRequest(forward, emitter, value);
}
protected void onRequest(boolean forward, FluxSink<V> emitter, long n) {
private void onRequest(boolean forward, FluxSink<V> emitter, long n) {
instance.getAsync(currentIndex).onComplete((value, e) -> {
if (e != null) {
emitter.error(e);

@ -5,16 +5,35 @@ import org.junit.jupiter.api.Test;
import org.redisson.api.RListReactive;
import org.redisson.client.RedisException;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonListReactiveTest extends BaseReactiveTest {
@Test
public void testIteratorFilter() {
for (int i = 0; i < 10; i++) {
RListReactive<String> testList = redisson.getList("list");
testList.delete().block();
testList.add("1").block();
testList.add("2").block();
testList.add("1").block();
testList.add("2").block();
Predicate<String> lambdaPredicate = (String s) -> s.equals("2");
Flux<String> ret = testList.iterator().filter(lambdaPredicate);
List<String> s = ret.collectList().block();
assertThat(s).containsExactly("2", "2");
}
}
@Test
public void testHashCode() throws InterruptedException {
RListReactive<String> list = redisson.getList("list");

Loading…
Cancel
Save