Feature - LocalCachedMapOptions.useKeyEventsPattern() setting introduced. #5183

pull/5239/head^2
Nikita Koksharov 2 years ago
parent c4ae3f27e4
commit 594434fa6c

@ -139,6 +139,7 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
private CacheProvider cacheProvider;
private StoreMode storeMode;
private boolean storeCacheMiss;
private boolean useKeyEventsPattern;
protected LocalCachedMapOptions() {
}
@ -183,7 +184,8 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
.cacheProvider(CacheProvider.REDISSON)
.storeMode(StoreMode.LOCALCACHE_REDIS)
.syncStrategy(SyncStrategy.INVALIDATE)
.storeCacheMiss(false);
.storeCacheMiss(false)
.useKeyEventsPattern(true);
}
public CacheProvider getCacheProvider() {
@ -377,6 +379,21 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
return this;
}
public boolean isUseKeyEventsPattern() {
return useKeyEventsPattern;
}
/**
* Defines whether to use __keyevent pattern topic to listen for expired events.
*
* @param useKeyEventsPattern - whether to use __keyevent pattern topic
* @return LocalCachedMapOptions instance
*/
public LocalCachedMapOptions<K, V> useKeyEventsPattern(boolean useKeyEventsPattern) {
this.useKeyEventsPattern = useKeyEventsPattern;
return this;
}
@Override
public LocalCachedMapOptions<K, V> writeBehindBatchSize(int writeBehindBatchSize) {
return (LocalCachedMapOptions<K, V>) super.writeBehindBatchSize(writeBehindBatchSize);

@ -63,7 +63,7 @@ public abstract class LocalCacheListener {
private byte[] instanceId;
private Codec codec;
private LocalCachedMapOptions<?, ?> options;
private final String pattern;
private final String keyeventPattern;
private long cacheUpdateLogTime;
private volatile long lastInvalidate;
@ -89,7 +89,7 @@ public abstract class LocalCacheListener {
this.options = options;
this.cacheUpdateLogTime = cacheUpdateLogTime;
this.isSharded = isSharded;
this.pattern = "__keyspace@" + commandExecutor.getServiceManager().getConfig().getDatabase() + "__:" + name;
this.keyeventPattern = "__keyspace@" + commandExecutor.getServiceManager().getConfig().getDatabase() + "__:" + name;
instanceId = commandExecutor.getServiceManager().generateIdArray();
}
@ -150,12 +150,21 @@ public abstract class LocalCacheListener {
invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
}
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, pattern);
expireListenerId = topic.addListener(String.class, (pattern, channel, msg) -> {
if (msg.equals("expired")) {
cache.clear();
}
});
if (options.isUseKeyEventsPattern()) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
expireListenerId = topic.addListener(String.class, (pattern, channel, msg) -> {
if (msg.equals(name)) {
cache.clear();
}
});
} else {
RTopic topic = new RedissonTopic(StringCodec.INSTANCE, commandExecutor, keyeventPattern);
expireListenerId = topic.addListener(String.class, (channel, msg) -> {
if (msg.equals("expired")) {
cache.clear();
}
});
}
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() {
@ -340,8 +349,13 @@ public abstract class LocalCacheListener {
}
invalidationTopic.removeListenerAsync(ids.toArray(new Integer[0]));
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, pattern);
topic.removeListenerAsync(expireListenerId);
if (options.isUseKeyEventsPattern()) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
topic.removeListenerAsync(expireListenerId);
} else {
RTopic topic = new RedissonTopic(StringCodec.INSTANCE, commandExecutor, keyeventPattern);
topic.removeListenerAsync(expireListenerId);
}
}
public String getUpdatesLogName() {

Loading…
Cancel
Save