Improvement - Reactive method call cancellation support added

pull/1821/head
Nikita Koksharov 6 years ago
parent 7b20c0d9a5
commit ad091fe14f

@ -39,11 +39,17 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
public <R> Mono<R> reactive(Supplier<RFuture<R>> supplier) { public <R> Mono<R> reactive(Supplier<RFuture<R>> supplier) {
return Flux.<R>create(emitter -> { return Flux.<R>create(emitter -> {
emitter.onRequest(n -> { emitter.onRequest(n -> {
supplier.get().whenComplete((v, e) -> { RFuture<R> future = supplier.get();
future.whenComplete((v, e) -> {
if (e != null) { if (e != null) {
emitter.error(e); emitter.error(e);
return; return;
} }
emitter.onDispose(() -> {
future.cancel(true);
});
if (v != null) { if (v != null) {
emitter.next(v); emitter.next(v);
} }

Loading…
Cancel
Save