diff --git a/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java index 8a860dc44..ea5ed1ea1 100644 --- a/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java @@ -39,11 +39,17 @@ public class CommandReactiveService extends CommandAsyncService implements Comma public Mono reactive(Supplier> supplier) { return Flux.create(emitter -> { emitter.onRequest(n -> { - supplier.get().whenComplete((v, e) -> { + RFuture future = supplier.get(); + future.whenComplete((v, e) -> { if (e != null) { emitter.error(e); return; - } + } + + emitter.onDispose(() -> { + future.cancel(true); + }); + if (v != null) { emitter.next(v); }