Feature - LocalCachedMapOptions.useTopicPattern() setting added. #4039

pull/6236/head
Nikita Koksharov 5 months ago
parent 54655636a3
commit 77fdf510ef

@ -372,6 +372,7 @@ public final class Redisson implements RedissonClient {
.timeToLive(params.getTimeToLiveInMillis())
.syncStrategy(LocalCachedMapOptions.SyncStrategy.valueOf(params.getSyncStrategy().toString()))
.useObjectAsCacheKey(params.isUseObjectAsCacheKey())
.useTopicPattern(params.isUseTopicPattern())
.expirationEventPolicy(LocalCachedMapOptions.ExpirationEventPolicy.valueOf(params.getExpirationEventPolicy().toString()))
.writer(params.getWriter())
.writerAsync(params.getWriterAsync())

@ -41,7 +41,6 @@ import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.*;
@SuppressWarnings("serial")
public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements RLocalCachedMap<K, V> {
public static final String TOPIC_SUFFIX = "topic";
@ -87,7 +86,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
storeMode = options.getStoreMode();
storeCacheMiss = options.isStoreCacheMiss();
isUseObjectAsCacheKey = options.isUseObjectAsCacheKey();
publishCommand = commandExecutor.getConnectionManager().getSubscribeService().getPublishCommand();
localCacheView = new LocalCacheView<>(options, this);
cache = localCacheView.getCache();
cacheKeyMap = localCacheView.getCacheKeyMap();
@ -105,6 +103,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
};
listener.add(cache, cacheKeyMap);
instanceId = listener.getInstanceId();
publishCommand = listener.getPublishCommand();
if (options.getSyncStrategy() != SyncStrategy.NONE) {
invalidateEntryOnChange = 1;
@ -133,7 +132,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} else {
msg = new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash());
}
listener.getInvalidationTopic().publishAsync(msg);
listener.publishAsync(msg);
}
mapKey.release();
}
@ -422,7 +421,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (storeMode == LocalCachedMapOptions.StoreMode.LOCALCACHE) {
keyEncoded.release();
LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash());
listener.getInvalidationTopic().publishAsync(msg);
listener.publishAsync(msg);
V val = null;
if (value != null) {
@ -546,7 +545,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (val != null) {
count++;
LocalCachedMapInvalidate msg = new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash());
listener.getInvalidationTopic().publishAsync(msg);
listener.publishAsync(msg);
}
}
return new CompletableFutureWrapper<>(count);

@ -1169,6 +1169,7 @@ public class RedissonReactive implements RedissonReactiveClient {
.timeToLive(params.getTimeToLiveInMillis())
.syncStrategy(LocalCachedMapOptions.SyncStrategy.valueOf(params.getSyncStrategy().toString()))
.useObjectAsCacheKey(params.isUseObjectAsCacheKey())
.useTopicPattern(params.isUseTopicPattern())
.expirationEventPolicy(LocalCachedMapOptions.ExpirationEventPolicy.valueOf(params.getExpirationEventPolicy().toString()))
.writer(params.getWriter())
.writerAsync(params.getWriterAsync())

@ -1145,6 +1145,7 @@ public class RedissonRx implements RedissonRxClient {
.timeToLive(params.getTimeToLiveInMillis())
.syncStrategy(LocalCachedMapOptions.SyncStrategy.valueOf(params.getSyncStrategy().toString()))
.useObjectAsCacheKey(params.isUseObjectAsCacheKey())
.useTopicPattern(params.isUseTopicPattern())
.expirationEventPolicy(LocalCachedMapOptions.ExpirationEventPolicy.valueOf(params.getExpirationEventPolicy().toString()))
.writer(params.getWriter())
.writerAsync(params.getWriterAsync())

@ -162,6 +162,7 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
private boolean storeCacheMiss;
private ExpirationEventPolicy expirationEventPolicy;
private boolean useObjectAsCacheKey;
private boolean useTopicPattern;
protected LocalCachedMapOptions() {
}
@ -209,6 +210,7 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
.syncStrategy(SyncStrategy.INVALIDATE)
.storeCacheMiss(false)
.useObjectAsCacheKey(false)
.useTopicPattern(false)
.expirationEventPolicy(ExpirationEventPolicy.SUBSCRIBE_WITH_KEYEVENT_PATTERN);
}
@ -396,6 +398,10 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
return useObjectAsCacheKey;
}
public boolean isUseTopicPattern() {
return useTopicPattern;
}
/**
* Defines whether to store a cache miss into the local cache.
*
@ -419,6 +425,18 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
return this;
}
/**
* Defines whether to use a global topic pattern listener
* that applies to all local cache instances belonging to the same Redisson instance.
*
* @param value whether to use a global topic pattern listener
* @return LocalCachedMapOptions instance
*/
public LocalCachedMapOptions<K, V> useTopicPattern(boolean value) {
this.useTopicPattern = value;
return this;
}
/**
* Use {@link #expirationEventPolicy(ExpirationEventPolicy)} instead
*

@ -271,5 +271,13 @@ public interface LocalCachedMapOptions<K, V> extends ExMapOptions<LocalCachedMap
*/
LocalCachedMapOptions<K, V> useObjectAsCacheKey(boolean useObjectAsCacheKey);
/**
* Defines whether to use a global topic pattern listener
* that applies to all local cache instances belonging to the same Redisson instance.
*
* @param value whether to use a global topic pattern listener
* @return LocalCachedMapOptions instance
*/
LocalCachedMapOptions<K, V> useTopicPattern(boolean value);
}

