refactoring

pull/1423/head
Nikita 7 years ago
parent 4dcca4002f
commit 3e2087752e

@ -39,7 +39,7 @@ public class RedissonListMultimapCache<K, V> extends RedissonListMultimap<K, V>
private final RedissonMultimapCache<K> baseCache;
RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
public RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
if (evictionScheduler != null) {
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
@ -47,7 +47,7 @@ public class RedissonListMultimapCache<K, V> extends RedissonListMultimap<K, V>
baseCache = new RedissonMultimapCache<K>(connectionManager, this, getTimeoutSetName(), prefix);
}
RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
public RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
if (evictionScheduler != null) {
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());

@ -29,7 +29,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.redisson.api.LocalCachedMapOptions;
@ -37,27 +36,18 @@ import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.RFuture;
import org.redisson.api.RListMultimapCache;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.cache.Cache;
import org.redisson.cache.LFUCacheMap;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.LocalCacheListener;
import org.redisson.cache.LocalCachedMapClear;
import org.redisson.cache.LocalCachedMapDisable;
import org.redisson.cache.LocalCachedMapDisableAck;
import org.redisson.cache.LocalCachedMapDisabledKey;
import org.redisson.cache.LocalCachedMapEnable;
import org.redisson.cache.LocalCachedMapInvalidate;
import org.redisson.cache.LocalCachedMapUpdate;
import org.redisson.cache.LocalCachedMessageCodec;
import org.redisson.cache.NoneCacheMap;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@ -73,12 +63,8 @@ import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
@ -95,8 +81,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public static final String DISABLED_KEYS_SUFFIX = "disabled-keys";
public static final String DISABLED_ACK_SUFFIX = ":topic";
private static final Logger log = LoggerFactory.getLogger(RedissonLocalCachedMap.class);
public static class CacheKey implements Serializable {
private final byte[] keyHash;
@ -188,15 +172,11 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10);
private byte[] instanceId;
private RTopic<Object> invalidationTopic;
private Cache<CacheKey, CacheValue> cache;
private Map<CacheKey, String> disabledKeys = new ConcurrentHashMap<CacheKey, String>();
private int invalidateEntryOnChange;
private int syncListenerId;
private int reconnectionListenerId;
private volatile long lastInvalidate;
private SyncStrategy syncStrategy;
private final Codec topicCodec = new LocalCachedMessageCodec();
private LocalCacheListener listener;
public RedissonLocalCachedMap(CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions<K, V> options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(commandExecutor, name, redisson, options);
@ -218,172 +198,27 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
invalidateEntryOnChange = 2;
evictionScheduler.schedule(getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1));
evictionScheduler.schedule(listener.getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1));
}
cache = createCache(options);
addListeners(name, options, redisson);
}
private void addListeners(final String name, final LocalCachedMapOptions<K, V> options, final RedissonClient redisson) {
invalidationTopic = new RedissonTopic<Object>(topicCodec, commandExecutor, suffixName(name, TOPIC_SUFFIX));
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() {
@Override
public void onSubscribe(String channel) {
if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) {
cache.clear();
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD
// check if instance has already been used
&& lastInvalidate > 0) {
if (System.currentTimeMillis() - lastInvalidate > cacheUpdateLogTime) {
cache.clear();
return;
}
isExistsAsync().addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't check existance", future.cause());
return;
}
if (!future.getNow()) {
cache.clear();
return;
}
listener = new LocalCacheListener(name, commandExecutor, cache, this, instanceId, codec, options, cacheUpdateLogTime) {
RScoredSortedSet<byte[]> logs = redisson.getScoredSortedSet(getUpdatesLogName(), ByteArrayCodec.INSTANCE);
logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true)
.addListener(new FutureListener<Collection<byte[]>>() {
@Override
public void operationComplete(Future<Collection<byte[]>> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't load update log", future.cause());
return;
}
for (byte[] entry : future.getNow()) {
byte[] keyHash = Arrays.copyOf(entry, 16);
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
});
}
});
}
}
});
}
if (options.getSyncStrategy() != SyncStrategy.NONE) {
syncListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapDisable) {
LocalCachedMapDisable m = (LocalCachedMapDisable) msg;
String requestId = m.getRequestId();
Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
for (byte[] keyHash : ((LocalCachedMapDisable) msg).getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
keysToDisable.add(key);
}
disableKeys(requestId, keysToDisable, m.getTimeout());
RedissonTopic<Object> topic = new RedissonTopic<Object>(topicCodec, commandExecutor, suffixName(name, requestId + DISABLED_ACK_SUFFIX));
topic.publishAsync(new LocalCachedMapDisableAck());
}
if (msg instanceof LocalCachedMapEnable) {
LocalCachedMapEnable m = (LocalCachedMapEnable) msg;
for (byte[] keyHash : m.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
disabledKeys.remove(key, m.getRequestId());
}
}
if (msg instanceof LocalCachedMapClear) {
cache.clear();
}
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
}
if (msg instanceof LocalCachedMapUpdate) {
LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg;
for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) {
ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey());
ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue());
try {
protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException {
CacheKey cacheKey = toCacheKey(keyBuf);
Object key = codec.getMapKeyDecoder().decode(keyBuf, null);
Object value = codec.getMapValueDecoder().decode(valueBuf, null);
cachePut(cacheKey, key, value);
} catch (IOException e) {
log.error("Can't decode map entry", e);
} finally {
keyBuf.release();
valueBuf.release();
}
}
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
lastInvalidate = System.currentTimeMillis();
}
}
});
String disabledKeysName = RedissonObject.suffixName(getName(), DISABLED_KEYS_SUFFIX);
RListMultimapCache<LocalCachedMapDisabledKey, String> multimap = redisson.getListMultimapCache(disabledKeysName, getCodec());
for (LocalCachedMapDisabledKey key : multimap.readAllKeySet()) {
Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
for (String hash : multimap.getAll(key)) {
CacheKey cacheKey = new CacheKey(ByteBufUtil.decodeHexDump(hash));
keysToDisable.add(cacheKey);
}
disableKeys(key.getRequestId(), keysToDisable, key.getTimeout());
}
}
}
private void disableKeys(final String requestId, final Set<CacheKey> keys, long timeout) {
for (CacheKey key : keys) {
disabledKeys.put(key, requestId);
cache.remove(key);
}
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
for (CacheKey cacheKey : keys) {
disabledKeys.remove(cacheKey, requestId);
}
}
}, timeout, TimeUnit.MILLISECONDS);
};
listener.add();
}
private void cachePut(CacheKey cacheKey, Object key, Object value) {
if (disabledKeys.containsKey(cacheKey)) {
if (listener.isDisabled(cacheKey)) {
return;
}
@ -471,10 +306,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future;
}
String getUpdatesLogName() {
return prefixName("redisson__cache_updates_log", getName());
}
protected static byte[] generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
@ -515,7 +346,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "return v; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
mapKey, mapValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
}
@ -547,18 +378,13 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "return 0; "
+ "end; "
+ "return 1; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
}
@Override
public void destroy() {
if (syncListenerId != 0) {
invalidationTopic.removeListener(syncListenerId);
}
if (reconnectionListenerId != 0) {
invalidationTopic.removeListener(reconnectionListenerId);
}
listener.remove();
}
@Override
@ -580,7 +406,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end;"
+ "end; "
+ "return v",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
keyEncoded, msgEncoded, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
}
@ -608,7 +434,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "table.insert(result, val);"
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName()),
params.toArray());
}
@ -639,7 +465,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "table.insert(result, val);"
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
params.toArray());
}
@ -688,7 +514,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName()),
params.toArray());
}
@ -718,7 +544,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
params.toArray());
}
@ -746,7 +572,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "return 1;"
+ "end; "
+ "return 0;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
msgEncoded, invalidateEntryOnChange);
}
@ -856,7 +682,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "end; "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
+ "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
params.toArray());
future.addListener(new FutureListener<Void>() {
@ -890,7 +716,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "return result; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.addListener(new FutureListener<V>() {
@ -1027,21 +853,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public RFuture<Void> clearLocalCacheAsync() {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Long> future = invalidationTopic.publishAsync(new LocalCachedMapClear());
future.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
result.trySuccess(null);
}
});
return result;
return listener.clearLocalCacheAsync();
}
@Override
@ -1123,7 +935,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "else "
+ "return nil; "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(key), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}
@ -1169,7 +981,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(key), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
keyState, oldValueState, newValueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}
@ -1215,7 +1027,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
Arrays.<Object>asList(getName(key), listener.getInvalidationTopicName(), listener.getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}
@ -1262,7 +1074,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
@Override
public ByteBuf encode(Object value) {
try {
return topicCodec.getValueEncoder().encode(value);
return LocalCachedMessageCodec.INSTANCE.getValueEncoder().encode(value);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}

@ -0,0 +1,294 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.redisson.RedissonListMultimapCache;
import org.redisson.RedissonLocalCachedMap.CacheKey;
import org.redisson.RedissonObject;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.RedissonTopic;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.RFuture;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.RListMultimapCache;
import org.redisson.api.RObject;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RTopic;
import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public abstract class LocalCacheListener {
public static final String TOPIC_SUFFIX = "topic";
public static final String DISABLED_KEYS_SUFFIX = "disabled-keys";
public static final String DISABLED_ACK_SUFFIX = ":topic";
private Map<CacheKey, String> disabledKeys = new ConcurrentHashMap<CacheKey, String>();
private static final Logger log = LoggerFactory.getLogger(LocalCacheListener.class);
private String name;
private CommandAsyncExecutor commandExecutor;
private Cache<?, ?> cache;
private RObject object;
private byte[] instanceId;
private Codec codec;
private LocalCachedMapOptions<?, ?> options;
private long cacheUpdateLogTime;
private volatile long lastInvalidate;
private RTopic<Object> invalidationTopic;
private int syncListenerId;
private int reconnectionListenerId;
public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor, Cache<?, ?> cache,
RObject object, byte[] instanceId, Codec codec, LocalCachedMapOptions<?, ?> options, long cacheUpdateLogTime) {
super();
this.name = name;
this.commandExecutor = commandExecutor;
this.cache = cache;
this.object = object;
this.instanceId = instanceId;
this.codec = codec;
this.options = options;
this.cacheUpdateLogTime = cacheUpdateLogTime;
}
public boolean isDisabled(Object key) {
return disabledKeys.containsKey(key);
}
public void add() {
invalidationTopic = new RedissonTopic<Object>(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
reconnectionListenerId = invalidationTopic.addListener(new BaseStatusListener() {
@Override
public void onSubscribe(String channel) {
if (options.getReconnectionStrategy() == ReconnectionStrategy.CLEAR) {
cache.clear();
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD
// check if instance has already been used
&& lastInvalidate > 0) {
if (System.currentTimeMillis() - lastInvalidate > cacheUpdateLogTime) {
cache.clear();
return;
}
object.isExistsAsync().addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't check existance", future.cause());
return;
}
if (!future.getNow()) {
cache.clear();
return;
}
RScoredSortedSet<byte[]> logs = new RedissonScoredSortedSet<byte[]>(ByteArrayCodec.INSTANCE, commandExecutor, getUpdatesLogName(), null);
logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true)
.addListener(new FutureListener<Collection<byte[]>>() {
@Override
public void operationComplete(Future<Collection<byte[]>> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't load update log", future.cause());
return;
}
for (byte[] entry : future.getNow()) {
byte[] keyHash = Arrays.copyOf(entry, 16);
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
});
}
});
}
}
});
}
if (options.getSyncStrategy() != SyncStrategy.NONE) {
syncListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
@Override
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapDisable) {
LocalCachedMapDisable m = (LocalCachedMapDisable) msg;
String requestId = m.getRequestId();
Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
for (byte[] keyHash : ((LocalCachedMapDisable) msg).getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
keysToDisable.add(key);
}
disableKeys(requestId, keysToDisable, m.getTimeout());
RedissonTopic<Object> topic = new RedissonTopic<Object>(LocalCachedMessageCodec.INSTANCE,
commandExecutor, RedissonObject.suffixName(name, requestId + DISABLED_ACK_SUFFIX));
topic.publishAsync(new LocalCachedMapDisableAck());
}
if (msg instanceof LocalCachedMapEnable) {
LocalCachedMapEnable m = (LocalCachedMapEnable) msg;
for (byte[] keyHash : m.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
disabledKeys.remove(key, m.getRequestId());
}
}
if (msg instanceof LocalCachedMapClear) {
cache.clear();
}
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
}
if (msg instanceof LocalCachedMapUpdate) {
LocalCachedMapUpdate updateMsg = (LocalCachedMapUpdate) msg;
for (LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) {
ByteBuf keyBuf = Unpooled.wrappedBuffer(entry.getKey());
ByteBuf valueBuf = Unpooled.wrappedBuffer(entry.getValue());
try {
updateCache(keyBuf, valueBuf);
} catch (IOException e) {
log.error("Can't decode map entry", e);
} finally {
keyBuf.release();
valueBuf.release();
}
}
}
if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) {
lastInvalidate = System.currentTimeMillis();
}
}
});
String disabledKeysName = RedissonObject.suffixName(name, DISABLED_KEYS_SUFFIX);
RListMultimapCache<LocalCachedMapDisabledKey, String> multimap = new RedissonListMultimapCache<LocalCachedMapDisabledKey, String>(null, codec, commandExecutor, disabledKeysName);
for (LocalCachedMapDisabledKey key : multimap.readAllKeySet()) {
Set<CacheKey> keysToDisable = new HashSet<CacheKey>();
for (String hash : multimap.getAll(key)) {
CacheKey cacheKey = new CacheKey(ByteBufUtil.decodeHexDump(hash));
keysToDisable.add(cacheKey);
}
disableKeys(key.getRequestId(), keysToDisable, key.getTimeout());
}
}
}
public RFuture<Void> clearLocalCacheAsync() {
final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<Long> future = invalidationTopic.publishAsync(new LocalCachedMapClear());
future.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
result.trySuccess(null);
}
});
return result;
}
public String getInvalidationTopicName() {
return RedissonObject.suffixName(name, TOPIC_SUFFIX);
}
protected abstract void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException;
private void disableKeys(final String requestId, final Set<CacheKey> keys, long timeout) {
for (CacheKey key : keys) {
disabledKeys.put(key, requestId);
cache.remove(key);
}
commandExecutor.getConnectionManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
for (CacheKey cacheKey : keys) {
disabledKeys.remove(cacheKey, requestId);
}
}
}, timeout, TimeUnit.MILLISECONDS);
}
public void remove() {
if (syncListenerId != 0) {
invalidationTopic.removeListener(syncListenerId);
}
if (reconnectionListenerId != 0) {
invalidationTopic.removeListener(reconnectionListenerId);
}
}
public String getUpdatesLogName() {
return RedissonObject.prefixName("redisson__cache_updates_log", name);
}
}
Loading…
Cancel
Save