From a93320dde5b713d6184f962e2442339acab94dff Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 15 Oct 2018 13:17:06 +0300 Subject: [PATCH] refactoring --- .../org/redisson/PubSubStatusListener.java | 1 - .../org/redisson/RedissonPatternTopic.java | 44 +++++- .../java/org/redisson/RedissonReactive.java | 10 +- .../main/java/org/redisson/RedissonTopic.java | 28 ++-- .../redisson/api/RBlockingDequeReactive.java | 2 +- .../redisson/api/RBlockingQueueReactive.java | 2 +- .../java/org/redisson/api/RCollectionRx.java | 2 +- .../java/org/redisson/api/RDequeReactive.java | 2 +- .../redisson/api/RLexSortedSetReactive.java | 1 + .../java/org/redisson/api/RPatternTopic.java | 5 +- .../redisson/api/RPatternTopicReactive.java | 2 +- .../api/RScoredSortedSetReactive.java | 1 + .../java/org/redisson/api/RTopicAsync.java | 10 ++ .../java/org/redisson/api/RTopicReactive.java | 28 +++- .../redisson/api/RTransactionReactive.java | 2 +- .../RedissonPatternTopicReactive.java | 143 ------------------ .../reactive/RedissonTopicReactive.java | 101 ------------- 17 files changed, 104 insertions(+), 280 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java diff --git a/redisson/src/main/java/org/redisson/PubSubStatusListener.java b/redisson/src/main/java/org/redisson/PubSubStatusListener.java index d60e73b4a..abd1c66a2 100644 --- a/redisson/src/main/java/org/redisson/PubSubStatusListener.java +++ b/redisson/src/main/java/org/redisson/PubSubStatusListener.java @@ -16,7 +16,6 @@ package org.redisson; import org.redisson.api.listener.StatusListener; -import org.redisson.client.ChannelName; import org.redisson.client.RedisPubSubListener; import org.redisson.client.protocol.pubsub.PubSubType; diff --git a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java index c355dfca0..1cd80a872 100644 --- a/redisson/src/main/java/org/redisson/RedissonPatternTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonPatternTopic.java @@ -26,12 +26,19 @@ import org.redisson.client.ChannelName; import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisTimeoutException; import org.redisson.client.codec.Codec; +import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandExecutor; +import org.redisson.command.CommandSyncExecutor; import org.redisson.config.MasterSlaveServersConfig; +import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.AsyncSemaphore; import org.redisson.pubsub.PubSubConnectionEntry; import org.redisson.pubsub.PublishSubscribeService; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + /** * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. * @@ -42,16 +49,16 @@ import org.redisson.pubsub.PublishSubscribeService; public class RedissonPatternTopic implements RPatternTopic { final PublishSubscribeService subscribeService; - final CommandExecutor commandExecutor; + final CommandAsyncExecutor commandExecutor; private final String name; private final ChannelName channelName; private final Codec codec; - protected RedissonPatternTopic(CommandExecutor commandExecutor, String name) { + protected RedissonPatternTopic(CommandAsyncExecutor commandExecutor, String name) { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } - protected RedissonPatternTopic(Codec codec, CommandExecutor commandExecutor, String name) { + protected RedissonPatternTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) { this.commandExecutor = commandExecutor; this.name = name; this.channelName = new ChannelName(name); @@ -75,7 +82,36 @@ public class RedissonPatternTopic implements RPatternTopic { commandExecutor.syncSubscription(future); return System.identityHashCode(pubSubListener); } - + + @Override + public RFuture addListenerAsync(PatternStatusListener listener) { + PubSubPatternStatusListener pubSubListener = new PubSubPatternStatusListener(listener, name); + return addListenerAsync(pubSubListener); + } + + @Override + public RFuture addListenerAsync(PatternMessageListener listener) { + PubSubPatternMessageListener pubSubListener = new PubSubPatternMessageListener(listener, name); + return addListenerAsync(pubSubListener); + } + + private RFuture addListenerAsync(final RedisPubSubListener pubSubListener) { + RFuture future = subscribeService.subscribe(codec, channelName, pubSubListener); + final RPromise result = new RedissonPromise(); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + result.tryFailure(future.cause()); + return; + } + + result.trySuccess(System.identityHashCode(pubSubListener)); + } + }); + return result; + } + protected void acquire(AsyncSemaphore semaphore) { MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig(); int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index f1820072b..22e0a39ea 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -75,13 +75,11 @@ import org.redisson.reactive.RedissonListMultimapReactive; import org.redisson.reactive.RedissonListReactive; import org.redisson.reactive.RedissonMapCacheReactive; import org.redisson.reactive.RedissonMapReactive; -import org.redisson.reactive.RedissonPatternTopicReactive; import org.redisson.reactive.RedissonReadWriteLockReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive; import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetMultimapReactive; import org.redisson.reactive.RedissonSetReactive; -import org.redisson.reactive.RedissonTopicReactive; import org.redisson.reactive.RedissonTransactionReactive; /** @@ -297,22 +295,22 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RTopicReactive getTopic(String name) { - return new RedissonTopicReactive(commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonTopic(commandExecutor, name), RTopicReactive.class); } @Override public RTopicReactive getTopic(String name, Codec codec) { - return new RedissonTopicReactive(codec, commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonTopic(codec, commandExecutor, name), RTopicReactive.class); } @Override public RPatternTopicReactive getPatternTopic(String pattern) { - return new RedissonPatternTopicReactive(commandExecutor, pattern); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(commandExecutor, pattern), RPatternTopicReactive.class); } @Override public RPatternTopicReactive getPatternTopic(String pattern, Codec codec) { - return new RedissonPatternTopicReactive(codec, commandExecutor, pattern); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(codec, commandExecutor, pattern), RPatternTopicReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 9749b4253..93a584cb7 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -69,6 +69,7 @@ public class RedissonTopic implements RTopic { this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService(); } + @Override public List getChannelNames() { return Collections.singletonList(name); } @@ -110,22 +111,15 @@ public class RedissonTopic implements RTopic { } @Override - public RFuture addListenerAsync(final MessageListener listener) { - final PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); - RFuture future = subscribeService.subscribe(codec, channelName, pubSubListener); - final RPromise result = new RedissonPromise(); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - result.tryFailure(future.cause()); - return; - } - - result.trySuccess(System.identityHashCode(pubSubListener)); - } - }); - return result; + public RFuture addListenerAsync(StatusListener listener) { + PubSubStatusListener pubSubListener = new PubSubStatusListener(listener, name); + return addListenerAsync((RedisPubSubListener)pubSubListener); + } + + @Override + public RFuture addListenerAsync(MessageListener listener) { + PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); + return addListenerAsync(pubSubListener); } private int addListener(RedisPubSubListener pubSubListener) { @@ -134,7 +128,7 @@ public class RedissonTopic implements RTopic { return System.identityHashCode(pubSubListener); } - public RFuture addListenerAsync(final RedisPubSubListener pubSubListener) { + private RFuture addListenerAsync(final RedisPubSubListener pubSubListener) { RFuture future = subscribeService.subscribe(codec, channelName, pubSubListener); final RPromise result = new RedissonPromise(); future.addListener(new FutureListener() { diff --git a/redisson/src/main/java/org/redisson/api/RBlockingDequeReactive.java b/redisson/src/main/java/org/redisson/api/RBlockingDequeReactive.java index dc1fc5fc2..c8eec0385 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingDequeReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingDequeReactive.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; /** - * Distributed reactive implementation of {@link BlockingDeque} + * Reactive interface for Redis based BlockingDeque object * * @author Nikita Koksharov * @param the type of elements held in this collection diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java index a3fbbde44..ccad9c2e7 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; /** - * Distributed reactive implementation of {@link BlockingQueue} + * Reactive interface for BlockingQueue object * * @author Nikita Koksharov * @param the type of elements held in this collection diff --git a/redisson/src/main/java/org/redisson/api/RCollectionRx.java b/redisson/src/main/java/org/redisson/api/RCollectionRx.java index 6b400cd32..68316cc79 100644 --- a/redisson/src/main/java/org/redisson/api/RCollectionRx.java +++ b/redisson/src/main/java/org/redisson/api/RCollectionRx.java @@ -22,7 +22,7 @@ import org.reactivestreams.Publisher; import io.reactivex.Flowable; /** - * Common reactive interface for collection object + * Common RxJava2 interface for collection object * * @author Nikita Koksharov * diff --git a/redisson/src/main/java/org/redisson/api/RDequeReactive.java b/redisson/src/main/java/org/redisson/api/RDequeReactive.java index 1274a131f..b2a0465e3 100644 --- a/redisson/src/main/java/org/redisson/api/RDequeReactive.java +++ b/redisson/src/main/java/org/redisson/api/RDequeReactive.java @@ -18,7 +18,7 @@ package org.redisson.api; import org.reactivestreams.Publisher; /** - * Distributed reactive implementation of {@link java.util.Deque} + * Reactive interface for Deque object * * @author Nikita Koksharov * diff --git a/redisson/src/main/java/org/redisson/api/RLexSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RLexSortedSetReactive.java index e8bab44ff..80b2fea5c 100644 --- a/redisson/src/main/java/org/redisson/api/RLexSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RLexSortedSetReactive.java @@ -20,6 +20,7 @@ import java.util.Collection; import org.reactivestreams.Publisher; /** + * Reactive interface for LexSortedSet object * * @author Nikita Koksharov * diff --git a/redisson/src/main/java/org/redisson/api/RPatternTopic.java b/redisson/src/main/java/org/redisson/api/RPatternTopic.java index e3e433858..d5b187af7 100644 --- a/redisson/src/main/java/org/redisson/api/RPatternTopic.java +++ b/redisson/src/main/java/org/redisson/api/RPatternTopic.java @@ -21,7 +21,7 @@ import org.redisson.api.listener.PatternMessageListener; import org.redisson.api.listener.PatternStatusListener; /** - * Distributed topic. Messages are delivered to all message listeners across Redis cluster. + * Pattern based observer for Publish Subscribe object. * * @author Nikita Koksharov * @@ -75,5 +75,8 @@ public interface RPatternTopic { */ void removeAllListeners(); + RFuture addListenerAsync(PatternStatusListener listener); + + RFuture addListenerAsync(PatternMessageListener listener); } diff --git a/redisson/src/main/java/org/redisson/api/RPatternTopicReactive.java b/redisson/src/main/java/org/redisson/api/RPatternTopicReactive.java index 69772be4a..3bf635978 100644 --- a/redisson/src/main/java/org/redisson/api/RPatternTopicReactive.java +++ b/redisson/src/main/java/org/redisson/api/RPatternTopicReactive.java @@ -22,7 +22,7 @@ import org.redisson.api.listener.PatternMessageListener; import org.redisson.api.listener.PatternStatusListener; /** - * Distributed topic. Messages are delivered to all message listeners across Redis cluster. + * Reactive interface for Pattern based observer for Publish Subscribe object. * * @author Nikita Koksharov * diff --git a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java index 0fea838c0..df8fdb95a 100644 --- a/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java @@ -25,6 +25,7 @@ import org.redisson.api.RScoredSortedSet.Aggregate; import org.redisson.client.protocol.ScoredEntry; /** + * Reactive interface for SortedSet object * * @author Nikita Koksharov * diff --git a/redisson/src/main/java/org/redisson/api/RTopicAsync.java b/redisson/src/main/java/org/redisson/api/RTopicAsync.java index 4af810768..78dfa413f 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicAsync.java +++ b/redisson/src/main/java/org/redisson/api/RTopicAsync.java @@ -16,6 +16,7 @@ package org.redisson.api; import org.redisson.api.listener.MessageListener; +import org.redisson.api.listener.StatusListener; /** * Distributed topic. Messages are delivered to all message listeners across Redis cluster. @@ -34,6 +35,15 @@ public interface RTopicAsync { */ RFuture publishAsync(M message); + /** + * Subscribes to status changes of this topic + * + * @param listener for messages + * @return listener id + * @see org.redisson.api.listener.StatusListener + */ + RFuture addListenerAsync(StatusListener listener); + /** * Subscribes to this topic. * MessageListener.onMessage is called when any message diff --git a/redisson/src/main/java/org/redisson/api/RTopicReactive.java b/redisson/src/main/java/org/redisson/api/RTopicReactive.java index f0fe6007b..6182e542c 100644 --- a/redisson/src/main/java/org/redisson/api/RTopicReactive.java +++ b/redisson/src/main/java/org/redisson/api/RTopicReactive.java @@ -22,7 +22,7 @@ import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.StatusListener; /** - * Distributed topic. Messages are delivered to all message listeners across Redis cluster. + * Reactive interface for Publish Subscribe object. Messages are delivered to all message listeners across Redis cluster. * * @author Nikita Koksharov * @@ -30,6 +30,11 @@ import org.redisson.api.listener.StatusListener; */ public interface RTopicReactive { + /** + * Get topic channel names + * + * @return channel names + */ List getChannelNames(); /** @@ -40,9 +45,30 @@ public interface RTopicReactive { */ Publisher publish(M message); + /** + * Subscribes to status changes of this topic + * + * @param listener for messages + * @return listener id + * @see org.redisson.api.listener.StatusListener + */ Publisher addListener(StatusListener listener); + /** + * Subscribes to this topic. + * MessageListener.onMessage is called when any message + * is published on this topic. + * + * @param listener for messages + * @return locally unique listener id + * @see org.redisson.api.listener.MessageListener + */ Publisher addListener(MessageListener listener); + /** + * Removes the listener by id for listening this topic + * + * @param listenerId - listener id + */ void removeListener(int listenerId); } diff --git a/redisson/src/main/java/org/redisson/api/RTransactionReactive.java b/redisson/src/main/java/org/redisson/api/RTransactionReactive.java index b250c406a..6aba4d8c1 100644 --- a/redisson/src/main/java/org/redisson/api/RTransactionReactive.java +++ b/redisson/src/main/java/org/redisson/api/RTransactionReactive.java @@ -19,7 +19,7 @@ import org.reactivestreams.Publisher; import org.redisson.client.codec.Codec; /** - * Transaction object allows to execute transactions over Redisson objects. + * Reactive interface for transaction object allows to execute transactions over Redisson objects. * Uses locks for write operations and maintains data modification operations list till the commit/rollback operation. *

* Transaction isolation level: READ_COMMITTED diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java deleted file mode 100644 index 4b44e547c..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonPatternTopicReactive.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.Collections; -import java.util.List; - -import org.reactivestreams.Publisher; -import org.redisson.PubSubPatternMessageListener; -import org.redisson.PubSubPatternStatusListener; -import org.redisson.api.RFuture; -import org.redisson.api.RPatternTopicReactive; -import org.redisson.api.listener.PatternMessageListener; -import org.redisson.api.listener.PatternStatusListener; -import org.redisson.client.ChannelName; -import org.redisson.client.RedisPubSubListener; -import org.redisson.client.RedisTimeoutException; -import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; -import org.redisson.config.MasterSlaveServersConfig; -import org.redisson.misc.RPromise; -import org.redisson.misc.RedissonPromise; -import org.redisson.pubsub.AsyncSemaphore; -import org.redisson.pubsub.PubSubConnectionEntry; -import org.redisson.pubsub.PublishSubscribeService; - -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import reactor.fn.Supplier; - -/** - * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. - * - * @author Nikita Koksharov - * - * @param message - */ -public class RedissonPatternTopicReactive implements RPatternTopicReactive { - - final PublishSubscribeService subscribeService; - final CommandReactiveExecutor commandExecutor; - private final String name; - private final ChannelName channelName; - private final Codec codec; - - public RedissonPatternTopicReactive(CommandReactiveExecutor commandExecutor, String name) { - this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); - } - - public RedissonPatternTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - this.commandExecutor = commandExecutor; - this.name = name; - this.channelName = new ChannelName(name); - this.codec = codec; - this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService(); - } - - @Override - public Publisher addListener(final PatternStatusListener listener) { - return commandExecutor.reactive(new Supplier>() { - @Override - public RFuture get() { - RPromise promise = new RedissonPromise(); - addListener(new PubSubPatternStatusListener(listener, name), promise); - return promise; - } - }); - }; - - @Override - public Publisher addListener(final PatternMessageListener listener) { - return commandExecutor.reactive(new Supplier>() { - @Override - public RFuture get() { - RPromise promise = new RedissonPromise(); - PubSubPatternMessageListener pubSubListener = new PubSubPatternMessageListener(listener, name); - addListener(pubSubListener, promise); - return promise; - } - }); - } - - private void addListener(final RedisPubSubListener pubSubListener, final RPromise promise) { - RFuture future = subscribeService.psubscribe(channelName, codec, pubSubListener); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - promise.tryFailure(future.cause()); - return; - } - - promise.trySuccess(pubSubListener.hashCode()); - } - }); - } - - protected void acquire(AsyncSemaphore semaphore) { - MasterSlaveServersConfig config = commandExecutor.getConnectionManager().getConfig(); - int timeout = config.getTimeout() + config.getRetryInterval() * config.getRetryAttempts(); - if (!semaphore.tryAcquire(timeout)) { - throw new RedisTimeoutException("Remove listeners operation timeout: (" + timeout + "ms) for " + name + " topic"); - } - } - - @Override - public void removeListener(int listenerId) { - AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); - acquire(semaphore); - - PubSubConnectionEntry entry = subscribeService.getPubSubEntry(channelName); - if (entry == null) { - semaphore.release(); - return; - } - - entry.removeListener(channelName, listenerId); - if (!entry.hasListeners(channelName)) { - subscribeService.punsubscribe(channelName, semaphore); - } else { - semaphore.release(); - } - } - - @Override - public List getPatternNames() { - return Collections.singletonList(name); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java deleted file mode 100644 index 8f537de81..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTopicReactive.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.Collections; -import java.util.List; - -import org.reactivestreams.Publisher; -import org.redisson.PubSubMessageListener; -import org.redisson.PubSubStatusListener; -import org.redisson.RedissonTopic; -import org.redisson.api.RFuture; -import org.redisson.api.RTopic; -import org.redisson.api.RTopicReactive; -import org.redisson.api.listener.MessageListener; -import org.redisson.api.listener.StatusListener; -import org.redisson.client.RedisPubSubListener; -import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; - -/** - * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. - * - * @author Nikita Koksharov - * - * @param message - */ -public class RedissonTopicReactive implements RTopicReactive { - - private final RTopic topic; - private final CommandReactiveExecutor commandExecutor; - private final String name; - - public RedissonTopicReactive(CommandReactiveExecutor commandExecutor, String name) { - this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); - } - - public RedissonTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - this.topic = new RedissonTopic(codec, commandExecutor, name); - this.commandExecutor = commandExecutor; - this.name = name; - } - - @Override - public List getChannelNames() { - return Collections.singletonList(name); - } - - @Override - public Publisher publish(final M message) { - return commandExecutor.reactive(new Supplier>() { - @Override - public RFuture get() { - return topic.publishAsync(message); - } - }); - } - - @Override - public Publisher addListener(StatusListener listener) { - return addListener(new PubSubStatusListener(listener, name)); - }; - - @Override - public Publisher addListener(MessageListener listener) { - PubSubMessageListener pubSubListener = new PubSubMessageListener(listener, name); - return addListener(pubSubListener); - } - - private Publisher addListener(final RedisPubSubListener pubSubListener) { - return commandExecutor.reactive(new Supplier>() { - @Override - public RFuture get() { - return ((RedissonTopic) topic).addListenerAsync(pubSubListener); - } - }); - } - - - @Override - public void removeListener(int listenerId) { - topic.removeListener(listenerId); - } - - -}