From 83eda8b26b4c9913f0f26bc6ad83f888e5e2905a Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 5 Nov 2018 14:11:08 +0300 Subject: [PATCH] Improvement - RxJava2 method call cancellation added --- .../src/main/java/org/redisson/RedissonTopic.java | 12 ++++++------ .../main/java/org/redisson/rx/CommandRxService.java | 13 +++++++++++-- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 6bdd9e5ab..7c0c10e6d 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -191,9 +191,9 @@ public class RedissonTopic implements RTopic { } @Override - public RFuture removeListenerAsync(MessageListener listener) { - RPromise promise = new RedissonPromise(); - AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); + public RFuture removeListenerAsync(final MessageListener listener) { + final RPromise promise = new RedissonPromise(); + final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); semaphore.acquire(new Runnable() { @Override public void run() { @@ -219,9 +219,9 @@ public class RedissonTopic implements RTopic { } @Override - public RFuture removeListenerAsync(int listenerId) { - RPromise promise = new RedissonPromise(); - AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); + public RFuture removeListenerAsync(final int listenerId) { + final RPromise promise = new RedissonPromise(); + final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); semaphore.acquire(new Runnable() { @Override public void run() { diff --git a/redisson/src/main/java/org/redisson/rx/CommandRxService.java b/redisson/src/main/java/org/redisson/rx/CommandRxService.java index 1d446ab6f..9f4a6fa8b 100644 --- a/redisson/src/main/java/org/redisson/rx/CommandRxService.java +++ b/redisson/src/main/java/org/redisson/rx/CommandRxService.java @@ -24,6 +24,7 @@ import org.redisson.connection.ConnectionManager; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.reactivex.Flowable; +import io.reactivex.functions.Action; import io.reactivex.functions.LongConsumer; import io.reactivex.processors.ReplayProcessor; @@ -44,15 +45,23 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx return p.doOnRequest(new LongConsumer() { @Override public void accept(long t) throws Exception { - supplier.call().addListener(new FutureListener() { + RFuture future = supplier.call(); + future.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(final Future future) throws Exception { if (!future.isSuccess()) { p.onError(future.cause()); return; } + p.doOnCancel(new Action() { + @Override + public void run() throws Exception { + future.cancel(true); + } + }); + if (future.getNow() != null) { p.onNext(future.getNow()); }