|
|
|
@ -15,19 +15,17 @@
|
|
|
|
|
*/
|
|
|
|
|
package org.redisson.rx;
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
|
|
|
|
import io.reactivex.rxjava3.core.Flowable;
|
|
|
|
|
import io.reactivex.rxjava3.core.Single;
|
|
|
|
|
import io.reactivex.rxjava3.functions.Action;
|
|
|
|
|
import io.reactivex.rxjava3.functions.Consumer;
|
|
|
|
|
import io.reactivex.rxjava3.functions.LongConsumer;
|
|
|
|
|
import io.reactivex.rxjava3.processors.ReplayProcessor;
|
|
|
|
|
import org.reactivestreams.Publisher;
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
|
|
|
|
|
import io.reactivex.Flowable;
|
|
|
|
|
import io.reactivex.Single;
|
|
|
|
|
import io.reactivex.functions.Action;
|
|
|
|
|
import io.reactivex.functions.Consumer;
|
|
|
|
|
import io.reactivex.functions.LongConsumer;
|
|
|
|
|
import io.reactivex.internal.operators.flowable.FlowableInternalHelper;
|
|
|
|
|
import io.reactivex.processors.ReplayProcessor;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
*
|
|
|
|
@ -48,7 +46,7 @@ public abstract class PublisherAdder<V> {
|
|
|
|
|
final AtomicBoolean completed = new AtomicBoolean();
|
|
|
|
|
final AtomicLong values = new AtomicLong();
|
|
|
|
|
final AtomicBoolean lastSize = new AtomicBoolean();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cc.subscribe(new Consumer<V>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void accept(V t) throws Exception {
|
|
|
|
@ -58,7 +56,7 @@ public abstract class PublisherAdder<V> {
|
|
|
|
|
p.onError(e);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (res) {
|
|
|
|
|
lastSize.set(true);
|
|
|
|
|
}
|
|
|
|
@ -82,7 +80,7 @@ public abstract class PublisherAdder<V> {
|
|
|
|
|
p.onComplete();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}, FlowableInternalHelper.RequestMax.INSTANCE);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}).singleOrError();
|
|
|
|
|
}
|
|
|
|
|