From bf2ac3e870baf24380e55adcf350db26572603da Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 5 Nov 2018 14:10:24 +0300 Subject: [PATCH] Feature - RTopicRx.getMessages streaming method added --- .../main/java/org/redisson/RedissonRx.java | 8 ++- .../main/java/org/redisson/api/RTopicRx.java | 9 ++++ .../org/redisson/rx/RedissonTopicRxTest.java | 49 +++++++++++++++++++ 3 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 redisson/src/test/java/org/redisson/rx/RedissonTopicRxTest.java diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 42e3d02ec..b22471f13 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -50,6 +50,7 @@ import org.redisson.api.RSetCacheRx; import org.redisson.api.RSetMultimapRx; import org.redisson.api.RSetRx; import org.redisson.api.RStreamRx; +import org.redisson.api.RTopic; import org.redisson.api.RTopicRx; import org.redisson.api.RTransactionRx; import org.redisson.api.RedissonRxClient; @@ -75,6 +76,7 @@ import org.redisson.rx.RedissonScoredSortedSetRx; import org.redisson.rx.RedissonSetCacheRx; import org.redisson.rx.RedissonSetMultimapRx; import org.redisson.rx.RedissonSetRx; +import org.redisson.rx.RedissonTopicRx; import org.redisson.rx.RedissonTransactionRx; import org.redisson.rx.RxProxyBuilder; @@ -282,12 +284,14 @@ public class RedissonRx implements RedissonRxClient { @Override public RTopicRx getTopic(String name) { - return RxProxyBuilder.create(commandExecutor, new RedissonTopic(commandExecutor, name), RTopicRx.class); + RTopic topic = new RedissonTopic(commandExecutor, name); + return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class); } @Override public RTopicRx getTopic(String name, Codec codec) { - return RxProxyBuilder.create(commandExecutor, new RedissonTopic(codec, commandExecutor, name), RTopicRx.class); + RTopic topic = new RedissonTopic(codec, commandExecutor, name); + return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RTopicRx.java b/redisson/src/main/java/org/redisson/api/RTopicRx.java index 019d4238d..edaee5aa4 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicRx.java +++ b/redisson/src/main/java/org/redisson/api/RTopicRx.java @@ -73,4 +73,13 @@ public interface RTopicRx { * @param listenerId - listener id */ void removeListener(int listenerId); + + /** + * Returns stream of messages. + * + * @param type - type of message to listen + * @return stream of messages + */ + Flowable getMessages(Class type); + } diff --git a/redisson/src/test/java/org/redisson/rx/RedissonTopicRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonTopicRxTest.java new file mode 100644 index 000000000..16b9b463b --- /dev/null +++ b/redisson/src/test/java/org/redisson/rx/RedissonTopicRxTest.java @@ -0,0 +1,49 @@ +package org.redisson.rx; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.api.RTopicRx; + +import io.reactivex.Flowable; + +public class RedissonTopicRxTest extends BaseRxTest { + + @Test + public void testLong() throws InterruptedException { + RTopicRx topic = redisson.getTopic("test"); + Flowable messages = topic.getMessages(String.class); + List list = new ArrayList<>(); + messages.subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(10); + } + + @Override + public void onNext(String t) { + list.add(t); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + for (int i = 0; i < 15; i++) { + sync(topic.publish("" + i)); + } + + assertThat(list).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + } +}