From b011036b989cef648edcf682caf8b3aa53f22b30 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 7 Apr 2021 10:46:51 +0300 Subject: [PATCH] Feature - nameMapper setting added. #3103 --- .../java/org/redisson/RedissonDelayedQueue.java | 2 +- .../org/redisson/RedissonExecutorService.java | 2 +- .../src/main/java/org/redisson/RedissonTopic.java | 15 ++++++++++++++- .../org/redisson/cache/LocalCacheListener.java | 4 ++-- .../redisson/transaction/RedissonTransaction.java | 6 +++--- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java index 642af3068..096a1b1e4 100644 --- a/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java @@ -78,7 +78,7 @@ public class RedissonDelayedQueue extends RedissonExpirable implements RDelay @Override protected RTopic getTopic() { - return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName); + return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName); } }; diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 3012cf987..5a27675ce 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -243,7 +243,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { QueueTransferTask task = new QueueTransferTask(connectionManager) { @Override protected RTopic getTopic() { - return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, schedulerChannelName); + return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, schedulerChannelName); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonTopic.java b/redisson/src/main/java/org/redisson/RedissonTopic.java index 8cf8c9a04..558097e72 100644 --- a/redisson/src/main/java/org/redisson/RedissonTopic.java +++ b/redisson/src/main/java/org/redisson/RedissonTopic.java @@ -15,6 +15,7 @@ */ package org.redisson; +import org.redisson.api.NameMapper; import org.redisson.api.RFuture; import org.redisson.api.RTopic; import org.redisson.api.listener.MessageListener; @@ -56,9 +57,21 @@ public class RedissonTopic implements RTopic { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } + public static RedissonTopic createRaw(CommandAsyncExecutor commandExecutor, String name) { + return new RedissonTopic(commandExecutor.getConnectionManager().getCodec(), commandExecutor, NameMapper.direct(), name); + } + + public static RedissonTopic createRaw(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + return new RedissonTopic(codec, commandExecutor, NameMapper.direct(), name); + } + public RedissonTopic(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + this(codec, commandExecutor, commandExecutor.getConnectionManager().getConfig().getNameMapper(), name); + } + + public RedissonTopic(Codec codec, CommandAsyncExecutor commandExecutor, NameMapper nameMapper, String name) { this.commandExecutor = commandExecutor; - this.name = name; + this.name = nameMapper.map(name); this.channelName = new ChannelName(name); this.codec = codec; this.subscribeService = commandExecutor.getConnectionManager().getSubscribeService(); diff --git a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java index 30105c306..e23de5439 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java @@ -139,7 +139,7 @@ public abstract class LocalCacheListener { public void add(Map cache) { this.cache = cache; - invalidationTopic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName()); + invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName()); if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) { reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() { @@ -173,7 +173,7 @@ public abstract class LocalCacheListener { disableKeys(requestId, keysToDisable, m.getTimeout()); - RedissonTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, + RedissonTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, RedissonObject.suffixName(name, requestId + DISABLED_ACK_SUFFIX)); topic.publishAsync(new LocalCachedMapDisableAck()); } diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java index a16f987e5..bf4ec5477 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransaction.java @@ -373,7 +373,7 @@ public class RedissonTransaction implements RTransaction { CountDownLatch latch = new CountDownLatch(hashes.size()); List topics = new ArrayList<>(); for (Entry entry : hashes.entrySet()) { - RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, + RTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX)); topics.add(topic); topic.addListener(Object.class, new MessageListener() { @@ -473,8 +473,8 @@ public class RedissonTransaction implements RTransaction { List topics = new ArrayList<>(); for (Entry entry : hashes.entrySet()) { String disabledAckName = RedissonObject.suffixName(entry.getKey().getName(), requestId + RedissonLocalCachedMap.DISABLED_ACK_SUFFIX); - RTopic topic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, - commandExecutor, disabledAckName); + RTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, + commandExecutor, disabledAckName); topics.add(topic); RFuture topicFuture = topic.addListenerAsync(Object.class, new MessageListener() { @Override