Feature - RReliableTopic object added. #3131

pull/3140/head
Nikita Koksharov 4 years ago
parent f27e931cde
commit 4418a5810b

@ -465,6 +465,16 @@ public class Redisson implements RedissonClient {
return new RedissonTopic(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public RReliableTopic getReliableTopic(String name) {
return new RedissonReliableTopic(connectionManager.getCommandExecutor(), name);
}
@Override
public RReliableTopic getReliableTopic(String name, Codec codec) {
return new RedissonReliableTopic(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public RPatternTopic getPatternTopic(String pattern) {
return new RedissonPatternTopic(connectionManager.getCommandExecutor(), pattern);

@ -298,6 +298,16 @@ public class RedissonReactive implements RedissonReactiveClient {
new RedissonTopicReactive(topic), RTopicReactive.class);
}
@Override
public RReliableTopicReactive getReliableTopic(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonReliableTopic(commandExecutor, name), RReliableTopicReactive.class);
}
@Override
public RReliableTopicReactive getReliableTopic(String name, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonReliableTopic(codec, commandExecutor, name), RReliableTopicReactive.class);
}
@Override
public RPatternTopicReactive getPatternTopic(String pattern) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonPatternTopic(commandExecutor, pattern), RPatternTopicReactive.class);

@ -0,0 +1,352 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import org.redisson.api.RFuture;
import org.redisson.api.RReliableTopic;
import org.redisson.api.StreamMessageId;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonReliableTopic extends RedissonExpirable implements RReliableTopic {
private static final Logger log = LoggerFactory.getLogger(RedissonReliableTopic.class);
private static class Entry {
private final Class<?> type;
private final MessageListener<?> listener;
Entry(Class<?> type, MessageListener<?> listener) {
this.type = type;
this.listener = listener;
}
public Class<?> getType() {
return type;
}
public MessageListener<?> getListener() {
return listener;
}
}
private final Map<String, Entry> listeners = new ConcurrentHashMap<>();
private final AtomicReference<String> subscriberId = new AtomicReference<>();
private volatile RFuture<Map<StreamMessageId, Map<String, Object>>> readFuture;
private volatile Timeout timeoutTask;
public RedissonReliableTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
public RedissonReliableTopic(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
private String getSubscribersName() {
return suffixName(getName(), "subscribers");
}
private String getMapName() {
return suffixName(getName(), "map");
}
private String getCounter() {
return suffixName(getName(), "counter");
}
private String getTimeout() {
return suffixName(getName(), "timeout");
}
@Override
public long publish(Object message) {
return get(publishAsync(message));
}
@Override
public <M> String addListener(Class<M> type, MessageListener<M> listener) {
return get(addListenerAsync(type, listener));
}
@Override
public void removeListener(String... listenerIds) {
get(removeListenerAsync(listenerIds));
}
@Override
public void removeAllListeners() {
get(removeAllListenersAsync());
}
public RFuture<Void> removeAllListenersAsync() {
listeners.clear();
return removeSubscriber();
}
@Override
public long size() {
return get(sizeAsync());
}
public RFuture<Long> sizeAsync() {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.XLEN, getName());
}
@Override
public int countListeners() {
return listeners.size();
}
@Override
public RFuture<Long> publishAsync(Object message) {
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_LONG,
"redis.call('xadd', KEYS[1], '*', 'm', ARGV[1]); "
+ "return redis.call('zcard', KEYS[2]); ",
Arrays.asList(getName(), getSubscribersName()), encode(message));
}
protected String generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return ByteBufUtil.hexDump(id);
}
@Override
public <M> RFuture<String> addListenerAsync(Class<M> type, MessageListener<M> listener) {
String id = generateId();
listeners.put(id, new Entry(type, listener));
if (subscriberId.get() != null) {
return RedissonPromise.newSucceededFuture(id);
}
if (subscriberId.compareAndSet(null, id)) {
renewExpiration();
StreamMessageId startId = new StreamMessageId(System.currentTimeMillis(), 0);
RPromise<String> promise = new RedissonPromise<>();
RFuture<Void> addFuture = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"local value = redis.call('incr', KEYS[3]); "
+ "redis.call('zadd', KEYS[4], ARGV[3], ARGV[2]); "
+ "redis.call('zadd', KEYS[1], value, ARGV[2]); "
+ "redis.call('hset', KEYS[2], ARGV[2], ARGV[1]); ",
Arrays.asList(getSubscribersName(), getMapName(), getCounter(), getTimeout()),
startId, id, System.currentTimeMillis() + commandExecutor.getConnectionManager().getCfg().getReliableTopicWatchdogTimeout());
addFuture.onComplete((r, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}
poll(id, startId);
promise.trySuccess(id);
});
return promise;
}
return RedissonPromise.newSucceededFuture(id);
}
private void poll(String id, StreamMessageId startId) {
readFuture = commandExecutor.readAsync(getName(), new CompositeCodec(StringCodec.INSTANCE, codec),
RedisCommands.XREAD_BLOCKING_SINGLE, "BLOCK", 0, "STREAMS", getName(), startId);
readFuture.onComplete((res, ex) -> {
if (readFuture.isCancelled()) {
return;
}
if (ex != null) {
if (ex instanceof RedissonShutdownException) {
return;
}
poll(id, startId);
return;
}
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
res.values().forEach(entry -> {
Object m = entry.get("m");
listeners.values().forEach(e -> {
if (e.getType().isInstance(m)) {
((MessageListener<Object>) e.getListener()).onMessage(getName(), m);
}
});
});
});
if (listeners.isEmpty()) {
return;
}
StreamMessageId lastId = res.keySet().stream().skip(res.size() - 1).findFirst().get();
long time = System.currentTimeMillis();
RFuture<Boolean> updateFuture = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local r = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if r ~= false then "
+ "local value = redis.call('incr', KEYS[4]); "
+ "redis.call('zadd', KEYS[2], value, ARGV[2]); "
+ "redis.call('hset', KEYS[3], ARGV[2], ARGV[1]); "
+ "end; "
+ "local t = redis.call('zrange', KEYS[5], 0, 0, 'WITHSCORES'); "
+ "if tonumber(t[2]) < tonumber(ARGV[3]) then "
+ "redis.call('hdel', KEYS[3], t[1]); "
+ "redis.call('zrem', KEYS[2], t[1]); "
+ "redis.call('zrem', KEYS[5], t[1]); "
+ "end; "
+ "local v = redis.call('zrange', KEYS[2], 0, 0); "
+ "local score = redis.call('hget', KEYS[3], v[1]); "
+ "local range = redis.call('xrange', KEYS[1], score, '+'); "
+ "if #range == 0 then "
+ "redis.call('del', KEYS[1]); "
+ "elseif #range == 1 and range[1][1] == score then "
+ "redis.call('del', KEYS[1]); "
+ "else "
+ "redis.call('xtrim', KEYS[1], 'maxlen', #range); "
+ "end;"
+ "return r ~= false; ",
Arrays.asList(getName(), getSubscribersName(), getMapName(), getCounter(), getTimeout()),
lastId, id, time);
updateFuture.onComplete((re, exc) -> {
if (exc != null) {
if (exc instanceof RedissonShutdownException) {
return;
}
log.error("Unable to update subscriber status", exc);
return;
}
if (!re || listeners.isEmpty()) {
return;
}
poll(id, lastId);
});
});
}
@Override
public RFuture<Boolean> deleteAsync() {
return deleteAsync(getName(), getSubscribersName(), getMapName(), getCounter(), getTimeout());
}
@Override
public RFuture<Long> sizeInMemoryAsync() {
return super.sizeInMemoryAsync(Arrays.asList(getName(), getSubscribersName(), getMapName(), getCounter(), getTimeout()));
}
@Override
public RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit) {
return expireAsync(timeToLive, timeUnit, getName(), getSubscribersName(), getMapName(), getCounter(), getTimeout());
}
@Override
public RFuture<Boolean> expireAtAsync(long timestamp) {
return expireAtAsync(timestamp, getName(), getSubscribersName(), getMapName(), getCounter(), getTimeout());
}
@Override
public RFuture<Boolean> clearExpireAsync() {
return clearExpireAsync(getName(), getSubscribersName(), getMapName(), getCounter(), getTimeout());
}
@Override
public RFuture<Void> removeListenerAsync(String... listenerIds) {
listeners.keySet().removeAll(Arrays.asList(listenerIds));
if (listeners.isEmpty()) {
return removeSubscriber();
}
return RedissonPromise.newSucceededFuture(null);
}
private RFuture<Void> removeSubscriber() {
readFuture.cancel(false);
timeoutTask.cancel();
String id = subscriberId.getAndSet(null);
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
"redis.call('zrem', KEYS[3], ARGV[1]); "
+ "redis.call('zrem', KEYS[1], ARGV[1]); "
+ "redis.call('hdel', KEYS[2], ARGV[1]); ",
Arrays.asList(getSubscribersName(), getMapName(), getTimeout()),
id);
}
@Override
public int countSubscribers() {
return get(countSubscribersAsync());
}
@Override
public RFuture<Integer> countSubscribersAsync() {
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZCARD_INT, getSubscribersName());
}
private void renewExpiration() {
timeoutTask = commandExecutor.getConnectionManager().newTimeout(t -> {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('zscore', KEYS[1], ARGV[2]) == false then "
+ "return 0; "
+ "end; "
+ "redis.call('zadd', KEYS[1], ARGV[1], ARGV[2]); "
+ "return 1; ",
Arrays.asList(getTimeout()),
System.currentTimeMillis() + commandExecutor.getConnectionManager().getCfg().getReliableTopicWatchdogTimeout(), subscriberId.get());
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update reliable topic " + getName() + " expiration time", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}, commandExecutor.getConnectionManager().getCfg().getReliableTopicWatchdogTimeout() / 3, TimeUnit.MILLISECONDS);
}
}

