RedissonPatternTopicReactive added. #210

pull/337/head
Nikita 9 years ago
parent c59d627891
commit 48ce2698c0

@ -0,0 +1,120 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.PatternMessageListener;
import org.redisson.core.PatternStatusListener;
import org.redisson.core.RPatternTopicReactive;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/**
* Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
*
* @author Nikita Koksharov
*
* @param <M> message
*/
public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M> {
final CommandReactiveExecutor commandExecutor;
private final String name;
private final Codec codec;
protected RedissonPatternTopicReactive(CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
protected RedissonPatternTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
this.name = name;
this.codec = codec;
}
@Override
public Publisher<Integer> addListener(PatternStatusListener listener) {
Promise<Integer> promise = commandExecutor.getConnectionManager().newPromise();
addListener(new PubSubPatternStatusListener(listener, name), promise);
return new NettyFuturePublisher<Integer>(promise);
};
@Override
public Publisher<Integer> addListener(PatternMessageListener<M> listener) {
Promise<Integer> promise = commandExecutor.getConnectionManager().newPromise();
PubSubPatternMessageListener<M> pubSubListener = new PubSubPatternMessageListener<M>(listener, name);
addListener(pubSubListener, promise);
return new NettyFuturePublisher<Integer>(promise);
}
private void addListener(final RedisPubSubListener<M> pubSubListener, final Promise<Integer> promise) {
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
PubSubConnectionEntry entry = future.getNow();
synchronized (entry) {
if (entry.isActive()) {
entry.addListener(name, pubSubListener);
promise.setSuccess(pubSubListener.hashCode());
return;
}
}
addListener(pubSubListener, promise);
}
});
}
@Override
public void removeListener(int listenerId) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
return;
}
synchronized (entry) {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (entry.getListeners(name).isEmpty()) {
commandExecutor.getConnectionManager().punsubscribe(name);
}
return;
}
}
// entry is inactive trying add again
removeListener(listenerId);
}
@Override
public List<String> getPatternNames() {
return Collections.singletonList(name);
}
}

@ -35,6 +35,7 @@ import org.redisson.core.RLexSortedSetReactive;
import org.redisson.core.RListReactive;
import org.redisson.core.RMap;
import org.redisson.core.RMapReactive;
import org.redisson.core.RPatternTopicReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RSetReactive;
import org.redisson.core.RTopicReactive;
@ -169,6 +170,16 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonTopicReactive<M>(codec, commandExecutor, name);
}
@Override
public <M> RPatternTopicReactive<M> getPatternTopic(String pattern) {
return new RedissonPatternTopicReactive<M>(commandExecutor, pattern);
}
@Override
public <M> RPatternTopicReactive<M> getPatternTopic(String pattern, Codec codec) {
return new RedissonPatternTopicReactive<M>(codec, commandExecutor, pattern);
}
@Override
public void shutdown() {
connectionManager.shutdown();

@ -25,6 +25,7 @@ import org.redisson.core.RLexSortedSetReactive;
import org.redisson.core.RListReactive;
import org.redisson.core.RMap;
import org.redisson.core.RMapReactive;
import org.redisson.core.RPatternTopicReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RSetReactive;
import org.redisson.core.RTopicReactive;
@ -116,20 +117,20 @@ public interface RedissonReactiveClient {
<M> RTopicReactive<M> getTopic(String name, Codec codec);
// /**
// * Returns topic instance satisfies by pattern name.
// *
// * Supported glob-style patterns:
// * h?llo subscribes to hello, hallo and hxllo
// * h*llo subscribes to hllo and heeeello
// * h[ae]llo subscribes to hello and hallo, but not hillo
// *
// * @param pattern of the topic
// * @return
// */
// <M> RPatternTopic<M> getPatternTopic(String pattern);
//
// <M> RPatternTopic<M> getPatternTopic(String pattern, Codec codec);
/**
* Returns topic instance satisfies by pattern name.
*
* Supported glob-style patterns:
* h?llo subscribes to hello, hallo and hxllo
* h*llo subscribes to hllo and heeeello
* h[ae]llo subscribes to hello and hallo, but not hillo
*
* @param pattern of the topic
* @return
*/
<M> RPatternTopicReactive<M> getPatternTopic(String pattern);
<M> RPatternTopicReactive<M> getPatternTopic(String pattern, Codec codec);
//
// /**
// * Returns queue instance by name.
@ -219,86 +220,6 @@ public interface RedissonReactiveClient {
// Config getConfig();
//
// /**
// * Find keys by key search pattern
// *
// * Supported glob-style patterns:
// * h?llo subscribes to hello, hallo and hxllo
// * h*llo subscribes to hllo and heeeello
// * h[ae]llo subscribes to hello and hallo, but not hillo
// *
// * @param pattern
// * @return
// */
// // use RKeys.findKeysByPattern
// @Deprecated
// Collection<String> findKeysByPattern(String pattern);
//
// /**
// * Find keys by key search pattern in async mode
// *
// * Supported glob-style patterns:
// * h?llo subscribes to hello, hallo and hxllo
// * h*llo subscribes to hllo and heeeello
// * h[ae]llo subscribes to hello and hallo, but not hillo
// *
// * @param pattern
// * @return
// */
// // use RKeys.findKeysByPatternAsync
// @Deprecated
// Future<Collection<String>> findKeysByPatternAsync(String pattern);
//
// /**
// * Delete multiple objects by a key pattern
// *
// * Supported glob-style patterns:
// * h?llo subscribes to hello, hallo and hxllo
// * h*llo subscribes to hllo and heeeello
// * h[ae]llo subscribes to hello and hallo, but not hillo
// *
// * @param pattern
// * @return
// */
// // use RKeys.deleteByPattern
// @Deprecated
// long deleteByPattern(String pattern);
//
// /**
// * Delete multiple objects by a key pattern in async mode
// *
// * Supported glob-style patterns:
// * h?llo subscribes to hello, hallo and hxllo
// * h*llo subscribes to hllo and heeeello
// * h[ae]llo subscribes to hello and hallo, but not hillo
// *
// * @param pattern
// * @return
// */
// // use RKeys.deleteByPatternAsync
// @Deprecated
// Future<Long> deleteByPatternAsync(String pattern);
//
// /**
// * Delete multiple objects by name
// *
// * @param keys - object names
// * @return
// */
// // use RKeys.delete
// @Deprecated
// long delete(String ... keys);
//
// /**
// * Delete multiple objects by name in async mode
// *
// * @param keys - object names
// * @return
// */
// // use RKeys.deleteAsync
// @Deprecated
// Future<Long> deleteAsync(String ... keys);
//
// /**
// * Get Redis nodes group for server operations
// *
// * @return

@ -0,0 +1,65 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.List;
import org.reactivestreams.Publisher;
/**
* Distributed topic. Messages are delivered to all message listeners across Redis cluster.
*
* @author Nikita Koksharov
*
* @param <M> the type of message object
*/
public interface RPatternTopicReactive<M> {
/**
* Get topic channel patterns
*
* @return
*/
List<String> getPatternNames();
/**
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> is called when any message
* is published on this topic.
*
* @param listener
* @return locally unique listener id
* @see org.redisson.core.MessageListener
*/
Publisher<Integer> addListener(PatternMessageListener<M> listener);
/**
* Subscribes to status changes of this topic
*
* @param listener
* @return
* @see org.redisson.core.StatusListener
*/
Publisher<Integer> addListener(PatternStatusListener listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
* @param listenerId
*/
void removeListener(int listenerId);
}
Loading…
Cancel
Save