diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 5018b0807..d3b2d4997 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -80,6 +80,7 @@ import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetMultimapReactive; import org.redisson.reactive.RedissonSetReactive; +import org.redisson.reactive.RedissonTopicReactive; import org.redisson.reactive.RedissonTransactionReactive; /** @@ -295,12 +296,16 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RTopicReactive getTopic(String name) { - return ReactiveProxyBuilder.create(commandExecutor, new RedissonTopic(commandExecutor, name), RTopicReactive.class); + RedissonTopic topic = new RedissonTopic(commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, topic, + new RedissonTopicReactive(topic), RTopicReactive.class); } @Override public RTopicReactive getTopic(String name, Codec codec) { - return ReactiveProxyBuilder.create(commandExecutor, new RedissonTopic(codec, commandExecutor, name), RTopicReactive.class); + RedissonTopic topic = new RedissonTopic(codec, commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, topic, + new RedissonTopicReactive(topic), RTopicReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RTopicReactive.java b/redisson/src/main/java/org/redisson/api/RTopicReactive.java index b2285c95e..bd7694613 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicReactive.java +++ b/redisson/src/main/java/org/redisson/api/RTopicReactive.java @@ -20,6 +20,7 @@ import java.util.List; import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.StatusListener; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -71,6 +72,15 @@ public interface RTopicReactive { * Removes the listener by id for listening this topic * * @param listenerId - listener id + * @return void */ - void removeListener(int listenerId); + Mono removeListener(int listenerId); + + /** + * Returns stream of messages. + * + * @param type - type of message to listen + * @return stream of messages + */ + Flux getMessages(Class type); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java new file mode 100644 index 000000000..1acffaa8c --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java @@ -0,0 +1,62 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.concurrent.atomic.AtomicLong; + +import org.redisson.api.RFuture; +import org.redisson.api.RTopic; +import org.redisson.api.listener.MessageListener; + +import reactor.core.publisher.Flux; + +/** + * + * @author Nikita Koksharov + * + */ +public class RedissonTopicReactive { + + private final RTopic topic; + + public RedissonTopicReactive(RTopic topic) { + this.topic = topic; + } + + public Flux getMessages(Class type) { + return Flux.create(emitter -> { + emitter.onRequest(n -> { + AtomicLong counter = new AtomicLong(n); + RFuture t = topic.addListenerAsync(type, new MessageListener() { + @Override + public void onMessage(CharSequence channel, M msg) { + emitter.next(msg); + if (counter.decrementAndGet() == 0) { + topic.removeListenerAsync(this); + emitter.complete(); + } + } + }); + t.whenComplete((id, e) -> { + emitter.onDispose(() -> { + topic.removeListenerAsync(id); + }); + }); + }); + }); + } + +} diff --git a/redisson/src/test/java/org/redisson/RedissonTopicReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonTopicReactiveTest.java new file mode 100644 index 000000000..44457cb89 --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonTopicReactiveTest.java @@ -0,0 +1,49 @@ +package org.redisson; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.api.RTopicReactive; + +import reactor.core.publisher.Flux; + +public class RedissonTopicReactiveTest extends BaseReactiveTest { + + @Test + public void testLong() throws InterruptedException { + RTopicReactive topic = redisson.getTopic("test"); + Flux messages = topic.getMessages(String.class); + List list = new ArrayList<>(); + messages.subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(10); + } + + @Override + public void onNext(String t) { + list.add(t); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + for (int i = 0; i < 15; i++) { + sync(topic.publish("" + i)); + } + + assertThat(list).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + } +}