Feature - RReliableTopic object added. #3131

pull/3209/head
Nikita Koksharov 4 years ago
parent 88d1c9507c
commit 3941558338

@ -305,12 +305,16 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RReliableTopicReactive getReliableTopic(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonReliableTopic(commandExecutor, name), RReliableTopicReactive.class);
RedissonReliableTopic topic = new RedissonReliableTopic(commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonReliableTopicReactive(topic), RReliableTopicReactive.class);
}
@Override
public RReliableTopicReactive getReliableTopic(String name, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonReliableTopic(codec, commandExecutor, name), RReliableTopicReactive.class);
RedissonReliableTopic topic = new RedissonReliableTopic(codec, commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, topic,
new RedissonReliableTopicReactive(topic), RReliableTopicReactive.class);
}
@Override

@ -16,6 +16,7 @@
package org.redisson.api;
import org.redisson.api.listener.MessageListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@ -37,7 +38,7 @@ public interface RReliableTopicReactive extends RExpirableReactive {
*
* @return amount of messages
*/
Mono<Long> sizeAsync();
Mono<Long> size();
/**
* Publish the message to all subscribers of this topic asynchronously.
@ -46,7 +47,7 @@ public interface RReliableTopicReactive extends RExpirableReactive {
* @param message to send
* @return number of subscribers that received the message
*/
Mono<Long> publishAsync(Object message);
Mono<Long> publish(Object message);
/**
* Subscribes to this topic.
@ -63,7 +64,7 @@ public interface RReliableTopicReactive extends RExpirableReactive {
* @return locally unique listener id
* @see MessageListener
*/
<M> Mono<String> addListenerAsync(Class<M> type, MessageListener<M> listener);
<M> Mono<String> addListener(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
@ -71,12 +72,12 @@ public interface RReliableTopicReactive extends RExpirableReactive {
* @param listenerIds - listener ids
* @return void
*/
Mono<Void> removeListenerAsync(String... listenerIds);
Mono<Void> removeListener(String... listenerIds);
/**
* Removes all listeners from this topic
*/
Mono<Void> removeAllListenersAsync();
Mono<Void> removeAllListeners();
/**
* Returns amount of subscribers to this topic across all Redisson instances.
@ -84,6 +85,15 @@ public interface RReliableTopicReactive extends RExpirableReactive {
*
* @return amount of subscribers
*/
Mono<Integer> countSubscribersAsync();
Mono<Integer> countSubscribers();
/**
* Returns continues stream of published messages.
*
* @param <M> type of message
* @param type - type of message to listen
* @return stream of messages
*/
<M> Flux<M> getMessages(Class<M> type);
}

@ -16,6 +16,7 @@
package org.redisson.api;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import org.redisson.api.listener.MessageListener;
@ -38,7 +39,7 @@ public interface RReliableTopicRx extends RExpirableRx {
*
* @return amount of messages
*/
Single<Long> sizeAsync();
Single<Long> size();
/**
* Publish the message to all subscribers of this topic asynchronously.
@ -47,7 +48,7 @@ public interface RReliableTopicRx extends RExpirableRx {
* @param message to send
* @return number of subscribers that received the message
*/
Single<Long> publishAsync(Object message);
Single<Long> publish(Object message);
/**
* Subscribes to this topic.
@ -64,7 +65,7 @@ public interface RReliableTopicRx extends RExpirableRx {
* @return locally unique listener id
* @see MessageListener
*/
<M> Single<String> addListenerAsync(Class<M> type, MessageListener<M> listener);
<M> Single<String> addListener(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
@ -72,12 +73,12 @@ public interface RReliableTopicRx extends RExpirableRx {
* @param listenerIds - listener ids
* @return void
*/
Completable removeListenerAsync(String... listenerIds);
Completable removeListener(String... listenerIds);
/**
* Removes all listeners from this topic
*/
Completable removeAllListenersAsync();
Completable removeAllListeners();
/**
* Returns amount of subscribers to this topic across all Redisson instances.
@ -85,6 +86,15 @@ public interface RReliableTopicRx extends RExpirableRx {
*
* @return amount of subscribers
*/
Single<Integer> countSubscribersAsync();
Single<Integer> countSubscribers();
/**
* Returns continues stream of published messages.
*
* @param <M> - type of message
* @param type - type of message to listen
* @return stream of messages
*/
<M> Flowable<M> getMessages(Class<M> type);
}

@ -0,0 +1,65 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import org.redisson.api.RFuture;
import org.redisson.api.RReliableTopic;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonReliableTopicReactive {
private final RReliableTopic topic;
public RedissonReliableTopicReactive(RReliableTopic topic) {
this.topic = topic;
}
public <M> Flux<M> getMessages(Class<M> type) {
return Flux.<M>create(emitter -> {
emitter.onRequest(n -> {
AtomicLong counter = new AtomicLong(n);
AtomicReference<String> idRef = new AtomicReference<>();
RFuture<String> t = topic.addListenerAsync(type, (channel, msg) -> {
emitter.next(msg);
if (counter.decrementAndGet() == 0) {
topic.removeListenerAsync(idRef.get());
emitter.complete();
}
});
t.onComplete((id, e) -> {
if (e != null) {
emitter.error(e);
return;
}
idRef.set(id);
emitter.onDispose(() -> {
topic.removeListenerAsync(id);
});
});
});
});
}
}

@ -0,0 +1,63 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import io.reactivex.Flowable;
import io.reactivex.processors.ReplayProcessor;
import org.redisson.api.RFuture;
import org.redisson.api.RReliableTopic;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonReliableTopicRx {
private final RReliableTopic topic;
public RedissonReliableTopicRx(RReliableTopic topic) {
this.topic = topic;
}
public <M> Flowable<M> getMessages(Class<M> type) {
ReplayProcessor<M> p = ReplayProcessor.create();
return p.doOnRequest(n -> {
AtomicLong counter = new AtomicLong(n);
AtomicReference<String> idRef = new AtomicReference<>();
RFuture<String> t = topic.addListenerAsync(type, (channel, msg) -> {
p.onNext(msg);
if (counter.decrementAndGet() == 0) {
topic.removeListenerAsync(idRef.get());
p.onComplete();
}
});
t.onComplete((id, e) -> {
if (e != null) {
p.onError(e);
return;
}
idRef.set(id);
p.doOnCancel(() -> topic.removeListenerAsync(id));
});
});
}
}
Loading…
Cancel
Save