diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 42e3d02ec..b22471f13 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -50,6 +50,7 @@ import org.redisson.api.RSetCacheRx; import org.redisson.api.RSetMultimapRx; import org.redisson.api.RSetRx; import org.redisson.api.RStreamRx; +import org.redisson.api.RTopic; import org.redisson.api.RTopicRx; import org.redisson.api.RTransactionRx; import org.redisson.api.RedissonRxClient; @@ -75,6 +76,7 @@ import org.redisson.rx.RedissonScoredSortedSetRx; import org.redisson.rx.RedissonSetCacheRx; import org.redisson.rx.RedissonSetMultimapRx; import org.redisson.rx.RedissonSetRx; +import org.redisson.rx.RedissonTopicRx; import org.redisson.rx.RedissonTransactionRx; import org.redisson.rx.RxProxyBuilder; @@ -282,12 +284,14 @@ public class RedissonRx implements RedissonRxClient { @Override public RTopicRx getTopic(String name) { - return RxProxyBuilder.create(commandExecutor, new RedissonTopic(commandExecutor, name), RTopicRx.class); + RTopic topic = new RedissonTopic(commandExecutor, name); + return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class); } @Override public RTopicRx getTopic(String name, Codec codec) { - return RxProxyBuilder.create(commandExecutor, new RedissonTopic(codec, commandExecutor, name), RTopicRx.class); + RTopic topic = new RedissonTopic(codec, commandExecutor, name); + return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 407484100..7c0c10e6d 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -28,12 +28,12 @@ import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.command.CommandAsyncExecutor; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonObjectFactory; import org.redisson.misc.RedissonPromise; +import org.redisson.misc.TransferListener; import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PublishSubscribeService; @@ -190,6 +190,61 @@ public class RedissonTopic implements RTopic { } + @Override + public RFuture removeListenerAsync(final MessageListener listener) { + final RPromise promise = new RedissonPromise(); + final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); + semaphore.acquire(new Runnable() { + @Override + public void run() { + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); + if (entry == null) { + semaphore.release(); + promise.trySuccess(null); + return; + } + + entry.removeListener(channelName, listener); + if (!entry.hasListeners(channelName)) { + subscribeService.unsubscribe(channelName, semaphore) + .addListener(new TransferListener(promise)); + } else { + semaphore.release(); + promise.trySuccess(null); + } + + } + }); + return promise; + } + + @Override + public RFuture removeListenerAsync(final int listenerId) { + final RPromise promise = new RedissonPromise(); + final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); + semaphore.acquire(new Runnable() { + @Override + public void run() { + PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); + if (entry == null) { + semaphore.release(); + promise.trySuccess(null); + return; + } + + entry.removeListener(channelName, listenerId); + if (!entry.hasListeners(channelName)) { + subscribeService.unsubscribe(channelName, semaphore) + .addListener(new TransferListener(promise)); + } else { + semaphore.release(); + promise.trySuccess(null); + } + } + }); + return promise; + } + @Override public void removeListener(int listenerId) { AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); diff --git a/redisson/src/main/java/org/redisson/api/RTopicAsync.java b/redisson/src/main/java/org/redisson/api/RTopicAsync.java index 1aaef85bf..380d8851c 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicAsync.java +++ b/redisson/src/main/java/org/redisson/api/RTopicAsync.java @@ -55,5 +55,21 @@ public interface RTopicAsync { * @see org.redisson.api.listener.MessageListener */ RFuture addListenerAsync(Class type, MessageListener listener); + + /** + * Removes the listener by id for listening this topic + * + * @param listenerId - listener id + * @return void + */ + RFuture removeListenerAsync(int listenerId); + /** + * Removes the listener by its instance + * + * @param listener - listener instance + * @return void + */ + RFuture removeListenerAsync(MessageListener listener); + } diff --git a/redisson/src/main/java/org/redisson/api/RTopicRx.java b/redisson/src/main/java/org/redisson/api/RTopicRx.java index 019d4238d..edaee5aa4 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicRx.java +++ b/redisson/src/main/java/org/redisson/api/RTopicRx.java @@ -73,4 +73,13 @@ public interface RTopicRx { * @param listenerId - listener id */ void removeListener(int listenerId); + + /** + * Returns stream of messages. + * + * @param type - type of message to listen + * @return stream of messages + */ + Flowable getMessages(Class type); + } diff --git a/redisson/src/main/java/org/redisson/rx/CommandRxService.java b/redisson/src/main/java/org/redisson/rx/CommandRxService.java index 1d446ab6f..9f4a6fa8b 100644 --- a/redisson/src/main/java/org/redisson/rx/CommandRxService.java +++ b/redisson/src/main/java/org/redisson/rx/CommandRxService.java @@ -24,6 +24,7 @@ import org.redisson.connection.ConnectionManager; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.reactivex.Flowable; +import io.reactivex.functions.Action; import io.reactivex.functions.LongConsumer; import io.reactivex.processors.ReplayProcessor; @@ -44,15 +45,23 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx return p.doOnRequest(new LongConsumer() { @Override public void accept(long t) throws Exception { - supplier.call().addListener(new FutureListener() { + RFuture future = supplier.call(); + future.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(final Future future) throws Exception { if (!future.isSuccess()) { p.onError(future.cause()); return; } + p.doOnCancel(new Action() { + @Override + public void run() throws Exception { + future.cancel(true); + } + }); + if (future.getNow() != null) { p.onNext(future.getNow()); } diff --git a/redisson/src/test/java/org/redisson/rx/RedissonTopicRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonTopicRxTest.java new file mode 100644 index 000000000..16b9b463b --- /dev/null +++ b/redisson/src/test/java/org/redisson/rx/RedissonTopicRxTest.java @@ -0,0 +1,49 @@ +package org.redisson.rx; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.api.RTopicRx; + +import io.reactivex.Flowable; + +public class RedissonTopicRxTest extends BaseRxTest { + + @Test + public void testLong() throws InterruptedException { + RTopicRx topic = redisson.getTopic("test"); + Flowable messages = topic.getMessages(String.class); + List list = new ArrayList<>(); + messages.subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(10); + } + + @Override + public void onNext(String t) { + list.add(t); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + for (int i = 0; i < 15; i++) { + sync(topic.publish("" + i)); + } + + assertThat(list).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + } +}