diff --git a/redisson-micronaut/redisson-micronaut-20/src/main/java/org/redisson/micronaut/session/RedissonSessionStore.java b/redisson-micronaut/redisson-micronaut-20/src/main/java/org/redisson/micronaut/session/RedissonSessionStore.java index effd1a4e8..720cbb353 100644 --- a/redisson-micronaut/redisson-micronaut-20/src/main/java/org/redisson/micronaut/session/RedissonSessionStore.java +++ b/redisson-micronaut/redisson-micronaut-20/src/main/java/org/redisson/micronaut/session/RedissonSessionStore.java @@ -27,6 +27,7 @@ import io.micronaut.session.SessionStore; import io.micronaut.session.event.SessionCreatedEvent; import io.micronaut.session.event.SessionDeletedEvent; import io.micronaut.session.event.SessionExpiredEvent; +import org.redisson.Redisson; import org.redisson.api.*; import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.PatternMessageListener; @@ -34,6 +35,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.IntegerCodec; import org.redisson.client.codec.StringCodec; import org.redisson.codec.CompositeCodec; +import org.redisson.pubsub.PublishSubscribeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +88,7 @@ public class RedissonSessionStore implements SessionStore, Patt deletedTopic = redisson.getPatternTopic("__keyevent@*:del", StringCodec.INSTANCE); expiredTopic = redisson.getPatternTopic("__keyevent@*:expired", StringCodec.INSTANCE); - createdTopic = redisson.getTopic(getEventsChannelPrefix(), StringCodec.INSTANCE); + createdTopic = getTopic(getEventsChannelPrefix(), StringCodec.INSTANCE); deletedTopic.addListener(String.class, this); expiredTopic.addListener(String.class, this); @@ -239,7 +241,15 @@ public class RedissonSessionStore implements SessionStore, Patt String keyPrefix = sessionConfiguration.getKeyPrefix(); String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":"; final String name = keyPrefix + separator + "redisson:session_updates"; - return redisson.getTopic(name); + return getTopic(name, getCodec()); + } + + private RTopic getTopic(String name, Codec codec) { + PublishSubscribeService ss = ((Redisson) redisson).getConnectionManager().getSubscribeService(); + if (ss.isShardingSupported()) { + return redisson.getShardedTopic(name, codec); + } + return redisson.getTopic(name, codec); } public String getNodeId() { diff --git a/redisson-micronaut/redisson-micronaut-30/src/main/java/org/redisson/micronaut/session/RedissonSessionStore.java b/redisson-micronaut/redisson-micronaut-30/src/main/java/org/redisson/micronaut/session/RedissonSessionStore.java index 3a300a1dd..99c4584e6 100644 --- a/redisson-micronaut/redisson-micronaut-30/src/main/java/org/redisson/micronaut/session/RedissonSessionStore.java +++ b/redisson-micronaut/redisson-micronaut-30/src/main/java/org/redisson/micronaut/session/RedissonSessionStore.java @@ -28,6 +28,7 @@ import io.micronaut.session.event.SessionCreatedEvent; import io.micronaut.session.event.SessionDeletedEvent; import io.micronaut.session.event.SessionExpiredEvent; import jakarta.inject.Singleton; +import org.redisson.Redisson; import org.redisson.api.*; import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.PatternMessageListener; @@ -35,6 +36,7 @@ import org.redisson.client.codec.Codec; import org.redisson.client.codec.IntegerCodec; import org.redisson.client.codec.StringCodec; import org.redisson.codec.CompositeCodec; +import org.redisson.pubsub.PublishSubscribeService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +88,7 @@ public class RedissonSessionStore implements SessionStore, Patt deletedTopic = redisson.getPatternTopic("__keyevent@*:del", StringCodec.INSTANCE); expiredTopic = redisson.getPatternTopic("__keyevent@*:expired", StringCodec.INSTANCE); - createdTopic = redisson.getTopic(getEventsChannelPrefix(), StringCodec.INSTANCE); + createdTopic = getTopic(getEventsChannelPrefix(), StringCodec.INSTANCE); deletedTopic.addListener(String.class, this); expiredTopic.addListener(String.class, this); @@ -239,7 +241,15 @@ public class RedissonSessionStore implements SessionStore, Patt String keyPrefix = sessionConfiguration.getKeyPrefix(); String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":"; final String name = keyPrefix + separator + "redisson:session_updates"; - return redisson.getTopic(name); + return getTopic(name, getCodec()); + } + + private RTopic getTopic(String name, Codec codec) { + PublishSubscribeService ss = ((Redisson) redisson).getConnectionManager().getSubscribeService(); + if (ss.isShardingSupported()) { + return redisson.getShardedTopic(name, codec); + } + return redisson.getTopic(name, codec); } public String getNodeId() {