Improvement - RxJava2 method call cancellation added

pull/1721/head
Nikita Koksharov 6 years ago
parent bf2ac3e870
commit 83eda8b26b

@ -191,9 +191,9 @@ public class RedissonTopic implements RTopic {
}
@Override
public RFuture<Void> removeListenerAsync(MessageListener<?> listener) {
RPromise<Void> promise = new RedissonPromise<Void>();
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
public RFuture<Void> removeListenerAsync(final MessageListener<?> listener) {
final RPromise<Void> promise = new RedissonPromise<Void>();
final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {
@ -219,9 +219,9 @@ public class RedissonTopic implements RTopic {
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPromise<Void> promise = new RedissonPromise<Void>();
AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
public RFuture<Void> removeListenerAsync(final int listenerId) {
final RPromise<Void> promise = new RedissonPromise<Void>();
final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
semaphore.acquire(new Runnable() {
@Override
public void run() {

@ -24,6 +24,7 @@ import org.redisson.connection.ConnectionManager;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
@ -44,15 +45,23 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long t) throws Exception {
supplier.call().addListener(new FutureListener<R>() {
RFuture<R> future = supplier.call();
future.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
public void operationComplete(final Future<R> future) throws Exception {
if (!future.isSuccess()) {
p.onError(future.cause());
return;
}
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
future.cancel(true);
}
});
if (future.getNow() != null) {
p.onNext(future.getNow());
}

Loading…
Cancel
Save