Feature - nameMapper setting added. #3103

pull/3941/head
Nikita Koksharov 4 years ago
parent 5951da9810
commit b011036b98

@ -78,7 +78,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
@Override
protected RTopic getTopic() {
return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);
return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
}
};

@ -243,7 +243,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
QueueTransferTask task = new QueueTransferTask(connectionManager) {
@Override
protected RTopic getTopic() {
return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, schedulerChannelName);
return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, schedulerChannelName);
}
@Override

@ -15,6 +15,7 @@
*/
package org.redisson;
import org.redisson.api.NameMapper;
import org.redisson.api.RFuture;
import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
@ -56,9 +57,21 @@ public class RedissonTopic implements RTopic {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
public static RedissonTopic createRaw(CommandAsyncExecutor commandExecutor, String name) {
return new RedissonTopic(commandExecutor.getConnectionManager().getCodec(), commandExecutor, NameMapper.direct(), name);
}
public static RedissonTopic createRaw(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
return new RedissonTopic(codec, commandExecutor, NameMapper.direct(), name);
}
public RedissonTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this(codec, commandExecutor, commandExecutor.getConnectionManager().getConfig().getNameMapper(), name);
}
public RedissonTopic(Codec codec, CommandAsyncExecutor commandExecutor, NameMapper nameMapper, String name) {
this.commandExecutor = commandExecutor;
this.name = name;
this.name = nameMapper.map(name);
this.channelName = new ChannelName(name);
this.codec = codec;
this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService();

@ -139,7 +139,7 @@ public abstract class LocalCacheListener {
public void add(Map<?, ?> cache) {
this.cache = cache;
invalidationTopic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() {
@ -173,7 +173,7 @@ public abstract class LocalCacheListener {
disableKeys(requestId, keysToDisable, m.getTimeout());
RedissonTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE,
RedissonTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE,
commandExecutor, RedissonObject.suffixName(name, requestId + DISABLED_ACK_SUFFIX));
topic.publishAsync(new LocalCachedMapDisableAck());
}

@ -373,7 +373,7 @@ public class RedissonTransaction implements RTransaction {
CountDownLatch latch = new CountDownLatch(hashes.size());
List<RTopic> topics = new ArrayList<>();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE,
RTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE,
commandExecutor, RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX));
topics.add(topic);
topic.addListener(Object.class, new MessageListener<Object>() {
@ -473,8 +473,8 @@ public class RedissonTransaction implements RTransaction {
List<RTopic> topics = new ArrayList<>();
for (Entry<HashKey, HashValue> entry : hashes.entrySet()) {
String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX);
RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE,
commandExecutor, disabledAckName);
RTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE,
commandExecutor, disabledAckName);
topics.add(topic);
RFuture<Integer> topicFuture = topic.addListenerAsync(Object.class, new MessageListener<Object>() {
@Override

Loading…
Cancel
Save