diff --git a/src/main/java/org/redisson/CommandReactiveService.java b/src/main/java/org/redisson/CommandReactiveService.java index dbc8b53e8..a93d60c3f 100644 --- a/src/main/java/org/redisson/CommandReactiveService.java +++ b/src/main/java/org/redisson/CommandReactiveService.java @@ -19,17 +19,11 @@ import java.net.InetSocketAddress; import java.util.List; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import reactor.core.support.Exceptions; -import reactor.rx.Stream; -import reactor.rx.action.Action; -import reactor.rx.subscription.ReactiveSubscription; /** * @@ -38,47 +32,6 @@ import reactor.rx.subscription.ReactiveSubscription; */ public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor { - static class NettyFuturePublisher extends Stream { - private final Future that; - - public NettyFuturePublisher(Future that) { - this.that = that; - } - - @Override - public void subscribe(final Subscriber subscriber) { - try { - subscriber.onSubscribe(new ReactiveSubscription(this, subscriber) { - - @Override - public void request(long elements) { - Action.checkRequest(elements); - if (isComplete()) return; - - that.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - subscriber.onError(future.cause()); - return; - } - - if (future.getNow() != null) { - subscriber.onNext(future.getNow()); - } - onComplete(); - } - }); - } - }); - } catch (Throwable throwable) { - Exceptions.throwIfFatal(throwable); - subscriber.onError(throwable); - } - } - - } - public CommandReactiveService(ConnectionManager connectionManager) { super(connectionManager); } diff --git a/src/main/java/org/redisson/NettyFuturePublisher.java b/src/main/java/org/redisson/NettyFuturePublisher.java new file mode 100644 index 000000000..6d1bb63a0 --- /dev/null +++ b/src/main/java/org/redisson/NettyFuturePublisher.java @@ -0,0 +1,53 @@ +package org.redisson; + +import org.reactivestreams.Subscriber; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import reactor.core.support.Exceptions; +import reactor.rx.Stream; +import reactor.rx.action.Action; +import reactor.rx.subscription.ReactiveSubscription; + +public class NettyFuturePublisher extends Stream { + + private final Future that; + + public NettyFuturePublisher(Future that) { + this.that = that; + } + + @Override + public void subscribe(final Subscriber subscriber) { + try { + subscriber.onSubscribe(new ReactiveSubscription(this, subscriber) { + + @Override + public void request(long elements) { + Action.checkRequest(elements); + if (isComplete()) return; + + that.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + subscriber.onError(future.cause()); + return; + } + + if (future.getNow() != null) { + subscriber.onNext(future.getNow()); + } + onComplete(); + } + }); + } + }); + } catch (Throwable throwable) { + Exceptions.throwIfFatal(throwable); + subscriber.onError(throwable); + } + } + + +} diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index 274af7014..06f98288a 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -36,6 +36,7 @@ import org.redisson.core.RMap; import org.redisson.core.RMapReactive; import org.redisson.core.RScoredSortedSetReactive; import org.redisson.core.RSetReactive; +import org.redisson.core.RTopicReactive; import io.netty.util.concurrent.Future; @@ -157,6 +158,16 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonLexSortedSetReactive(commandExecutor, name); } + @Override + public RTopicReactive getTopic(String name) { + return new RedissonTopicReactive(commandExecutor, name); + } + + @Override + public RTopicReactive getTopic(String name, Codec codec) { + return new RedissonTopicReactive(codec, commandExecutor, name); + } + @Override public void shutdown() { connectionManager.shutdown(); diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index 235bcc470..741b15dac 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -27,6 +27,7 @@ import org.redisson.core.RMap; import org.redisson.core.RMapReactive; import org.redisson.core.RScoredSortedSetReactive; import org.redisson.core.RSetReactive; +import org.redisson.core.RTopicReactive; public interface RedissonReactiveClient { @@ -105,16 +106,16 @@ public interface RedissonReactiveClient { */ RLexSortedSetReactive getLexSortedSet(String name); -// /** -// * Returns topic instance by name. -// * -// * @param name of topic -// * @return -// */ -// RTopic getTopic(String name); -// -// RTopic getTopic(String name, Codec codec); -// + /** + * Returns topic instance by name. + * + * @param name of topic + * @return + */ + RTopicReactive getTopic(String name); + + RTopicReactive getTopic(String name, Codec codec); + // /** // * Returns topic instance satisfies by pattern name. // * diff --git a/src/main/java/org/redisson/RedissonTopicReactive.java b/src/main/java/org/redisson/RedissonTopicReactive.java new file mode 100644 index 000000000..01199a9c0 --- /dev/null +++ b/src/main/java/org/redisson/RedissonTopicReactive.java @@ -0,0 +1,117 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Collections; +import java.util.List; + +import org.reactivestreams.Publisher; +import org.redisson.client.RedisPubSubListener; +import org.redisson.client.codec.Codec; +import org.redisson.client.protocol.RedisCommands; +import org.redisson.connection.PubSubConnectionEntry; +import org.redisson.core.MessageListener; +import org.redisson.core.RTopicReactive; +import org.redisson.core.StatusListener; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + +/** + * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. + * + * @author Nikita Koksharov + * + * @param message + */ +public class RedissonTopicReactive implements RTopicReactive { + + private final CommandReactiveExecutor commandExecutor; + private final String name; + private final Codec codec; + + protected RedissonTopicReactive(CommandReactiveExecutor commandExecutor, String name) { + this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); + } + + protected RedissonTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + this.commandExecutor = commandExecutor; + this.name = name; + this.codec = codec; + } + + @Override + public List getChannelNames() { + return Collections.singletonList(name); + } + + @Override + public Publisher publish(M message) { + return commandExecutor.writeObservable(name, codec, RedisCommands.PUBLISH, name, message); + } + + @Override + public Publisher addListener(StatusListener listener) { + return addListener(new PubSubStatusListener(listener, name)); + }; + + @Override + public Publisher addListener(MessageListener listener) { + PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); + return addListener(pubSubListener); + } + + private Publisher addListener(final RedisPubSubListener pubSubListener) { + final Promise promise = commandExecutor.getConnectionManager().newPromise(); + Future future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.setFailure(future.cause()); + return; + } + + promise.setSuccess(System.identityHashCode(pubSubListener)); + } + }); + return new NettyFuturePublisher(promise); + } + + + @Override + public void removeListener(int listenerId) { + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + if (entry == null) { + return; + } + synchronized (entry) { + if (entry.isActive()) { + entry.removeListener(name, listenerId); + if (!entry.hasListeners(name)) { + commandExecutor.getConnectionManager().unsubscribe(name); + } + return; + } + } + + // listener has been re-attached + removeListener(listenerId); + } + + +} diff --git a/src/main/java/org/redisson/core/RTopicReactive.java b/src/main/java/org/redisson/core/RTopicReactive.java new file mode 100644 index 000000000..674d95050 --- /dev/null +++ b/src/main/java/org/redisson/core/RTopicReactive.java @@ -0,0 +1,46 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.List; + +import org.reactivestreams.Publisher; + +/** + * Distributed topic. Messages are delivered to all message listeners across Redis cluster. + * + * @author Nikita Koksharov + * + * @param the type of message object + */ +public interface RTopicReactive { + + List getChannelNames(); + + /** + * Publish the message to all subscribers of this topic asynchronously + * + * @param message + * @return the Future object with number of clients that received the message + */ + Publisher publish(M message); + + Publisher addListener(StatusListener listener); + + Publisher addListener(MessageListener listener); + + void removeListener(int listenerId); +}