InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT option added

Nikita 8 years ago
parent 0a876bd5fa
commit ffac9cb35d

@ -253,12 +253,12 @@ public class Redisson implements RedissonClient {
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(id, connectionManager.getCommandExecutor(), name, options, this);
return new RedissonLocalCachedMap<K, V>(id, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this);
public <K, V> RLocalCachedMap<K, V> getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions options) {
return new RedissonLocalCachedMap<K, V>(id, codec, connectionManager.getCommandExecutor(), name, options, this);
return new RedissonLocalCachedMap<K, V>(id, codec, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this);

@ -31,12 +31,14 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.BaseStatusListener;
@ -46,6 +48,7 @@ import org.redisson.cache.LFUCacheMap;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.NoneCacheMap;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@ -57,8 +60,11 @@ import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -72,6 +78,8 @@ import io.netty.util.internal.ThreadLocalRandom;
public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements RLocalCachedMap<K, V> {
private static final Logger log = LoggerFactory.getLogger(RedissonLocalCachedMap.class);
public static class LocalCachedMapClear implements Serializable {
@ -185,69 +193,128 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private static final RedisCommand<Set<Object>> ALL_KEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
private static final RedisCommand<Set<Object>> ALL_KEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>(), ValueType.MAP_KEY);
private static final RedisCommand<Set<Entry<Object, Object>>> ALL_ENTRIES = new RedisCommand<Set<Entry<Object, Object>>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Map<Object, Object>> ALL_MAP = new RedisCommand<Map<Object, Object>>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10);
private byte[] instanceId;
private RTopic<Object> invalidationTopic;
private Cache<CacheKey, CacheValue> cache;
private int invalidateEntryOnChange;
private int invalidationListenerId;
private int invalidationStatusListenerId;
private volatile long lastInvalidate;
protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, RedissonClient redisson) {
protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(id, commandExecutor, name, redisson);
init(id, name, options);
init(id, name, options, redisson, evictionScheduler);
protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, RedissonClient redisson) {
protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(id, codec, connectionManager, name, redisson);
init(id, name, options);
init(id, name, options, redisson, evictionScheduler);
private void init(UUID id, String name, LocalCachedMapOptions options) {
private void init(UUID id, String name, LocalCachedMapOptions options, RedissonClient redisson, EvictionScheduler evictionScheduler) {
instanceId = generateId();
if (options.getInvalidationPolicy() != InvalidationPolicy.NONE) {
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE
|| options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) {
invalidateEntryOnChange = 1;
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT) {
invalidateEntryOnChange = 2;
evictionScheduler.schedule(getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1));
cache = createCache(options);
addListeners(name, options, redisson);
private void addListeners(String name, LocalCachedMapOptions options, RedissonClient redisson) {
invalidationTopic = new RedissonTopic<Object>(commandExecutor, suffixName(name, "topic"));
if (options.getInvalidationPolicy() != InvalidationPolicy.NONE) {
if (options.getInvalidationPolicy() != InvalidationPolicy.ON_CHANGE) {
invalidationStatusListenerId = invalidationTopic.addListener(new BaseStatusListener() {
public void onSubscribe(String channel) {
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) {
invalidationListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) {
if (options.getInvalidationPolicy() != InvalidationPolicy.ON_CHANGE) {
invalidationStatusListenerId = invalidationTopic.addListener(new BaseStatusListener() {
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapClear) {
public void onSubscribe(String channel) {
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) {
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT
// check if instance has already been used
&& lastInvalidate > 0) {
if (System.currentTimeMillis() - lastInvalidate > cacheUpdateLogTime) {
isExistsAsync().addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
if (!future.getNow()) {
RScoredSortedSet<byte[]> logs = redisson.getScoredSortedSet(getUpdatesLogName(), ByteArrayCodec.INSTANCE);
logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true)
.addListener(new FutureListener<Collection<byte[]>>() {
public void operationComplete(Future<Collection<byte[]>> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't load update log", future.cause());
for (byte[] entry : future.getNow()) {
byte[] keyHash = Arrays.copyOf(entry, 16);
CacheKey key = new CacheKey(keyHash);
invalidationListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapClear) {
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT) {
lastInvalidate = System.currentTimeMillis();
protected Cache<CacheKey, CacheValue> createCache(LocalCachedMapOptions options) {
@ -325,13 +392,30 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future;
protected byte[] generateId() {
String getUpdatesLogName() {
return prefixName("redisson__cache_updates_log", getName());
protected static byte[] generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
return id;
protected static byte[] generateLogEntryId(byte[] keyHash) {
byte[] result = new byte[keyHash.length + 1 + 8];
result[16] = ':';
byte[] id = new byte[8];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
System.arraycopy(keyHash, 0, result, 0, keyHash.length);
System.arraycopy(id, 0, result, 17, id.length);
return result;
public RFuture<V> putAsync(K key, V value) {
if (key == null) {
@ -343,17 +427,24 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"local v ='hget', KEYS[1], ARGV[1]); "
+ "if'hset', KEYS[1], ARGV[1], ARGV[2]) == 0 and ARGV[4] == '1' then "
+ "'publish', KEYS[2], ARGV[3]); "
+ "if'hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ "if ARGV[4] == '1' then "
+ "'publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "'zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "'publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "end; "
+ "return v; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
mapKey, encodeMapValue(value), msg, invalidateEntryOnChange);
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
mapKey, encodeMapValue(value), msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
@ -368,6 +459,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] encodedKey = encodeMapKey(key);
byte[] encodedValue = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(encodedKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
@ -376,11 +468,15 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "if ARGV[4] == '1' then "
+ "'publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "'zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "'publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "return 0; "
+ "end; "
+ "return 1; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
encodedKey, encodedValue, msg, invalidateEntryOnChange);
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
@ -401,16 +497,23 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] keyEncoded = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
"local v ='hget', KEYS[1], ARGV[1]); "
+ "if'hdel', KEYS[1], ARGV[1]) == 1 and ARGV[3] == '1' then "
+ "'publish', KEYS[2], ARGV[2]); "
+ "if'hdel', KEYS[1], ARGV[1]) == 1 then "
+ "if ARGV[3] == '1' then "
+ "'publish', KEYS[2], ARGV[2]); "
+ "end; "
+ "if ARGV[3] == '2' then "
+ "'zadd', KEYS[3], ARGV[4], ARGV[5]);"
+ "'publish', KEYS[2], ARGV[2]); "
+ "end;"
+ "end; "
+ "return v",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
keyEncoded, msgEncoded, invalidateEntryOnChange);
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyEncoded, msgEncoded, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
@ -419,30 +522,60 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
throw new NullPointerException();
if (invalidateEntryOnChange == 1) {
List<Object> params = new ArrayList<Object>(keys.length*2);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
if (invalidateEntryOnChange == 1) {
List<Object> params = new ArrayList<Object>(keys.length*2);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0; " +
"for j = 1, #ARGV, 2 do "
+ "if'hdel', KEYS[1], ARGV[j]) == 1 then "
+ "'publish', KEYS[2], ARGV[j+1]); "
+ "counter = counter + 1;"
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0; " +
"for j = 1, #ARGV, 2 do "
+ "if'hdel', KEYS[1], ARGV[j]) == 1 then "
+ "'publish', KEYS[2], ARGV[j+1]); "
+ "counter = counter + 1;"
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
if (invalidateEntryOnChange == 2) {
List<Object> params = new ArrayList<Object>(keys.length*3);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0; " +
"for j = 2, #ARGV, 3 do "
+ "if'hdel', KEYS[1], ARGV[j]) == 1 then "
+ "'zadd', KEYS[3], ARGV[1], ARGV[j+2]);"
+ "'publish', KEYS[2], ARGV[j+1]); "
+ "counter = counter + 1;"
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
List<Object> params = new ArrayList<Object>(keys.length + 1);
@ -463,12 +596,12 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] msgEncoded = encode(new LocalCachedMapClear());
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if'del', KEYS[1]) == 1 and ARGV[2] == '1' then "
"if'del', KEYS[1], KEYS[3]) > 0 and ARGV[2] ~= '0' then "
+ "'publish', KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end; "
+ "return 0;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
msgEncoded, invalidateEntryOnChange);
@ -779,18 +912,30 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (invalidateEntryOnChange == 2) {
long time = System.currentTimeMillis();
for (byte[] hash : hashes) {
byte[] entryId = generateLogEntryId(hash);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, hashes));
final RPromise<Void> result = newPromise();
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"'hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));"
+ "if ARGV[1] == '1' then "
// + "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do "
+ "'publish', KEYS[2], ARGV[#ARGV]); "
// + "end; "
+ "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)), params.toArray());
+ "if ARGV[1] == '1' then "
+ "'publish', KEYS[2], ARGV[#ARGV]); "
+ "end;"
+ "if ARGV[1] == '2' then "
+ "'zadd', KEYS[3], unpack(ARGV, tonumber(ARGV[2]) + 2 + 1, #ARGV - 1));"
+ "'publish', KEYS[2], ARGV[#ARGV]); "
+ "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
future.addListener(new FutureListener<Void>() {
@ -811,15 +956,19 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
final byte[] keyState = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyState);
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
RFuture<V> future = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass())),
"local result ='HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[3] == '1' then "
+ "'publish', KEYS[2], ARGV[4]); "
+ "end; "
+ "'publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "if ARGV[3] == '2' then "
+ "'zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "'publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "return result; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg);
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.addListener(new FutureListener<V>() {
@ -994,21 +1143,28 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
RFuture<V> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"if'hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v ='hget', KEYS[1], ARGV[1]); "
+ "'hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[3] == '1' then "
+ "'publish', KEYS[2], ARGV[4]); "
+ "end; "
+ "end;"
+ "if ARGV[3] == '2' then "
+ "'zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "'publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "return v; "
+ "else "
+ "return nil; "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0)),
keyState, valueState, invalidateEntryOnChange, msg);
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.addListener(new FutureListener<V>() {
@ -1033,6 +1189,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] oldValueState = encodeMapValue(oldValue);
byte[] newValueState = encodeMapValue(newValue);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -1040,13 +1197,17 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "'hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "if ARGV[4] == '1' then "
+ "'publish', KEYS[2], ARGV[5]); "
+ "end; "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "'zadd', KEYS[3], ARGV[6], ARGV[7]);"
+ "'publish', KEYS[2], ARGV[5]); "
+ "end;"
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0)),
keyState, oldValueState, newValueState, invalidateEntryOnChange, msg);
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, oldValueState, newValueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.addListener(new FutureListener<Boolean>() {
@ -1069,19 +1230,24 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if'hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "if ARGV[3] == '1' then "
+ "'publish', KEYS[2], ARGV[4]); "
+ "end; "
+ "end;"
+ "if ARGV[3] == '2' then "
+ "'zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "'publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "return'hdel', KEYS[1], ARGV[1]) "
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0)),
keyState, valueState, invalidateEntryOnChange, msg);
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.addListener(new FutureListener<Boolean>() {

@ -67,7 +67,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
public RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(commandExecutor, name);
if (evictionScheduler != null) {
evictionScheduler.schedule(getName(), 0);
this.redisson = redisson;
@ -75,7 +75,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
public RedissonSetCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) {
super(codec, commandExecutor, name);
if (evictionScheduler != null) {
evictionScheduler.schedule(getName(), 0);
this.redisson = redisson;

@ -25,8 +25,65 @@ import java.util.concurrent.TimeUnit;
public class LocalCachedMapOptions {
public enum EvictionPolicy {NONE, LRU, LFU, SOFT, WEAK};
public enum InvalidationPolicy {
* No invalidation on map changes
* Invalidate cache entry across all LocalCachedMap instances on map entry change.
* Invalidate cache entry across all LocalCachedMap instances on map entry change.
* <p>
* Clear cache if LocalCachedMap instance has been disconnected for a while.
* Invalidate cache entry across all LocalCachedMap instances on map entry change.
* <p>
* Store invalidated entry hash in invalidation log for 10 minutes.
* Cache keys for stored invalidated entry hashes will be removed
* if LocalCachedMap instance has been disconnected less than 10 minutes
* or whole cache will be cleaned otherwise.
public enum EvictionPolicy {
* Cache without eviction.
* Least Recently Used cache.
* Least Frequently Used cache.
* Cache with Soft Reference used for values.
* All references will be collected by GC
* Cache with Weak Reference used for values.
* All references will be collected by GC
private InvalidationPolicy invalidationPolicy;
private EvictionPolicy evictionPolicy;
@ -63,7 +120,7 @@ public class LocalCachedMapOptions {
return new LocalCachedMapOptions()
public EvictionPolicy getEvictionPolicy() {

@ -55,8 +55,8 @@ public class EvictionScheduler {
public void schedule(String name) {
EvictionTask task = new SetCacheEvictionTask(name, executor);
public void schedule(String name, long shiftInMilliseconds) {
EvictionTask task = new ScoredSetEvictionTask(name, executor, shiftInMilliseconds);
EvictionTask prevTask = tasks.putIfAbsent(name, task);
if (prevTask == null) {

@ -25,18 +25,20 @@ import org.redisson.command.CommandAsyncExecutor;
* @author Nikita Koksharov
public class SetCacheEvictionTask extends EvictionTask {
public class ScoredSetEvictionTask extends EvictionTask {
private final String name;
private final long shiftInMilliseconds;
public SetCacheEvictionTask(String name, CommandAsyncExecutor executor) {
public ScoredSetEvictionTask(String name, CommandAsyncExecutor executor, long shiftInMilliseconds) {
super(executor); = name;
this.shiftInMilliseconds = shiftInMilliseconds;
RFuture<Integer> execute() {
return executor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.ZREMRANGEBYSCORE, name, 0, System.currentTimeMillis());
return executor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.ZREMRANGEBYSCORE, name, 0, System.currentTimeMillis() - shiftInMilliseconds);

@ -6,7 +6,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@ -18,10 +17,10 @@ import org.redisson.RedissonMapTest.SimpleKey;
import org.redisson.RedissonMapTest.SimpleValue;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.cache.Cache;
import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.cache.Cache;
import mockit.Deencapsulation;
@ -62,7 +61,7 @@ public class RedissonLocalCachedMapTest extends BaseTest {
HashSet actualValuesSet = new HashSet<>(m.readAllValues());
Set<Object> actualValuesSet = new HashSet<>(m.readAllValues());
Assert.assertEquals(expectedValuesSet, actualValuesSet);
Map<String, Integer> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
@ -91,7 +90,10 @@ public class RedissonLocalCachedMapTest extends BaseTest {
public void testInvalidationOnClear() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(true);
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
@ -125,7 +127,10 @@ public class RedissonLocalCachedMapTest extends BaseTest {
public void testInvalidationOnUpdate() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(true);
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
@ -151,7 +156,11 @@ public class RedissonLocalCachedMapTest extends BaseTest {
public void testNoInvalidationOnUpdate() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(false);
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
@ -177,7 +186,11 @@ public class RedissonLocalCachedMapTest extends BaseTest {
public void testNoInvalidationOnRemove() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(false);
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
@ -200,10 +213,10 @@ public class RedissonLocalCachedMapTest extends BaseTest {
public void testInvalidationOnRemove() throws InterruptedException {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(true);
LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5);
RLocalCachedMap<String, Integer> map1 = redisson.getLocalCachedMap("test", options);
Cache<CacheKey, CacheValue> cache1 = Deencapsulation.getField(map1, "cache");
@ -576,7 +589,7 @@ public class RedissonLocalCachedMapTest extends BaseTest {
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);
