Fixed - RCollectionReactive.addAll is executed without subscription. #4554

pull/4550/merge
Nikita Koksharov 2 years ago
parent 5d9fd8f559
commit e62f7221b1

@ -15,16 +15,14 @@
*/ */
package org.redisson.reactive; package org.redisson.reactive;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -36,13 +34,13 @@ public abstract class PublisherAdder<V> {
public abstract RFuture<Boolean> add(Object o); public abstract RFuture<Boolean> add(Object o);
public Publisher<Boolean> addAll(Publisher<? extends V> c) { public Publisher<Boolean> addAll(Publisher<? extends V> c) {
CompletableFuture<Boolean> promise = new CompletableFuture<>(); return Mono.create(emitter -> emitter.onRequest(n -> {
c.subscribe(new BaseSubscriber<V>() { c.subscribe(new BaseSubscriber<V>() {
volatile boolean completed; volatile boolean completed;
AtomicLong values = new AtomicLong(); final AtomicLong values = new AtomicLong();
Subscription s; Subscription s;
Boolean lastSize = false; volatile Boolean lastSize = false;
@Override @Override
protected void hookOnSubscribe(Subscription s) { protected void hookOnSubscribe(Subscription s) {
@ -55,7 +53,7 @@ public abstract class PublisherAdder<V> {
values.getAndIncrement(); values.getAndIncrement();
add(o).whenComplete((res, e) -> { add(o).whenComplete((res, e) -> {
if (e != null) { if (e != null) {
promise.completeExceptionally(e); emitter.error(e);
return; return;
} }
@ -64,7 +62,7 @@ public abstract class PublisherAdder<V> {
} }
s.request(1); s.request(1);
if (values.decrementAndGet() == 0 && completed) { if (values.decrementAndGet() == 0 && completed) {
promise.complete(lastSize); emitter.success(lastSize);
} }
}); });
} }
@ -73,12 +71,11 @@ public abstract class PublisherAdder<V> {
protected void hookOnComplete() { protected void hookOnComplete() {
completed = true; completed = true;
if (values.get() == 0) { if (values.get() == 0) {
promise.complete(lastSize); emitter.success(lastSize);
} }
} }
}); });
}));
return Mono.fromCompletionStage(promise);
} }
} }

@ -3,9 +3,12 @@ package org.redisson;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.redisson.api.RListReactive; import org.redisson.api.RListReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -18,6 +21,16 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonListReactiveTest extends BaseReactiveTest { public class RedissonListReactiveTest extends BaseReactiveTest {
@Test
public void test1() throws InterruptedException {
RListReactive<String> testQueue = redisson.getList("list");
Mono<Boolean> s = testQueue.addAll(Flux.just("a", "b", "c"));
Thread.sleep(400);
assertThat(testQueue.iterator().collectList().block()).isEmpty();
assertThat(s.block()).isTrue();
assertThat(testQueue.iterator().collectList().block()).containsExactly("a", "b", "c");
}
@Test @Test
public void testIteratorFilter() { public void testIteratorFilter() {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {

Loading…
Cancel
Save