refactoring

pull/5732/head
Nikita Koksharov 11 months ago
parent 26ff36e6f6
commit 7dc9541297

@ -47,20 +47,20 @@ import java.util.concurrent.*;
*
*/
public abstract class LocalCacheListener {
public static final String TOPIC_SUFFIX = "topic";
public static final String DISABLED_KEYS_SUFFIX = "disabled-keys";
public static final String DISABLED_ACK_SUFFIX = ":topic";
private ConcurrentMap<CacheKey, String> disabledKeys = new ConcurrentHashMap<CacheKey, String>();
private static final Logger log = LoggerFactory.getLogger(LocalCacheListener.class);
private String name;
private CommandAsyncExecutor commandExecutor;
String name;
CommandAsyncExecutor commandExecutor;
private Map<CacheKey, ? extends CacheValue> cache;
private RObject object;
private byte[] instanceId;
byte[] instanceId;
private Codec codec;
private LocalCachedMapOptions<?, ?> options;
private final String keyeventPattern;
@ -80,7 +80,7 @@ public abstract class LocalCacheListener {
private boolean isSharded;
public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor,
RObject object, Codec codec, LocalCachedMapOptions<?, ?> options, long cacheUpdateLogTime, boolean isSharded) {
RObject object, Codec codec, LocalCachedMapOptions<?, ?> options, long cacheUpdateLogTime, boolean isSharded) {
super();
this.name = name;
this.commandExecutor = commandExecutor;
@ -93,62 +93,19 @@ public abstract class LocalCacheListener {
instanceId = commandExecutor.getServiceManager().generateIdArray();
}
public byte[] getInstanceId() {
return instanceId;
}
public ConcurrentMap<CacheKey, CacheValue> createCache(LocalCachedMapOptions<?, ?> options) {
if (options.getCacheProvider() == LocalCachedMapOptions.CacheProvider.CAFFEINE) {
Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder();
if (options.getTimeToLiveInMillis() > 0) {
caffeineBuilder.expireAfterWrite(options.getTimeToLiveInMillis(), TimeUnit.MILLISECONDS);
}
if (options.getMaxIdleInMillis() > 0) {
caffeineBuilder.expireAfterAccess(options.getMaxIdleInMillis(), TimeUnit.MILLISECONDS);
}
if (options.getCacheSize() > 0) {
caffeineBuilder.maximumSize(options.getCacheSize());
}
if (options.getEvictionPolicy() == LocalCachedMapOptions.EvictionPolicy.SOFT) {
caffeineBuilder.softValues();
}
if (options.getEvictionPolicy() == LocalCachedMapOptions.EvictionPolicy.WEAK) {
caffeineBuilder.weakValues();
}
return caffeineBuilder.<CacheKey, CacheValue>build().asMap();
}
if (options.getEvictionPolicy() == EvictionPolicy.NONE) {
return new NoneCacheMap<>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LRU) {
return new LRUCacheMap<>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
return new LFUCacheMap<>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.SOFT) {
return ReferenceCacheMap.soft(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.WEAK) {
return ReferenceCacheMap.weak(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
throw new IllegalArgumentException("Invalid eviction policy: " + options.getEvictionPolicy());
}
public boolean isDisabled(Object key) {
return disabledKeys.containsKey(key);
}
public void add(Map<CacheKey, ? extends CacheValue> cache) {
this.cache = cache;
if (isSharded) {
invalidationTopic = RedissonShardedTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
} else {
invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
}
createTopic(name, commandExecutor);
if (options.getExpirationEventPolicy() == LocalCachedMapOptions.ExpirationEventPolicy.SUBSCRIBE_WITH_KEYEVENT_PATTERN) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
@ -167,118 +124,142 @@ public abstract class LocalCacheListener {
}
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() {
@Override
public void onSubscribe(String channel) {
if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) {
cache.clear();
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD
// check if instance has already been used
&& lastInvalidate > 0) {
loadAfterReconnection();
}
}
});
reconnectionListenerId = addReconnectionListener();
}
if (options.getSyncStrategy() != SyncStrategy.NONE) {
syncListenerId = invalidationTopic.addListener(Object.class, new MessageListener<Object>() {
@Override
public void onMessage(CharSequence channel, Object msg) {
if (msg instanceof LocalCachedMapDisable) {
LocalCachedMapDisable m = (LocalCachedMapDisable) msg;
String requestId = m.getRequestId();
Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
for (byte[] keyHash : ((LocalCachedMapDisable) msg).getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
keysToDisable.add(key);
}
disableKeys(requestId, keysToDisable, m.getTimeout());
RedissonTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE,
commandExecutor, RedissonObject.suffixName(name, requestId + DISABLED_ACK_SUFFIX));
topic.publishAsync(new LocalCachedMapDisableAck());
}
if (msg instanceof LocalCachedMapEnable) {
LocalCachedMapEnable m = (LocalCachedMapEnable) msg;
for (byte[] keyHash : m.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
disabledKeys.remove(key, m.getRequestId());
}
}
if (msg instanceof LocalCachedMapClear) {
LocalCachedMapClear clearMsg = (LocalCachedMapClear) msg;
if (!Arrays.equals(clearMsg.getExcludedId(), instanceId)) {
cache.clear();
if (clearMsg.isReleaseSemaphore()) {
RSemaphore semaphore = getClearSemaphore(clearMsg.getRequestId());
semaphore.releaseAsync();
}
}
}
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate) msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
CacheValue value = cache.remove(key);
if (value == null) {
continue;
}
notifyInvalidate(value);
}
}
}
if (msg instanceof LocalCachedMapUpdate) {
LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg;
if (!Arrays.equals(updateMsg.getExcludedId(), instanceId)) {
for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) {
ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey());
ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue());
try {
CacheValue value = updateCache(keyBuf, valueBuf);
notifyUpdate(value);
} catch (IOException e) {
log.error("Can't decode map entry", e);
} finally {
keyBuf.release();
valueBuf.release();
}
}
}
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
lastInvalidate = System.currentTimeMillis();
}
}
syncListenerId = addMessageListener();
});
String disabledKeysName = RedissonObject.suffixName(name, DISABLED_KEYS_SUFFIX);
RListMultimapCache<LocalCachedMapDisabledKey, String> multimap = new RedissonListMultimapCache<LocalCachedMapDisabledKey, String>(null, codec, commandExecutor, disabledKeysName);
for (LocalCachedMapDisabledKey key : multimap.readAllKeySet()) {
Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
for (String hash : multimap.getAll(key)) {
CacheKey cacheKey = new CacheKey(ByteBufUtil.decodeHexDump(hash));
keysToDisable.add(cacheKey);
}
disableKeys(key.getRequestId(), keysToDisable, key.getTimeout());
}
}
}
void createTopic(String name, CommandAsyncExecutor commandExecutor) {
if (isSharded) {
invalidationTopic = RedissonShardedTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
} else {
invalidationTopic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
}
}
int addMessageListener() {
return invalidationTopic.addListener(Object.class, new MessageListener<Object>() {
@Override
public void onMessage(CharSequence channel, Object msg) {
LocalCacheListener.this.onMessage(msg);
}
});
}
int addReconnectionListener() {
return invalidationTopic.addListener(new BaseStatusListener() {
@Override
public void onSubscribe(String channel) {
LocalCacheListener.this.onSubscribe();
}
});
}
final void onMessage(Object msg) {
if (msg instanceof LocalCachedMapDisable) {
LocalCachedMapDisable m = (LocalCachedMapDisable) msg;
String requestId = m.getRequestId();
Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
for (byte[] keyHash : ((LocalCachedMapDisable) msg).getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
keysToDisable.add(key);
}
disableKeys(requestId, keysToDisable, m.getTimeout());
RedissonTopic topic = RedissonTopic.createRaw(LocalCachedMessageCodec.INSTANCE,
commandExecutor, RedissonObject.suffixName(name, requestId + DISABLED_ACK_SUFFIX));
topic.publishAsync(new LocalCachedMapDisableAck());
}
if (msg instanceof LocalCachedMapEnable) {
LocalCachedMapEnable m = (LocalCachedMapEnable) msg;
for (byte[] keyHash : m.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
disabledKeys.remove(key, m.getRequestId());
}
}
if (msg instanceof LocalCachedMapClear) {
LocalCachedMapClear clearMsg = (LocalCachedMapClear) msg;
if (!Arrays.equals(clearMsg.getExcludedId(), instanceId)) {
cache.clear();
if (clearMsg.isReleaseSemaphore()) {
RSemaphore semaphore = getClearSemaphore(clearMsg.getRequestId());
semaphore.releaseAsync();
}
}
}
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate) msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
CacheValue value = cache.remove(key);
if (value == null) {
continue;
}
notifyInvalidate(value);
}
}
}
if (msg instanceof LocalCachedMapUpdate) {
LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg;
if (!Arrays.equals(updateMsg.getExcludedId(), instanceId)) {
for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) {
ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey());
ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue());
try {
CacheValue value = updateCache(keyBuf, valueBuf);
notifyUpdate(value);
} catch (IOException e) {
log.error("Can't decode map entry", e);
} finally {
keyBuf.release();
valueBuf.release();
}
}
}
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
lastInvalidate = System.currentTimeMillis();
}
}
final void onSubscribe() {
if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) {
cache.clear();
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD
// check if instance has already been used
&& lastInvalidate > 0) {
loadAfterReconnection();
}
}
public void notifyUpdate(CacheValue value) {
for (LocalCacheUpdateListener listener : updateListeners.values()) {
listener.onUpdate(value.getKey(), value.getValue());
@ -298,7 +279,7 @@ public abstract class LocalCacheListener {
}
byte[] id = commandExecutor.getServiceManager().generateIdArray();
RFuture<Long> future = invalidationTopic.publishAsync(new LocalCachedMapClear(instanceId, id, true));
RFuture<Long> future = publishAsync(id);
CompletionStage<Void> f = future.thenCompose(res -> {
if (res.intValue() == 0) {
return CompletableFuture.completedFuture(null);
@ -313,6 +294,10 @@ public abstract class LocalCacheListener {
return new CompletableFutureWrapper<>(f);
}
RFuture<Long> publishAsync(byte[] id) {
return invalidationTopic.publishAsync(new LocalCachedMapClear(instanceId, id, true));
}
public RTopic getInvalidationTopic() {
return invalidationTopic;
}
@ -322,20 +307,20 @@ public abstract class LocalCacheListener {
}
protected abstract CacheValue updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException;
private void disableKeys(final String requestId, final Set<CacheKey> keys, long timeout) {
for (CacheKey key : keys) {
disabledKeys.put(key, requestId);
cache.remove(key);
}
commandExecutor.getServiceManager().newTimeout(t -> {
for (CacheKey cacheKey : keys) {
disabledKeys.remove(cacheKey, requestId);
}
}, timeout, TimeUnit.MILLISECONDS);
}
public void remove() {
List<Integer> ids = new ArrayList<Integer>(2);
if (syncListenerId != 0) {
@ -344,7 +329,7 @@ public abstract class LocalCacheListener {
if (reconnectionListenerId != 0) {
ids.add(reconnectionListenerId);
}
invalidationTopic.removeListenerAsync(ids.toArray(new Integer[0]));
removeAsync(ids);
if (options.getExpirationEventPolicy() == LocalCachedMapOptions.ExpirationEventPolicy.SUBSCRIBE_WITH_KEYEVENT_PATTERN) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
@ -355,6 +340,10 @@ public abstract class LocalCacheListener {
}
}
void removeAsync(List<Integer> ids) {
invalidationTopic.removeListenerAsync(ids.toArray(new Integer[0]));
}
public String getUpdatesLogName() {
return RedissonObject.prefixName("redisson__cache_updates_log", name);
}
@ -364,32 +353,32 @@ public abstract class LocalCacheListener {
cache.clear();
return;
}
object.isExistsAsync().whenComplete((res, e) -> {
if (e != null) {
log.error("Can't check existance", e);
return;
}
if (!res) {
if (!res) {
cache.clear();
return;
}
RScoredSortedSet<byte[]> logs = new RedissonScoredSortedSet<>(ByteArrayCodec.INSTANCE, commandExecutor, getUpdatesLogName(), null);
logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true)
.whenComplete((r, ex) -> {
if (ex != null) {
log.error("Can't load update log", ex);
return;
}
for (byte[] entry : r) {
byte[] keyHash = Arrays.copyOf(entry, 16);
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
});
.whenComplete((r, ex) -> {
if (ex != null) {
log.error("Can't load update log", ex);
return;
}
for (byte[] entry : r) {
byte[] keyHash = Arrays.copyOf(entry, 16);
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
});
});
}

Loading…
Cancel
Save