From ad091fe14f91622c5a1004629663cac0dc420b56 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 5 Nov 2018 14:16:32 +0300 Subject: [PATCH] Improvement - Reactive method call cancellation support added --- .../org/redisson/reactive/CommandReactiveService.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java b/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java index 8a860dc44..ea5ed1ea1 100644 --- a/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java +++ b/redisson/src/main/java/org/redisson/reactive/CommandReactiveService.java @@ -39,11 +39,17 @@ public class CommandReactiveService extends CommandAsyncService implements Comma public Mono reactive(Supplier> supplier) { return Flux.create(emitter -> { emitter.onRequest(n -> { - supplier.get().whenComplete((v, e) -> { + RFuture future = supplier.get(); + future.whenComplete((v, e) -> { if (e != null) { emitter.error(e); return; - } + } + + emitter.onDispose(() -> { + future.cancel(true); + }); + if (v != null) { emitter.next(v); }