Feature - RTopicRx.getMessages streaming method added

pull/1721/head
Nikita Koksharov 6 years ago
parent 59b6a02463
commit bf2ac3e870

@ -50,6 +50,7 @@ import org.redisson.api.RSetCacheRx;
import org.redisson.api.RSetMultimapRx;
import org.redisson.api.RSetRx;
import org.redisson.api.RStreamRx;
import org.redisson.api.RTopic;
import org.redisson.api.RTopicRx;
import org.redisson.api.RTransactionRx;
import org.redisson.api.RedissonRxClient;
@ -75,6 +76,7 @@ import org.redisson.rx.RedissonScoredSortedSetRx;
import org.redisson.rx.RedissonSetCacheRx;
import org.redisson.rx.RedissonSetMultimapRx;
import org.redisson.rx.RedissonSetRx;
import org.redisson.rx.RedissonTopicRx;
import org.redisson.rx.RedissonTransactionRx;
import org.redisson.rx.RxProxyBuilder;
@ -282,12 +284,14 @@ public class RedissonRx implements RedissonRxClient {
@Override
public RTopicRx getTopic(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonTopic(commandExecutor, name), RTopicRx.class);
RTopic topic = new RedissonTopic(commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class);
}
@Override
public RTopicRx getTopic(String name, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonTopic(codec, commandExecutor, name), RTopicRx.class);
RTopic topic = new RedissonTopic(codec, commandExecutor, name);
return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class);
}
@Override

@ -73,4 +73,13 @@ public interface RTopicRx {
* @param listenerId - listener id
*/
void removeListener(int listenerId);
/**
* Returns stream of messages.
*
* @param type - type of message to listen
* @return stream of messages
*/
<M> Flowable<M> getMessages(Class<M> type);
}

@ -0,0 +1,49 @@
package org.redisson.rx;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RTopicRx;
import io.reactivex.Flowable;
public class RedissonTopicRxTest extends BaseRxTest {
@Test
public void testLong() throws InterruptedException {
RTopicRx topic = redisson.getTopic("test");
Flowable<String> messages = topic.getMessages(String.class);
List<String> list = new ArrayList<>();
messages.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(10);
}
@Override
public void onNext(String t) {
list.add(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
for (int i = 0; i < 15; i++) {
sync(topic.publish("" + i));
}
assertThat(list).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
}
}
Loading…
Cancel
Save