@ -41,6 +41,7 @@ public final class LocalCachedMapParams<K, V> extends BaseMapOptions<LocalCached
private ExpirationEventPolicy expirationEventPolicy = ExpirationEventPolicy.SUBSCRIBE_WITH_KEYEVENT_PATTERN;
private boolean useObjectAsCacheKey;
private boolean useTopicPattern;
LocalCachedMapParams(String name) {
this.name = name;
@ -240,6 +241,12 @@ public final class LocalCachedMapParams<K, V> extends BaseMapOptions<LocalCached
return this;
}
@Override
public LocalCachedMapOptions<K, V> useTopicPattern(boolean value) {
this.useTopicPattern = value;
return this;
}
public ExpirationEventPolicy getExpirationEventPolicy() {
return expirationEventPolicy;
}
@ -251,4 +258,8 @@ public final class LocalCachedMapParams<K, V> extends BaseMapOptions<LocalCached
public boolean isUseObjectAsCacheKey() {
return useObjectAsCacheKey;
}
public boolean isUseTopicPattern() {
return useTopicPattern;
}
}

@ -22,13 +22,11 @@ import org.redisson.*;
import org.redisson.api.*;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.LocalCacheInvalidateListener;
import org.redisson.api.listener.LocalCacheUpdateListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.*;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
@ -68,6 +66,7 @@ public abstract class LocalCacheListener {
private long cacheUpdateLogTime;
private volatile long lastInvalidate;
private RTopic invalidationTopic;
private RPatternTopic patternTopic;
private int syncListenerId;
private int reconnectionListenerId;
@ -154,24 +153,44 @@ public abstract class LocalCacheListener {
}
void createTopic(String name, CommandAsyncExecutor commandExecutor) {
if (isSharded) {
if (isSharded && !options.isUseTopicPattern()) {
invalidationTopic = RedissonShardedTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
} else {
invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
}
if (options.isUseTopicPattern()) {
patternTopic = new RedissonPatternTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, "*:topic");
}
}
int addMessageListener() {
return invalidationTopic.addListener(Object.class, new MessageListener<Object>() {
@Override
public void onMessage(CharSequence channel, Object msg) {
LocalCacheListener.this.onMessage(msg);
}
});
if (patternTopic != null) {
return patternTopic.addListener(Object.class,
(pattern, channel, msg) -> {
if (!getInvalidationTopicName().equals(channel.toString())) {
return;
}
LocalCacheListener.this.onMessage(msg);
});
}
return invalidationTopic.addListener(Object.class,
(channel, msg) -> LocalCacheListener.this.onMessage(msg));
}
int addReconnectionListener() {
if (patternTopic != null) {
return patternTopic.addListener(new PatternStatusListener() {
@Override
public void onPSubscribe(String pattern) {
LocalCacheListener.this.onSubscribe();
}
@Override
public void onPUnsubscribe(String pattern) {
// skip
}
});
}
return invalidationTopic.addListener(new BaseStatusListener() {
@Override
public void onSubscribe(String channel) {
@ -315,11 +334,18 @@ public abstract class LocalCacheListener {
}
RFuture<Long> publishAsync(byte[] id) {
return invalidationTopic.publishAsync(new LocalCachedMapClear(instanceId, id, true));
return publishAsync(new LocalCachedMapClear(instanceId, id, true));
}
public RFuture<Long> publishAsync(Object msg) {
return invalidationTopic.publishAsync(msg);
}
public RTopic getInvalidationTopic() {
return invalidationTopic;
public String getPublishCommand() {
if (isSharded && !options.isUseTopicPattern()) {
return RedisCommands.SPUBLISH.getName();
}
return RedisCommands.PUBLISH.getName();
}
public String getInvalidationTopicName() {
@ -364,6 +390,11 @@ public abstract class LocalCacheListener {
}
void removeAsync(List<Integer> ids) {
if (patternTopic != null) {
patternTopic.removeListenerAsync(ids.toArray(new Integer[0]));
return;
}
invalidationTopic.removeListenerAsync(ids.toArray(new Integer[0]));
}

Loading…
Cancel
Save