|
|
|
@ -21,20 +21,16 @@ import java.util.List;
|
|
|
|
|
import org.reactivestreams.Publisher;
|
|
|
|
|
import org.redisson.PubSubMessageListener;
|
|
|
|
|
import org.redisson.PubSubStatusListener;
|
|
|
|
|
import org.redisson.RedissonTopic;
|
|
|
|
|
import org.redisson.api.RFuture;
|
|
|
|
|
import org.redisson.api.RTopic;
|
|
|
|
|
import org.redisson.api.RTopicReactive;
|
|
|
|
|
import org.redisson.api.listener.MessageListener;
|
|
|
|
|
import org.redisson.api.listener.StatusListener;
|
|
|
|
|
import org.redisson.client.RedisPubSubListener;
|
|
|
|
|
import org.redisson.client.codec.Codec;
|
|
|
|
|
import org.redisson.client.protocol.RedisCommands;
|
|
|
|
|
import org.redisson.command.CommandReactiveExecutor;
|
|
|
|
|
import org.redisson.connection.PubSubConnectionEntry;
|
|
|
|
|
import org.redisson.misc.RPromise;
|
|
|
|
|
import org.redisson.pubsub.AsyncSemaphore;
|
|
|
|
|
|
|
|
|
|
import io.netty.util.concurrent.Future;
|
|
|
|
|
import io.netty.util.concurrent.FutureListener;
|
|
|
|
|
import reactor.fn.Supplier;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -46,18 +42,18 @@ import reactor.fn.Supplier;
|
|
|
|
|
*/
|
|
|
|
|
public class RedissonTopicReactive<M> implements RTopicReactive<M> {
|
|
|
|
|
|
|
|
|
|
private final RTopic<M> topic;
|
|
|
|
|
private final CommandReactiveExecutor commandExecutor;
|
|
|
|
|
private final String name;
|
|
|
|
|
private final Codec codec;
|
|
|
|
|
|
|
|
|
|
public RedissonTopicReactive(CommandReactiveExecutor commandExecutor, String name) {
|
|
|
|
|
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RedissonTopicReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
|
|
|
|
|
this.topic = new RedissonTopic<M>(codec, commandExecutor, name);
|
|
|
|
|
this.commandExecutor = commandExecutor;
|
|
|
|
|
this.name = name;
|
|
|
|
|
this.codec = codec;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -66,8 +62,13 @@ public class RedissonTopicReactive<M> implements RTopicReactive<M> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Publisher<Long> publish(M message) {
|
|
|
|
|
return commandExecutor.writeReactive(name, codec, RedisCommands.PUBLISH, name, message);
|
|
|
|
|
public Publisher<Long> publish(final M message) {
|
|
|
|
|
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Long> get() {
|
|
|
|
|
return topic.publishAsync(message);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -82,23 +83,10 @@ public class RedissonTopicReactive<M> implements RTopicReactive<M> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Publisher<Integer> addListener(final RedisPubSubListener<?> pubSubListener) {
|
|
|
|
|
return new NettyFuturePublisher<Integer>(new Supplier<RFuture<Integer>>() {
|
|
|
|
|
return commandExecutor.reactive(new Supplier<RFuture<Integer>>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Integer> get() {
|
|
|
|
|
final RPromise<Integer> promise = commandExecutor.getConnectionManager().newPromise();
|
|
|
|
|
RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
|
|
|
|
|
future.addListener(new FutureListener<PubSubConnectionEntry>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void operationComplete(Future<PubSubConnectionEntry> future) throws Exception {
|
|
|
|
|
if (!future.isSuccess()) {
|
|
|
|
|
promise.tryFailure(future.cause());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
promise.trySuccess(System.identityHashCode(pubSubListener));
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
return promise;
|
|
|
|
|
return ((RedissonTopic<Integer>) topic).addListenerAsync(pubSubListener);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
@ -106,21 +94,7 @@ public class RedissonTopicReactive<M> implements RTopicReactive<M> {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void removeListener(int listenerId) {
|
|
|
|
|
AsyncSemaphore semaphore = commandExecutor.getConnectionManager().getSemaphore(name);
|
|
|
|
|
semaphore.acquireUninterruptibly();
|
|
|
|
|
|
|
|
|
|
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
semaphore.release();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
entry.removeListener(name, listenerId);
|
|
|
|
|
if (!entry.hasListeners(name)) {
|
|
|
|
|
commandExecutor.getConnectionManager().unsubscribe(name, semaphore);
|
|
|
|
|
} else {
|
|
|
|
|
semaphore.release();
|
|
|
|
|
}
|
|
|
|
|
topic.removeListener(listenerId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|