From 48ce2698c0a0a291a9543ef41de08e6546f05e76 Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 1 Dec 2015 11:55:22 +0300 Subject: [PATCH] RedissonPatternTopicReactive added. #210 --- .../RedissonPatternTopicReactive.java | 120 ++++++++++++++++++ .../java/org/redisson/RedissonReactive.java | 11 ++ .../org/redisson/RedissonReactiveClient.java | 109 +++------------- .../redisson/core/RPatternTopicReactive.java | 65 ++++++++++ 4 files changed, 211 insertions(+), 94 deletions(-) create mode 100644 src/main/java/org/redisson/RedissonPatternTopicReactive.java create mode 100644 src/main/java/org/redisson/core/RPatternTopicReactive.java diff --git a/src/main/java/org/redisson/RedissonPatternTopicReactive.java b/src/main/java/org/redisson/RedissonPatternTopicReactive.java new file mode 100644 index 000000000..8dbaf56a3 --- /dev/null +++ b/src/main/java/org/redisson/RedissonPatternTopicReactive.java @@ -0,0 +1,120 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson; + +import java.util.Collections; +import java.util.List; + +import org.reactivestreams.Publisher; +import org.redisson.client.RedisPubSubListener; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandReactiveExecutor; +import org.redisson.connection.PubSubConnectionEntry; +import org.redisson.core.PatternMessageListener; +import org.redisson.core.PatternStatusListener; +import org.redisson.core.RPatternTopicReactive; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + +/** + * Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster. + * + * @author Nikita Koksharov + * + * @param message + */ +public class RedissonPatternTopicReactive implements RPatternTopicReactive { + + final CommandReactiveExecutor commandExecutor; + private final String name; + private final Codec codec; + + protected RedissonPatternTopicReactive(CommandReactiveExecutor commandExecutor, String name) { + this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); + } + + protected RedissonPatternTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + this.commandExecutor = commandExecutor; + this.name = name; + this.codec = codec; + } + + @Override + public Publisher addListener(PatternStatusListener listener) { + Promise promise = commandExecutor.getConnectionManager().newPromise(); + addListener(new PubSubPatternStatusListener(listener, name), promise); + return new NettyFuturePublisher(promise); + }; + + @Override + public Publisher addListener(PatternMessageListener listener) { + Promise promise = commandExecutor.getConnectionManager().newPromise(); + PubSubPatternMessageListener pubSubListener = new PubSubPatternMessageListener(listener, name); + addListener(pubSubListener, promise); + return new NettyFuturePublisher(promise); + } + + private void addListener(final RedisPubSubListener pubSubListener, final Promise promise) { + Future future = commandExecutor.getConnectionManager().psubscribe(name, codec); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.setFailure(future.cause()); + return; + } + + PubSubConnectionEntry entry = future.getNow(); + synchronized (entry) { + if (entry.isActive()) { + entry.addListener(name, pubSubListener); + promise.setSuccess(pubSubListener.hashCode()); + return; + } + } + addListener(pubSubListener, promise); + } + }); + } + + @Override + public void removeListener(int listenerId) { + PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name); + if (entry == null) { + return; + } + synchronized (entry) { + if (entry.isActive()) { + entry.removeListener(name, listenerId); + if (entry.getListeners(name).isEmpty()) { + commandExecutor.getConnectionManager().punsubscribe(name); + } + return; + } + } + + // entry is inactive trying add again + removeListener(listenerId); + } + + @Override + public List getPatternNames() { + return Collections.singletonList(name); + } + +} diff --git a/src/main/java/org/redisson/RedissonReactive.java b/src/main/java/org/redisson/RedissonReactive.java index f52226a5d..d4837f663 100644 --- a/src/main/java/org/redisson/RedissonReactive.java +++ b/src/main/java/org/redisson/RedissonReactive.java @@ -35,6 +35,7 @@ import org.redisson.core.RLexSortedSetReactive; import org.redisson.core.RListReactive; import org.redisson.core.RMap; import org.redisson.core.RMapReactive; +import org.redisson.core.RPatternTopicReactive; import org.redisson.core.RScoredSortedSetReactive; import org.redisson.core.RSetReactive; import org.redisson.core.RTopicReactive; @@ -169,6 +170,16 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonTopicReactive(codec, commandExecutor, name); } + @Override + public RPatternTopicReactive getPatternTopic(String pattern) { + return new RedissonPatternTopicReactive(commandExecutor, pattern); + } + + @Override + public RPatternTopicReactive getPatternTopic(String pattern, Codec codec) { + return new RedissonPatternTopicReactive(codec, commandExecutor, pattern); + } + @Override public void shutdown() { connectionManager.shutdown(); diff --git a/src/main/java/org/redisson/RedissonReactiveClient.java b/src/main/java/org/redisson/RedissonReactiveClient.java index 741b15dac..9555a4bd8 100644 --- a/src/main/java/org/redisson/RedissonReactiveClient.java +++ b/src/main/java/org/redisson/RedissonReactiveClient.java @@ -25,6 +25,7 @@ import org.redisson.core.RLexSortedSetReactive; import org.redisson.core.RListReactive; import org.redisson.core.RMap; import org.redisson.core.RMapReactive; +import org.redisson.core.RPatternTopicReactive; import org.redisson.core.RScoredSortedSetReactive; import org.redisson.core.RSetReactive; import org.redisson.core.RTopicReactive; @@ -116,20 +117,20 @@ public interface RedissonReactiveClient { RTopicReactive getTopic(String name, Codec codec); -// /** -// * Returns topic instance satisfies by pattern name. -// * -// * Supported glob-style patterns: -// * h?llo subscribes to hello, hallo and hxllo -// * h*llo subscribes to hllo and heeeello -// * h[ae]llo subscribes to hello and hallo, but not hillo -// * -// * @param pattern of the topic -// * @return -// */ -// RPatternTopic getPatternTopic(String pattern); -// -// RPatternTopic getPatternTopic(String pattern, Codec codec); + /** + * Returns topic instance satisfies by pattern name. + * + * Supported glob-style patterns: + * h?llo subscribes to hello, hallo and hxllo + * h*llo subscribes to hllo and heeeello + * h[ae]llo subscribes to hello and hallo, but not hillo + * + * @param pattern of the topic + * @return + */ + RPatternTopicReactive getPatternTopic(String pattern); + + RPatternTopicReactive getPatternTopic(String pattern, Codec codec); // // /** // * Returns queue instance by name. @@ -219,86 +220,6 @@ public interface RedissonReactiveClient { // Config getConfig(); // // /** -// * Find keys by key search pattern -// * -// * Supported glob-style patterns: -// * h?llo subscribes to hello, hallo and hxllo -// * h*llo subscribes to hllo and heeeello -// * h[ae]llo subscribes to hello and hallo, but not hillo -// * -// * @param pattern -// * @return -// */ -// // use RKeys.findKeysByPattern -// @Deprecated -// Collection findKeysByPattern(String pattern); -// -// /** -// * Find keys by key search pattern in async mode -// * -// * Supported glob-style patterns: -// * h?llo subscribes to hello, hallo and hxllo -// * h*llo subscribes to hllo and heeeello -// * h[ae]llo subscribes to hello and hallo, but not hillo -// * -// * @param pattern -// * @return -// */ -// // use RKeys.findKeysByPatternAsync -// @Deprecated -// Future> findKeysByPatternAsync(String pattern); -// -// /** -// * Delete multiple objects by a key pattern -// * -// * Supported glob-style patterns: -// * h?llo subscribes to hello, hallo and hxllo -// * h*llo subscribes to hllo and heeeello -// * h[ae]llo subscribes to hello and hallo, but not hillo -// * -// * @param pattern -// * @return -// */ -// // use RKeys.deleteByPattern -// @Deprecated -// long deleteByPattern(String pattern); -// -// /** -// * Delete multiple objects by a key pattern in async mode -// * -// * Supported glob-style patterns: -// * h?llo subscribes to hello, hallo and hxllo -// * h*llo subscribes to hllo and heeeello -// * h[ae]llo subscribes to hello and hallo, but not hillo -// * -// * @param pattern -// * @return -// */ -// // use RKeys.deleteByPatternAsync -// @Deprecated -// Future deleteByPatternAsync(String pattern); -// -// /** -// * Delete multiple objects by name -// * -// * @param keys - object names -// * @return -// */ -// // use RKeys.delete -// @Deprecated -// long delete(String ... keys); -// -// /** -// * Delete multiple objects by name in async mode -// * -// * @param keys - object names -// * @return -// */ -// // use RKeys.deleteAsync -// @Deprecated -// Future deleteAsync(String ... keys); -// -// /** // * Get Redis nodes group for server operations // * // * @return diff --git a/src/main/java/org/redisson/core/RPatternTopicReactive.java b/src/main/java/org/redisson/core/RPatternTopicReactive.java new file mode 100644 index 000000000..a1f84938f --- /dev/null +++ b/src/main/java/org/redisson/core/RPatternTopicReactive.java @@ -0,0 +1,65 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.core; + +import java.util.List; + +import org.reactivestreams.Publisher; + +/** + * Distributed topic. Messages are delivered to all message listeners across Redis cluster. + * + * @author Nikita Koksharov + * + * @param the type of message object + */ +public interface RPatternTopicReactive { + + /** + * Get topic channel patterns + * + * @return + */ + List getPatternNames(); + + /** + * Subscribes to this topic. + * MessageListener.onMessage is called when any message + * is published on this topic. + * + * @param listener + * @return locally unique listener id + * @see org.redisson.core.MessageListener + */ + Publisher addListener(PatternMessageListener listener); + + /** + * Subscribes to status changes of this topic + * + * @param listener + * @return + * @see org.redisson.core.StatusListener + */ + Publisher addListener(PatternStatusListener listener); + + /** + * Removes the listener by id for listening this topic + * + * @param listenerId + */ + void removeListener(int listenerId); + +}