RTopicReactive added. #210

pull/337/head
Nikita 9 years ago
parent 1802228a5e
commit 1cc0b2fa1a

@ -19,17 +19,11 @@ import java.net.InetSocketAddress;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.core.support.Exceptions;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.subscription.ReactiveSubscription;
/**
*
@ -38,47 +32,6 @@ import reactor.rx.subscription.ReactiveSubscription;
*/
public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor {
static class NettyFuturePublisher<T> extends Stream<T> {
private final Future<? extends T> that;
public NettyFuturePublisher(Future<? extends T> that) {
this.that = that;
}
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
try {
subscriber.onSubscribe(new ReactiveSubscription<T>(this, subscriber) {
@Override
public void request(long elements) {
Action.checkRequest(elements);
if (isComplete()) return;
that.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
subscriber.onError(future.cause());
return;
}
if (future.getNow() != null) {
subscriber.onNext(future.getNow());
}
onComplete();
}
});
}
});
} catch (Throwable throwable) {
Exceptions.throwIfFatal(throwable);
subscriber.onError(throwable);
}
}
}
public CommandReactiveService(ConnectionManager connectionManager) {
super(connectionManager);
}

@ -0,0 +1,53 @@
package org.redisson;
import org.reactivestreams.Subscriber;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.core.support.Exceptions;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.subscription.ReactiveSubscription;
public class NettyFuturePublisher<T> extends Stream<T> {
private final Future<? extends T> that;
public NettyFuturePublisher(Future<? extends T> that) {
this.that = that;
}
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
try {
subscriber.onSubscribe(new ReactiveSubscription<T>(this, subscriber) {
@Override
public void request(long elements) {
Action.checkRequest(elements);
if (isComplete()) return;
that.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
subscriber.onError(future.cause());
return;
}
if (future.getNow() != null) {
subscriber.onNext(future.getNow());
}
onComplete();
}
});
}
});
} catch (Throwable throwable) {
Exceptions.throwIfFatal(throwable);
subscriber.onError(throwable);
}
}
}

@ -36,6 +36,7 @@ import org.redisson.core.RMap;
import org.redisson.core.RMapReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RSetReactive;
import org.redisson.core.RTopicReactive;
import io.netty.util.concurrent.Future;
@ -157,6 +158,16 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonLexSortedSetReactive(commandExecutor, name);
}
@Override
public <M> RTopicReactive<M> getTopic(String name) {
return new RedissonTopicReactive<M>(commandExecutor, name);
}
@Override
public <M> RTopicReactive<M> getTopic(String name, Codec codec) {
return new RedissonTopicReactive<M>(codec, commandExecutor, name);
}
@Override
public void shutdown() {
connectionManager.shutdown();

@ -27,6 +27,7 @@ import org.redisson.core.RMap;
import org.redisson.core.RMapReactive;
import org.redisson.core.RScoredSortedSetReactive;
import org.redisson.core.RSetReactive;
import org.redisson.core.RTopicReactive;
public interface RedissonReactiveClient {
@ -105,16 +106,16 @@ public interface RedissonReactiveClient {
*/
RLexSortedSetReactive getLexSortedSet(String name);
// /**
// * Returns topic instance by name.
// *
// * @param name of topic
// * @return
// */
// <M> RTopic<M> getTopic(String name);
//
// <M> RTopic<M> getTopic(String name, Codec codec);
//
/**
* Returns topic instance by name.
*
* @param name of topic
* @return
*/
<M> RTopicReactive<M> getTopic(String name);
<M> RTopicReactive<M> getTopic(String name, Codec codec);
// /**
// * Returns topic instance satisfies by pattern name.
// *

@ -0,0 +1,117 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.core.MessageListener;
import org.redisson.core.RTopicReactive;
import org.redisson.core.StatusListener;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/**
* Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
*
* @author Nikita Koksharov
*
* @param <M> message
*/
public class RedissonTopicReactive<M> implements RTopicReactive<M> {
private final CommandReactiveExecutor commandExecutor;
private final String name;
private final Codec codec;
protected RedissonTopicReactive(CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
protected RedissonTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
this.name = name;
this.codec = codec;
}
@Override
public List<String> getChannelNames() {
return Collections.singletonList(name);
}
@Override
public Publisher<Long> publish(M message) {
return commandExecutor.writeObservable(name, codec, RedisCommands.PUBLISH, name, message);
}
@Override
public Publisher<Integer> addListener(StatusListener listener) {
return addListener(new PubSubStatusListener(listener, name));
};
@Override
public Publisher<Integer> addListener(MessageListener<M> listener) {
PubSubMessageListener<M> pubSubListener = new PubSubMessageListener<M>(listener, name);
return addListener(pubSubListener);
}
private Publisher<Integer> addListener(final RedisPubSubListener<M> pubSubListener) {
final Promise<Integer> promise = commandExecutor.getConnectionManager().newPromise();
Future<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
return;
}
promise.setSuccess(System.identityHashCode(pubSubListener));
}
});
return new NettyFuturePublisher<Integer>(promise);
}
@Override
public void removeListener(int listenerId) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
return;
}
synchronized (entry) {
if (entry.isActive()) {
entry.removeListener(name, listenerId);
if (!entry.hasListeners(name)) {
commandExecutor.getConnectionManager().unsubscribe(name);
}
return;
}
}
// listener has been re-attached
removeListener(listenerId);
}
}

@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.List;
import org.reactivestreams.Publisher;
/**
* Distributed topic. Messages are delivered to all message listeners across Redis cluster.
*
* @author Nikita Koksharov
*
* @param <M> the type of message object
*/
public interface RTopicReactive<M> {
List<String> getChannelNames();
/**
* Publish the message to all subscribers of this topic asynchronously
*
* @param message
* @return the <code>Future</code> object with number of clients that received the message
*/
Publisher<Long> publish(M message);
Publisher<Integer> addListener(StatusListener listener);
Publisher<Integer> addListener(MessageListener<M> listener);
void removeListener(int listenerId);
}
Loading…
Cancel
Save