Feature - Micronaut Session store should use sharded pubsub in Redis Cluster 7.0+ #5090

pull/5099/head
Nikita Koksharov 2 years ago
parent db34f65c0e
commit 6e668db5d3

@ -27,6 +27,7 @@ import io.micronaut.session.SessionStore;
import io.micronaut.session.event.SessionCreatedEvent;
import io.micronaut.session.event.SessionDeletedEvent;
import io.micronaut.session.event.SessionExpiredEvent;
import org.redisson.Redisson;
import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.PatternMessageListener;
@ -34,6 +35,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
import org.redisson.pubsub.PublishSubscribeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -86,7 +88,7 @@ public class RedissonSessionStore implements SessionStore<RedissonSession>, Patt
deletedTopic = redisson.getPatternTopic("__keyevent@*:del", StringCodec.INSTANCE);
expiredTopic = redisson.getPatternTopic("__keyevent@*:expired", StringCodec.INSTANCE);
createdTopic = redisson.getTopic(getEventsChannelPrefix(), StringCodec.INSTANCE);
createdTopic = getTopic(getEventsChannelPrefix(), StringCodec.INSTANCE);
deletedTopic.addListener(String.class, this);
expiredTopic.addListener(String.class, this);
@ -239,7 +241,15 @@ public class RedissonSessionStore implements SessionStore<RedissonSession>, Patt
String keyPrefix = sessionConfiguration.getKeyPrefix();
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
final String name = keyPrefix + separator + "redisson:session_updates";
return redisson.getTopic(name);
return getTopic(name, getCodec());
}
private RTopic getTopic(String name, Codec codec) {
PublishSubscribeService ss = ((Redisson) redisson).getConnectionManager().getSubscribeService();
if (ss.isShardingSupported()) {
return redisson.getShardedTopic(name, codec);
}
return redisson.getTopic(name, codec);
}
public String getNodeId() {

@ -28,6 +28,7 @@ import io.micronaut.session.event.SessionCreatedEvent;
import io.micronaut.session.event.SessionDeletedEvent;
import io.micronaut.session.event.SessionExpiredEvent;
import jakarta.inject.Singleton;
import org.redisson.Redisson;
import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.PatternMessageListener;
@ -35,6 +36,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
import org.redisson.pubsub.PublishSubscribeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -86,7 +88,7 @@ public class RedissonSessionStore implements SessionStore<RedissonSession>, Patt
deletedTopic = redisson.getPatternTopic("__keyevent@*:del", StringCodec.INSTANCE);
expiredTopic = redisson.getPatternTopic("__keyevent@*:expired", StringCodec.INSTANCE);
createdTopic = redisson.getTopic(getEventsChannelPrefix(), StringCodec.INSTANCE);
createdTopic = getTopic(getEventsChannelPrefix(), StringCodec.INSTANCE);
deletedTopic.addListener(String.class, this);
expiredTopic.addListener(String.class, this);
@ -239,7 +241,15 @@ public class RedissonSessionStore implements SessionStore<RedissonSession>, Patt
String keyPrefix = sessionConfiguration.getKeyPrefix();
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
final String name = keyPrefix + separator + "redisson:session_updates";
return redisson.getTopic(name);
return getTopic(name, getCodec());
}
private RTopic getTopic(String name, Codec codec) {
PublishSubscribeService ss = ((Redisson) redisson).getConnectionManager().getSubscribeService();
if (ss.isShardingSupported()) {
return redisson.getShardedTopic(name, codec);
}
return redisson.getTopic(name, codec);
}
public String getNodeId() {

Loading…
Cancel
Save