@ -31,18 +31,24 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy;
import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.BaseStatusListener;
import org.redisson.api.listener.MessageListener;
import org.redisson.cache.Cache;
import org.redisson.cache.LFUCacheMap;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.NoneCacheMap;
import org.redisson.cache.SoftCacheMap;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@ -51,10 +57,14 @@ import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -65,8 +75,11 @@ import io.netty.util.internal.ThreadLocalRandom;
* @author Nikita Koksharov
public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements RLocalCachedMap<K, V> {
private static final Logger log = LoggerFactory.getLogger(RedissonLocalCachedMap.class);
public static class LocalCachedMapClear implements Serializable {
@ -180,66 +193,147 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private static final RedisCommand<Set<Object>> ALL_KEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
private static final RedisCommand<Set<Object>> ALL_KEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>(), ValueType.MAP_KEY);
private static final RedisCommand<Set<Entry<Object, Object>>> ALL_ENTRIES = new RedisCommand<Set<Entry<Object, Object>>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Map<Object, Object>> ALL_MAP = new RedisCommand<Map<Object, Object>>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10);
private byte[] instanceId;
private RTopic<Object> invalidationTopic;
private Cache<CacheKey, CacheValue> cache;
private int invalidateEntryOnChange;
private int invalidationListenerId;
private int invalidationStatusListenerId;
private volatile long lastInvalidate;
protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options) {
super(id, commandExecutor, name);
init(id, name, options);
protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(id, commandExecutor, name, redisson);
init(id, name, options, redisson, evictionScheduler);
protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options) {
super(id, codec, connectionManager, name);
init(id, name, options);
protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) {
super(id, codec, connectionManager, name, redisson);
init(id, name, options, redisson, evictionScheduler);
private void init(UUID id, String name, LocalCachedMapOptions options) {
private void init(UUID id, String name, LocalCachedMapOptions options, RedissonClient redisson, EvictionScheduler evictionScheduler) {
instanceId = generateId();
if (options.isInvalidateEntryOnChange()) {
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE
|| options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) {
invalidateEntryOnChange = 1;
if (options.getEvictionPolicy() == EvictionPolicy.NONE) {
cache = new NoneCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
if (options.getEvictionPolicy() == EvictionPolicy.LRU) {
cache = new LRUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
cache = new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
if (options.getEvictionPolicy() == EvictionPolicy.SOFT) {
cache = new SoftCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT) {
invalidateEntryOnChange = 2;
evictionScheduler.schedule(getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1));
cache = createCache(options);
addListeners(name, options, redisson);
private void addListeners(String name, final LocalCachedMapOptions options, final RedissonClient redisson) {
invalidationTopic = new RedissonTopic<Object>(commandExecutor, suffixName(name, "topic"));
if (options.isInvalidateEntryOnChange()) {
invalidationListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) {
if (options.getInvalidationPolicy() != InvalidationPolicy.ON_CHANGE) {
invalidationStatusListenerId = invalidationTopic.addListener(new BaseStatusListener() {
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapClear) {
public void onSubscribe(String channel) {
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) {
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT
// check if instance has already been used
&& lastInvalidate > 0) {
if (System.currentTimeMillis() - lastInvalidate > cacheUpdateLogTime) {
isExistsAsync().addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
if (!future.getNow()) {
RScoredSortedSet<byte[]> logs = redisson.getScoredSortedSet(getUpdatesLogName(), ByteArrayCodec.INSTANCE);
logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true)
.addListener(new FutureListener<Collection<byte[]>>() {
public void operationComplete(Future<Collection<byte[]>> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't load update log", future.cause());
for (byte[] entry : future.getNow()) {
byte[] keyHash = Arrays.copyOf(entry, 16);
CacheKey key = new CacheKey(keyHash);
invalidationListenerId = invalidationTopic.addListener(new MessageListener<Object>() {
public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapClear) {
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT) {
lastInvalidate = System.currentTimeMillis();
protected Cache<CacheKey, CacheValue> createCache(LocalCachedMapOptions options) {
if (options.getEvictionPolicy() == EvictionPolicy.NONE) {
return new NoneCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
if (options.getEvictionPolicy() == EvictionPolicy.LRU) {
return new LRUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
return new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
if (options.getEvictionPolicy() == EvictionPolicy.SOFT) {
return ReferenceCacheMap.soft(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
if (options.getEvictionPolicy() == EvictionPolicy.WEAK) {
return ReferenceCacheMap.weak(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
throw new IllegalArgumentException("Invalid eviction policy: " + options.getEvictionPolicy());
private CacheKey toCacheKey(Object key) {
@ -298,13 +392,30 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future;
protected byte[] generateId() {
String getUpdatesLogName() {
return prefixName("redisson__cache_updates_log", getName());
protected static byte[] generateId() {
byte[] id = new byte[16];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
return id;
protected static byte[] generateLogEntryId(byte[] keyHash) {
byte[] result = new byte[keyHash.length + 1 + 8];
result[16] = ':';
byte[] id = new byte[8];
// TODO JDK UPGRADE replace to native ThreadLocalRandom
System.arraycopy(keyHash, 0, result, 0, keyHash.length);
System.arraycopy(id, 0, result, 17, id.length);
return result;
public RFuture<V> putAsync(K key, V value) {
if (key == null) {
@ -316,17 +427,24 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 and ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ "if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "end; "
+ "return v; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
mapKey, encodeMapValue(value), msg, invalidateEntryOnChange);
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
mapKey, encodeMapValue(value), msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
@ -339,8 +457,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] encodedKey = encodeMapKey(key);
byte[] encodedValue = encodeMapKey(value);
byte[] encodedValue = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(encodedKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
@ -349,11 +468,15 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[3]); "
+ "end;"
+ "return 0; "
+ "end; "
+ "return 1; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
encodedKey, encodedValue, msg, invalidateEntryOnChange);
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
@ -361,6 +484,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (invalidationListenerId != 0) {
if (invalidationStatusListenerId != 0) {
@ -371,16 +497,23 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] keyEncoded = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 and ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[2]); "
+ "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[2]); "
+ "end; "
+ "if ARGV[3] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[4], ARGV[5]);"
+ "redis.call('publish', KEYS[2], ARGV[2]); "
+ "end;"
+ "end; "
+ "return v",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
keyEncoded, msgEncoded, invalidateEntryOnChange);
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyEncoded, msgEncoded, invalidateEntryOnChange, System.currentTimeMillis(), entryId);
@ -389,30 +522,60 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
throw new NullPointerException();
if (invalidateEntryOnChange == 1) {
List<Object> params = new ArrayList<Object>(keys.length*2);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
if (invalidateEntryOnChange == 1) {
List<Object> params = new ArrayList<Object>(keys.length*2);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0; " +
"for j = 1, #ARGV, 2 do "
+ "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then "
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "counter = counter + 1;"
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0; " +
"for j = 1, #ARGV, 2 do "
+ "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then "
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "counter = counter + 1;"
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
if (invalidateEntryOnChange == 2) {
List<Object> params = new ArrayList<Object>(keys.length*3);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0; " +
"for j = 2, #ARGV, 3 do "
+ "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[j+2]);"
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "counter = counter + 1;"
+ "end;"
+ "end;"
+ "return counter;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
List<Object> params = new ArrayList<Object>(keys.length + 1);
@ -433,12 +596,12 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] msgEncoded = encode(new LocalCachedMapClear());
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('del', KEYS[1]) == 1 and ARGV[2] == '1' then "
"if redis.call('del', KEYS[1], KEYS[3]) > 0 and ARGV[2] ~= '0' then "
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1;"
+ "end; "
+ "return 0;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
msgEncoded, invalidateEntryOnChange);
@ -749,18 +912,30 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (invalidateEntryOnChange == 2) {
long time = System.currentTimeMillis();
for (byte[] hash : hashes) {
byte[] entryId = generateLogEntryId(hash);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, hashes));
final RPromise<Void> result = newPromise();
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));"
+ "if ARGV[1] == '1' then "
// + "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
// + "end; "
+ "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)), params.toArray());
+ "if ARGV[1] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
+ "end;"
+ "if ARGV[1] == '2' then "
+ "redis.call('zadd', KEYS[3], unpack(ARGV, tonumber(ARGV[2]) + 2 + 1, #ARGV - 1));"
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
+ "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
future.addListener(new FutureListener<Void>() {
@ -781,15 +956,19 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
final byte[] keyState = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyState);
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
RFuture<V> future = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, new RedisCommand<Object>("EVAL", new NumberConvertor(value.getClass())),
"local result = redis.call('HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end; "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "if ARGV[3] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "return result; ",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg);
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.addListener(new FutureListener<V>() {
@ -874,6 +1053,37 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return promise;
public RFuture<Map<K, V>> readAllMapAsync() {
final Map<K, V> result = new HashMap<K, V>();
List<Object> mapKeys = new ArrayList<Object>();
for (CacheValue value : cache.values()) {
result.put((K)value.getKey(), (V)value.getValue());
final RPromise<Map<K, V>> promise = newPromise();
RFuture<Map<K, V>> future = readAll(ALL_MAP, mapKeys, result);
future.addListener(new FutureListener<Map<K, V>>() {
public void operationComplete(Future<Map<K, V>> future) throws Exception {
if (!future.isSuccess()) {
for (java.util.Map.Entry<K, V> entry : future.getNow().entrySet()) {
CacheKey cacheKey = toCacheKey(entry.getKey());
cache.put(cacheKey, new CacheValue(entry.getKey(), entry.getValue()));
return promise;
public RFuture<Set<Entry<K, V>>> readAllEntrySetAsync() {
final Set<Entry<K, V>> result = new HashSet<Entry<K, V>>();
@ -882,9 +1092,31 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
result.add(new AbstractMap.SimpleEntry<K, V>((K)value.getKey(), (V)value.getValue()));
final RPromise<Set<Entry<K, V>>> promise = newPromise();
RFuture<Set<Entry<K, V>>> future = commandExecutor.evalReadAsync(getName(), codec, ALL_ENTRIES,
RFuture<Set<Entry<K, V>>> future = readAll(ALL_ENTRIES, mapKeys, result);
future.addListener(new FutureListener<Set<Entry<K, V>>>() {
public void operationComplete(Future<Set<Entry<K, V>>> future) throws Exception {
if (!future.isSuccess()) {
for (java.util.Map.Entry<K, V> entry : future.getNow()) {
CacheKey cacheKey = toCacheKey(entry.getKey());
cache.put(cacheKey, new CacheValue(entry.getKey(), entry.getValue()));
return promise;
private <R> RFuture<R> readAll(RedisCommand<?> evalCommandType, List<Object> mapKeys, R result) {
return commandExecutor.evalReadAsync(getName(), codec, evalCommandType,
"local entries = redis.call('hgetall', KEYS[1]); "
+ "local result = {};"
+ "for j, v in ipairs(entries) do "
@ -904,24 +1136,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "return result; ",
future.addListener(new FutureListener<Set<Entry<K, V>>>() {
public void operationComplete(Future<Set<Entry<K, V>>> future) throws Exception {
if (!future.isSuccess()) {
for (java.util.Map.Entry<K, V> entry : future.getNow()) {
CacheKey cacheKey = toCacheKey(entry.getKey());
cache.put(cacheKey, new CacheValue(entry.getKey(), entry.getValue()));
return promise;
@ -929,21 +1143,28 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
RFuture<V> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end; "
+ "end;"
+ "if ARGV[3] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "return v; "
+ "else "
+ "return nil; "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0)),
keyState, valueState, invalidateEntryOnChange, msg);
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.addListener(new FutureListener<V>() {
@ -968,6 +1189,7 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
byte[] oldValueState = encodeMapValue(oldValue);
byte[] newValueState = encodeMapValue(newValue);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -975,13 +1197,17 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "if ARGV[4] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[5]); "
+ "end; "
+ "end;"
+ "if ARGV[4] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[6], ARGV[7]);"
+ "redis.call('publish', KEYS[2], ARGV[5]); "
+ "end;"
+ "return 1; "
+ "else "
+ "return 0; "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0)),
keyState, oldValueState, newValueState, invalidateEntryOnChange, msg);
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, oldValueState, newValueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.addListener(new FutureListener<Boolean>() {
@ -1004,19 +1230,24 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end; "
+ "end;"
+ "if ARGV[3] == '2' then "
+ "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);"
+ "redis.call('publish', KEYS[2], ARGV[4]); "
+ "end;"
+ "return redis.call('hdel', KEYS[1], ARGV[1]) "
+ "else "
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0)),
keyState, valueState, invalidateEntryOnChange, msg);
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
future.addListener(new FutureListener<Boolean>() {