@ -273,6 +273,16 @@ public class RedissonRx implements RedissonRxClient {
return RxProxyBuilder.create(commandExecutor, topic, new RedissonTopicRx(topic), RTopicRx.class);
}
@Override
public RReliableTopicRx getReliableTopic(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonReliableTopic(commandExecutor, name), RReliableTopicRx.class);
}
@Override
public RReliableTopicRx getReliableTopic(String name, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonReliableTopic(codec, commandExecutor, name), RReliableTopicRx.class);
}
@Override
public RPatternTopicRx getPatternTopic(String pattern) {
return RxProxyBuilder.create(commandExecutor, new RedissonPatternTopic(commandExecutor, pattern), RPatternTopicRx.class);

@ -0,0 +1,94 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.api.listener.MessageListener;
/**
*
* Reliable topic based on Redis Stream object.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
* <p>
* @author Nikita Koksharov
*
*/
public interface RReliableTopic extends RExpirable, RReliableTopicAsync {
/**
* Amount of messages stored in Redis Stream object.
*
* @return amount of messages
*/
long size();
/**
* Publish the message to all subscribers of this topic asynchronously.
* Each subscriber may have multiple listeners.
*
* @param message to send
* @return number of subscribers that received the message
*/
long publish(Object message);
/**
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> method is called when any message
* is published on this topic.
* <p>
* Watchdog is started when listener was registered.
*
* @see org.redisson.config.Config#setReliableTopicWatchdogTimeout(long)
*
* @param <M> - type of message
* @param type - type of message
* @param listener for messages
* @return locally unique listener id
* @see MessageListener
*/
<M> String addListener(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
* @param listenerIds - listener ids
*/
void removeListener(String... listenerIds);
/**
* Removes all listeners from this topic
*/
void removeAllListeners();
/**
* Returns amount of registered listeners to this topic
*
* @return amount of listeners
*/
int countListeners();
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
int countSubscribers();
}

@ -0,0 +1,88 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.api.listener.MessageListener;
/**
*
* Asynchronous interface for Reliable topic based on Redis Stream object.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
* <p>
* @author Nikita Koksharov
*
*/
public interface RReliableTopicAsync extends RExpirableAsync {
/**
* Amount of messages stored in Redis Stream object.
*
* @return amount of messages
*/
RFuture<Long> sizeAsync();
/**
* Publish the message to all subscribers of this topic asynchronously.
* Each subscriber may have multiple listeners.
*
* @param message to send
* @return number of subscribers that received the message
*/
RFuture<Long> publishAsync(Object message);
/**
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> method is called when any message
* is published on this topic.
* <p>
* Watchdog is started when listener was registered.
*
* @see org.redisson.config.Config#setReliableTopicWatchdogTimeout(long)
*
* @param <M> - type of message
* @param type - type of message
* @param listener for messages
* @return locally unique listener id
* @see MessageListener
*/
<M> RFuture<String> addListenerAsync(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
* @param listenerIds - listener ids
* @return void
*/
RFuture<Void> removeListenerAsync(String... listenerIds);
/**
* Removes all listeners from this topic
*/
RFuture<Void> removeAllListenersAsync();
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
RFuture<Integer> countSubscribersAsync();
}

@ -0,0 +1,89 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.api.listener.MessageListener;
import reactor.core.publisher.Mono;
/**
*
* Reactive interface for Reliable topic based on Redis Stream object.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
* <p>
* @author Nikita Koksharov
*
*/
public interface RReliableTopicReactive extends RExpirableReactive {
/**
* Amount of messages stored in Redis Stream object.
*
* @return amount of messages
*/
Mono<Long> sizeAsync();
/**
* Publish the message to all subscribers of this topic asynchronously.
* Each subscriber may have multiple listeners.
*
* @param message to send
* @return number of subscribers that received the message
*/
Mono<Long> publishAsync(Object message);
/**
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> method is called when any message
* is published on this topic.
* <p>
* Watchdog is started when listener was registered.
*
* @see org.redisson.config.Config#setReliableTopicWatchdogTimeout(long)
*
* @param <M> - type of message
* @param type - type of message
* @param listener for messages
* @return locally unique listener id
* @see MessageListener
*/
<M> Mono<String> addListenerAsync(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
* @param listenerIds - listener ids
* @return void
*/
Mono<Void> removeListenerAsync(String... listenerIds);
/**
* Removes all listeners from this topic
*/
Mono<Void> removeAllListenersAsync();
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
Mono<Integer> countSubscribersAsync();
}

@ -0,0 +1,90 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import io.reactivex.Completable;
import io.reactivex.Single;
import org.redisson.api.listener.MessageListener;
/**
*
* RxJava2 interface for Reliable topic based on Redis Stream object.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
* <p>
* @author Nikita Koksharov
*
*/
public interface RReliableTopicRx extends RExpirableRx {
/**
* Amount of messages stored in Redis Stream object.
*
* @return amount of messages
*/
Single<Long> sizeAsync();
/**
* Publish the message to all subscribers of this topic asynchronously.
* Each subscriber may have multiple listeners.
*
* @param message to send
* @return number of subscribers that received the message
*/
Single<Long> publishAsync(Object message);
/**
* Subscribes to this topic.
* <code>MessageListener.onMessage</code> method is called when any message
* is published on this topic.
* <p>
* Watchdog is started when listener was registered.
*
* @see org.redisson.config.Config#setReliableTopicWatchdogTimeout(long)
*
* @param <M> - type of message
* @param type - type of message
* @param listener for messages
* @return locally unique listener id
* @see MessageListener
*/
<M> Single<String> addListenerAsync(Class<M> type, MessageListener<M> listener);
/**
* Removes the listener by <code>id</code> for listening this topic
*
* @param listenerIds - listener ids
* @return void
*/
Completable removeListenerAsync(String... listenerIds);
/**
* Removes all listeners from this topic
*/
Completable removeAllListenersAsync();
/**
* Returns amount of subscribers to this topic across all Redisson instances.
* Each subscriber may have multiple listeners.
*
* @return amount of subscribers
*/
Single<Integer> countSubscribersAsync();
}

@ -587,7 +587,10 @@ public interface RedissonClient {
/**
* Returns topic instance by name.
*
* <p>
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
*
* @param name - name of object
* @return Topic object
*/
@ -596,6 +599,9 @@ public interface RedissonClient {
/**
* Returns topic instance by name
* using provided codec for messages.
* <p>
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
*
* @param name - name of object
* @param codec - codec for message
@ -603,6 +609,34 @@ public interface RedissonClient {
*/
RTopic getTopic(String name, Codec codec);
/**
* Returns reliable topic instance by name.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param name - name of object
* @return ReliableTopic object
*/
RReliableTopic getReliableTopic(String name);
/**
* Returns reliable topic instance by name
* using provided codec for messages.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param name - name of object
* @param codec - codec for message
* @return ReliableTopic object
*/
RReliableTopic getReliableTopic(String name, Codec codec);
/**
* Returns topic instance satisfies by pattern name.
*

@ -516,6 +516,34 @@ public interface RedissonReactiveClient {
*/
RTopicReactive getTopic(String name, Codec codec);
/**
* Returns reliable topic instance by name.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param name - name of object
* @return ReliableTopic object
*/
RReliableTopicReactive getReliableTopic(String name);
/**
* Returns reliable topic instance by name
* using provided codec for messages.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param name - name of object
* @param codec - codec for message
* @return ReliableTopic object
*/
RReliableTopicReactive getReliableTopic(String name, Codec codec);
/**
* Returns topic instance satisfies by pattern name.
*

@ -505,6 +505,34 @@ public interface RedissonRxClient {
*/
RTopicRx getTopic(String name, Codec codec);
/**
* Returns reliable topic instance by name.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param name - name of object
* @return ReliableTopic object
*/
RReliableTopicRx getReliableTopic(String name);
/**
* Returns reliable topic instance by name
* using provided codec for messages.
* <p>
* Dedicated Redis connection is allocated per instance (subscriber) of this object.
* Messages are delivered to all listeners attached to the same Redis setup.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param name - name of object
* @param codec - codec for message
* @return ReliableTopic object
*/
RReliableTopicRx getReliableTopic(String name, Codec codec);
/**
* Returns topic instance satisfies by pattern name.
*

@ -33,6 +33,7 @@ import java.io.InputStream;
import java.io.Reader;
import java.net.URL;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Redisson configuration
@ -82,6 +83,8 @@ public class Config {
private long lockWatchdogTimeout = 30 * 1000;
private long reliableTopicWatchdogTimeout = TimeUnit.MINUTES.toMillis(10);
private boolean keepPubSubOrder = true;
private boolean decodeInExecutor = false;
@ -130,6 +133,7 @@ public class Config {
setEventLoopGroup(oldConf.getEventLoopGroup());
setTransportMode(oldConf.getTransportMode());
setAddressResolverGroupFactory(oldConf.getAddressResolverGroupFactory());
setReliableTopicWatchdogTimeout(oldConf.getReliableTopicWatchdogTimeout());
if (oldConf.getSingleServerConfig() != null) {
setSingleServerConfig(new SingleServerConfig(oldConf.getSingleServerConfig()));
@ -810,4 +814,25 @@ public class Config {
this.useThreadClassLoader = useThreadClassLoader;
return this;
}
public long getReliableTopicWatchdogTimeout() {
return reliableTopicWatchdogTimeout;
}
/**
* Reliable Topic subscriber expires after <code>timeout</code> if watchdog
* didn't extend it to next <code>timeout</code> time interval.
* <p>
* This prevents against infinity grow of stored messages in topic due to Redisson client crush or
* any other reason when subscriber can't consumer messages anymore.
* <p>
* Default is 600000 milliseconds
*
* @param timeout timeout in milliseconds
* @return config
*/
public Config setReliableTopicWatchdogTimeout(long timeout) {
this.reliableTopicWatchdogTimeout = timeout;
return this;
}
}

@ -0,0 +1,85 @@
package org.redisson;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.redisson.api.RReliableTopic;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonReliableTopicTest extends BaseTest {
@Test
public void testAutoTrim() {
RReliableTopic rt = redisson.getReliableTopic("test1");
AtomicInteger counter = new AtomicInteger();
rt.addListener(Integer.class, (ch, m) -> {
counter.incrementAndGet();
});
RReliableTopic rt2 = redisson.getReliableTopic("test1");
rt2.addListener(Integer.class, (ch, m) -> {
counter.incrementAndGet();
});
for (int i = 0; i < 10; i++) {
assertThat(rt.publish(i)).isEqualTo(2);
}
Awaitility.waitAtMost(Duration.ofSeconds(1)).until(() -> counter.get() == 20);
assertThat(rt.size()).isEqualTo(0);
}
@Test
public void testListener() throws InterruptedException {
RReliableTopic rt = redisson.getReliableTopic("test2");
AtomicInteger i = new AtomicInteger();
String id = rt.addListener(String.class, (ch, m) -> {
i.incrementAndGet();
});
rt.publish("1");
assertThat(i).hasValue(1);
rt.removeListener(id);
assertThat(rt.publish("2")).isEqualTo(0);
Thread.sleep(5);
assertThat(i).hasValue(1);
}
@Test
public void test() throws InterruptedException {
RReliableTopic rt = redisson.getReliableTopic("test3");
CountDownLatch a = new CountDownLatch(1);
CountDownLatch twoMessages = new CountDownLatch(3);
rt.addListener(String.class, (ch, m) -> {
assertThat(m).isIn("m1", "m2");
a.countDown();
twoMessages.countDown();
});
assertThat(rt.publish("m1")).isEqualTo(1);
assertThat(a.await(1, TimeUnit.SECONDS)).isTrue();
assertThat(rt.size()).isEqualTo(0);
RReliableTopic rt2 = redisson.getReliableTopic("test3");
rt2.addListener(String.class, (ch, m) -> {
assertThat(m).isEqualTo("m2");
twoMessages.countDown();
});
assertThat(rt.publish("m2")).isEqualTo(2);
assertThat(twoMessages.await(1, TimeUnit.SECONDS)).isTrue();
Thread.sleep(5);
assertThat(rt.size()).isEqualTo(0);
}
}
Loading…
Cancel
Save