From ffac9cb35d798d9f69c82dd94026cc8ec3dac18b Mon Sep 17 00:00:00 2001 From: Nikita Date: Tue, 25 Apr 2017 17:21:17 +0300 Subject: [PATCH] InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT option added --- .../src/main/java/org/redisson/Redisson.java | 4 +- .../org/redisson/RedissonLocalCachedMap.java | 334 +++++++++++++----- .../java/org/redisson/RedissonSetCache.java | 4 +- .../redisson/api/LocalCachedMapOptions.java | 63 +++- .../redisson/eviction/EvictionScheduler.java | 4 +- ...onTask.java => ScoredSetEvictionTask.java} | 8 +- .../redisson/RedissonLocalCachedMapTest.java | 35 +- 7 files changed, 345 insertions(+), 107 deletions(-) rename redisson/src/main/java/org/redisson/eviction/{SetCacheEvictionTask.java => ScoredSetEvictionTask.java} (79%) diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 24d819500..f3b225ed3 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -253,12 +253,12 @@ public class Redisson implements RedissonClient { @Override public RLocalCachedMap getLocalCachedMap(String name, LocalCachedMapOptions options) { - return new RedissonLocalCachedMap(id, connectionManager.getCommandExecutor(), name, options, this); + return new RedissonLocalCachedMap(id, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this); } @Override public RLocalCachedMap getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions options) { - return new RedissonLocalCachedMap(id, codec, connectionManager.getCommandExecutor(), name, options, this); + return new RedissonLocalCachedMap(id, codec, connectionManager.getCommandExecutor(), name, options, evictionScheduler, this); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index f4fe6f4b7..8700e6b71 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -31,12 +31,14 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy; import org.redisson.api.RFuture; import org.redisson.api.RLocalCachedMap; +import org.redisson.api.RScoredSortedSet; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.listener.BaseStatusListener; @@ -46,6 +48,7 @@ import org.redisson.cache.LFUCacheMap; import org.redisson.cache.LRUCacheMap; import org.redisson.cache.NoneCacheMap; import org.redisson.cache.ReferenceCacheMap; +import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.StringCodec; @@ -57,8 +60,11 @@ import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.eviction.EvictionScheduler; import org.redisson.misc.Hash; import org.redisson.misc.RPromise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -72,6 +78,8 @@ import io.netty.util.internal.ThreadLocalRandom; @SuppressWarnings("serial") public class RedissonLocalCachedMap extends RedissonMap implements RLocalCachedMap { + private static final Logger log = LoggerFactory.getLogger(RedissonLocalCachedMap.class); + public static class LocalCachedMapClear implements Serializable { } @@ -185,69 +193,128 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } - private static final RedisCommand> ALL_KEYS = new RedisCommand>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY); + private static final RedisCommand> ALL_KEYS = new RedisCommand>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY); private static final RedisCommand>> ALL_ENTRIES = new RedisCommand>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP); private static final RedisCommand> ALL_MAP = new RedisCommand>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP); private static final RedisCommand EVAL_PUT = new RedisCommand("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE); private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE); + private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10); private byte[] instanceId; private RTopic invalidationTopic; private Cache cache; private int invalidateEntryOnChange; private int invalidationListenerId; private int invalidationStatusListenerId; + private volatile long lastInvalidate; - protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, RedissonClient redisson) { + protected RedissonLocalCachedMap(UUID id, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { super(id, commandExecutor, name, redisson); - init(id, name, options); + init(id, name, options, redisson, evictionScheduler); } - protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, RedissonClient redisson) { + protected RedissonLocalCachedMap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options, EvictionScheduler evictionScheduler, RedissonClient redisson) { super(id, codec, connectionManager, name, redisson); - init(id, name, options); + init(id, name, options, redisson, evictionScheduler); } - private void init(UUID id, String name, LocalCachedMapOptions options) { + private void init(UUID id, String name, LocalCachedMapOptions options, RedissonClient redisson, EvictionScheduler evictionScheduler) { instanceId = generateId(); - if (options.getInvalidationPolicy() != InvalidationPolicy.NONE) { + if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE + || options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) { invalidateEntryOnChange = 1; } + if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT) { + invalidateEntryOnChange = 2; + evictionScheduler.schedule(getUpdatesLogName(), cacheUpdateLogTime + TimeUnit.MINUTES.toMillis(1)); + } cache = createCache(options); + addListeners(name, options, redisson); + } + + private void addListeners(String name, LocalCachedMapOptions options, RedissonClient redisson) { invalidationTopic = new RedissonTopic(commandExecutor, suffixName(name, "topic")); - if (options.getInvalidationPolicy() != InvalidationPolicy.NONE) { - if (options.getInvalidationPolicy() != InvalidationPolicy.ON_CHANGE) { - invalidationStatusListenerId = invalidationTopic.addListener(new BaseStatusListener() { - @Override - public void onSubscribe(String channel) { - if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) { - cache.clear(); - } - } - }); - } - - invalidationListenerId = invalidationTopic.addListener(new MessageListener() { + + if (options.getInvalidationPolicy() == InvalidationPolicy.NONE) { + return; + } + + if (options.getInvalidationPolicy() != InvalidationPolicy.ON_CHANGE) { + invalidationStatusListenerId = invalidationTopic.addListener(new BaseStatusListener() { @Override - public void onMessage(String channel, Object msg) { - if (msg instanceof LocalCachedMapClear) { + public void onSubscribe(String channel) { + if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_CLEAR_ON_RECONNECT) { cache.clear(); } - if (msg instanceof LocalCachedMapInvalidate) { - LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg; - if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) { - for (byte[] keyHash : invalidateMsg.getKeyHashes()) { - CacheKey key = new CacheKey(keyHash); - cache.remove(key); - } + if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT + // check if instance has already been used + && lastInvalidate > 0) { + + if (System.currentTimeMillis() - lastInvalidate > cacheUpdateLogTime) { + cache.clear(); + return; } + + isExistsAsync().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + + if (!future.getNow()) { + cache.clear(); + return; + } + + RScoredSortedSet logs = redisson.getScoredSortedSet(getUpdatesLogName(), ByteArrayCodec.INSTANCE); + logs.valueRangeAsync(lastInvalidate, true, Double.POSITIVE_INFINITY, true) + .addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + log.error("Can't load update log", future.cause()); + return; + } + + for (byte[] entry : future.getNow()) { + byte[] keyHash = Arrays.copyOf(entry, 16); + CacheKey key = new CacheKey(keyHash); + cache.remove(key); + } + } + }); + } + }); + } } }); } + + invalidationListenerId = invalidationTopic.addListener(new MessageListener() { + @Override + public void onMessage(String channel, Object msg) { + if (msg instanceof LocalCachedMapClear) { + cache.clear(); + } + if (msg instanceof LocalCachedMapInvalidate) { + LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg; + if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) { + for (byte[] keyHash : invalidateMsg.getKeyHashes()) { + CacheKey key = new CacheKey(keyHash); + cache.remove(key); + } + } + if (options.getInvalidationPolicy() == InvalidationPolicy.ON_CHANGE_WITH_LOAD_ON_RECONNECT) { + lastInvalidate = System.currentTimeMillis(); + } + } + } + }); } protected Cache createCache(LocalCachedMapOptions options) { @@ -325,13 +392,30 @@ public class RedissonLocalCachedMap extends RedissonMap implements R return future; } - protected byte[] generateId() { + String getUpdatesLogName() { + return prefixName("redisson__cache_updates_log", getName()); + } + + protected static byte[] generateId() { byte[] id = new byte[16]; // TODO JDK UPGRADE replace to native ThreadLocalRandom ThreadLocalRandom.current().nextBytes(id); return id; } + protected static byte[] generateLogEntryId(byte[] keyHash) { + byte[] result = new byte[keyHash.length + 1 + 8]; + result[16] = ':'; + byte[] id = new byte[8]; + // TODO JDK UPGRADE replace to native ThreadLocalRandom + ThreadLocalRandom.current().nextBytes(id); + + System.arraycopy(keyHash, 0, result, 0, keyHash.length); + System.arraycopy(id, 0, result, 17, id.length); + return result; + } + + @Override public RFuture putAsync(K key, V value) { if (key == null) { @@ -343,17 +427,24 @@ public class RedissonLocalCachedMap extends RedissonMap implements R byte[] mapKey = encodeMapKey(key); CacheKey cacheKey = toCacheKey(mapKey); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); CacheValue cacheValue = new CacheValue(key, value); cache.put(cacheKey, cacheValue); return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT, "local v = redis.call('hget', KEYS[1], ARGV[1]); " - + "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 and ARGV[4] == '1' then " - + "redis.call('publish', KEYS[2], ARGV[3]); " + + "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then " + + "if ARGV[4] == '1' then " + + "redis.call('publish', KEYS[2], ARGV[3]); " + + "end;" + + "if ARGV[4] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call('publish', KEYS[2], ARGV[3]); " + + "end;" + "end; " + "return v; ", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), - mapKey, encodeMapValue(value), msg, invalidateEntryOnChange); + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + mapKey, encodeMapValue(value), msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId); } @Override @@ -368,6 +459,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R byte[] encodedKey = encodeMapKey(key); byte[] encodedValue = encodeMapValue(value); CacheKey cacheKey = toCacheKey(encodedKey); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); CacheValue cacheValue = new CacheValue(key, value); cache.put(cacheKey, cacheValue); @@ -376,11 +468,15 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "if ARGV[4] == '1' then " + "redis.call('publish', KEYS[2], ARGV[3]); " + "end;" + + "if ARGV[4] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call('publish', KEYS[2], ARGV[3]); " + + "end;" + "return 0; " + "end; " + "return 1; ", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), - encodedKey, encodedValue, msg, invalidateEntryOnChange); + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + encodedKey, encodedValue, msg, invalidateEntryOnChange, System.currentTimeMillis(), entryId); } @Override @@ -401,16 +497,23 @@ public class RedissonLocalCachedMap extends RedissonMap implements R byte[] keyEncoded = encodeMapKey(key); CacheKey cacheKey = toCacheKey(keyEncoded); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); cache.remove(cacheKey); return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE, "local v = redis.call('hget', KEYS[1], ARGV[1]); " - + "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 and ARGV[3] == '1' then " - + "redis.call('publish', KEYS[2], ARGV[2]); " + + "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then " + + "if ARGV[3] == '1' then " + + "redis.call('publish', KEYS[2], ARGV[2]); " + + "end; " + + "if ARGV[3] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[4], ARGV[5]);" + + "redis.call('publish', KEYS[2], ARGV[2]); " + + "end;" + "end; " + "return v", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), - keyEncoded, msgEncoded, invalidateEntryOnChange); + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + keyEncoded, msgEncoded, invalidateEntryOnChange, System.currentTimeMillis(), entryId); } @Override @@ -419,30 +522,60 @@ public class RedissonLocalCachedMap extends RedissonMap implements R throw new NullPointerException(); } - if (invalidateEntryOnChange == 1) { - List params = new ArrayList(keys.length*2); - for (K k : keys) { - byte[] keyEncoded = encodeMapKey(k); - params.add(keyEncoded); - - CacheKey cacheKey = toCacheKey(keyEncoded); - cache.remove(cacheKey); - byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); - params.add(msgEncoded); + if (invalidateEntryOnChange == 1) { + List params = new ArrayList(keys.length*2); + for (K k : keys) { + byte[] keyEncoded = encodeMapKey(k); + params.add(keyEncoded); + + CacheKey cacheKey = toCacheKey(keyEncoded); + cache.remove(cacheKey); + byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); + params.add(msgEncoded); + } + + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, + "local counter = 0; " + + "for j = 1, #ARGV, 2 do " + + "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then " + + "redis.call('publish', KEYS[2], ARGV[j+1]); " + + "counter = counter + 1;" + + "end;" + + "end;" + + "return counter;", + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), + params.toArray()); } - return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, - "local counter = 0; " + - "for j = 1, #ARGV, 2 do " - + "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then " - + "redis.call('publish', KEYS[2], ARGV[j+1]); " - + "counter = counter + 1;" - + "end;" - + "end;" - + "return counter;", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), - params.toArray()); - } + if (invalidateEntryOnChange == 2) { + List params = new ArrayList(keys.length*3); + params.add(System.currentTimeMillis()); + for (K k : keys) { + byte[] keyEncoded = encodeMapKey(k); + params.add(keyEncoded); + + CacheKey cacheKey = toCacheKey(keyEncoded); + cache.remove(cacheKey); + byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); + params.add(msgEncoded); + + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); + params.add(entryId); + } + + return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, + "local counter = 0; " + + "for j = 2, #ARGV, 3 do " + + "if redis.call('hdel', KEYS[1], ARGV[j]) == 1 then " + + "redis.call('zadd', KEYS[3], ARGV[1], ARGV[j+2]);" + + "redis.call('publish', KEYS[2], ARGV[j+1]); " + + "counter = counter + 1;" + + "end;" + + "end;" + + "return counter;", + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + params.toArray()); + } List params = new ArrayList(keys.length + 1); params.add(getName()); @@ -463,12 +596,12 @@ public class RedissonLocalCachedMap extends RedissonMap implements R cache.clear(); byte[] msgEncoded = encode(new LocalCachedMapClear()); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, - "if redis.call('del', KEYS[1]) == 1 and ARGV[2] == '1' then " + "if redis.call('del', KEYS[1], KEYS[3]) > 0 and ARGV[2] ~= '0' then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1;" + "end; " + "return 0;", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), msgEncoded, invalidateEntryOnChange); } @@ -779,18 +912,30 @@ public class RedissonLocalCachedMap extends RedissonMap implements R i++; } + if (invalidateEntryOnChange == 2) { + long time = System.currentTimeMillis(); + for (byte[] hash : hashes) { + byte[] entryId = generateLogEntryId(hash); + params.add(time); + params.add(entryId); + } + } + byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, hashes)); params.add(msgEncoded); final RPromise result = newPromise(); RFuture future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, "redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));" - + "if ARGV[1] == '1' then " -// + "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do " - + "redis.call('publish', KEYS[2], ARGV[#ARGV]); " -// + "end; " - + "end;", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), params.toArray()); + + "if ARGV[1] == '1' then " + + "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + + "end;" + + "if ARGV[1] == '2' then " + + "redis.call('zadd', KEYS[3], unpack(ARGV, tonumber(ARGV[2]) + 2 + 1, #ARGV - 1));" + + "redis.call('publish', KEYS[2], ARGV[#ARGV]); " + + "end;", + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + params.toArray()); future.addListener(new FutureListener() { @Override @@ -811,15 +956,19 @@ public class RedissonLocalCachedMap extends RedissonMap implements R final byte[] keyState = encodeMapKey(key); CacheKey cacheKey = toCacheKey(keyState); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); - + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); RFuture future = commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, new RedisCommand("EVAL", new NumberConvertor(value.getClass())), "local result = redis.call('HINCRBYFLOAT', KEYS[1], ARGV[1], ARGV[2]); " + "if ARGV[3] == '1' then " - + "redis.call('publish', KEYS[2], ARGV[4]); " - + "end; " + + "redis.call('publish', KEYS[2], ARGV[4]); " + + "end;" + + "if ARGV[3] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call('publish', KEYS[2], ARGV[4]); " + + "end;" + "return result; ", - Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)), - keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg); + Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + keyState, new BigDecimal(value.toString()).toPlainString(), invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); future.addListener(new FutureListener() { @Override @@ -994,21 +1143,28 @@ public class RedissonLocalCachedMap extends RedissonMap implements R final byte[] keyState = encodeMapKey(key); byte[] valueState = encodeMapValue(value); final CacheKey cacheKey = toCacheKey(keyState); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); RFuture future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE, "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then " + "local v = redis.call('hget', KEYS[1], ARGV[1]); " + "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " + + "if ARGV[3] == '1' then " + "redis.call('publish', KEYS[2], ARGV[4]); " - + "end; " + + "end;" + + "if ARGV[3] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call('publish', KEYS[2], ARGV[4]); " + + "end;" + + "return v; " + "else " + "return nil; " + "end", - Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0)), - keyState, valueState, invalidateEntryOnChange, msg); + Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); future.addListener(new FutureListener() { @Override @@ -1033,6 +1189,7 @@ public class RedissonLocalCachedMap extends RedissonMap implements R byte[] oldValueState = encodeMapValue(oldValue); byte[] newValueState = encodeMapValue(newValue); final CacheKey cacheKey = toCacheKey(keyState); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); RFuture future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, @@ -1040,13 +1197,17 @@ public class RedissonLocalCachedMap extends RedissonMap implements R + "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); " + "if ARGV[4] == '1' then " + "redis.call('publish', KEYS[2], ARGV[5]); " - + "end; " + + "end;" + + "if ARGV[4] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[6], ARGV[7]);" + + "redis.call('publish', KEYS[2], ARGV[5]); " + + "end;" + "return 1; " + "else " + "return 0; " + "end", - Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0)), - keyState, oldValueState, newValueState, invalidateEntryOnChange, msg); + Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + keyState, oldValueState, newValueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); future.addListener(new FutureListener() { @Override @@ -1069,19 +1230,24 @@ public class RedissonLocalCachedMap extends RedissonMap implements R final byte[] keyState = encodeMapKey(key); byte[] valueState = encodeMapValue(value); final CacheKey cacheKey = toCacheKey(keyState); + byte[] entryId = generateLogEntryId(cacheKey.getKeyHash()); byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); RFuture future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then " + "if ARGV[3] == '1' then " + "redis.call('publish', KEYS[2], ARGV[4]); " - + "end; " + + "end;" + + "if ARGV[3] == '2' then " + + "redis.call('zadd', KEYS[3], ARGV[5], ARGV[6]);" + + "redis.call('publish', KEYS[2], ARGV[4]); " + + "end;" + "return redis.call('hdel', KEYS[1], ARGV[1]) " + "else " + "return 0 " + "end", - Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0)), - keyState, valueState, invalidateEntryOnChange, msg); + Arrays.asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()), + keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId); future.addListener(new FutureListener() { @Override diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index c9b461ee1..cb0418d0e 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -67,7 +67,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< public RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(commandExecutor, name); if (evictionScheduler != null) { - evictionScheduler.schedule(getName()); + evictionScheduler.schedule(getName(), 0); } this.redisson = redisson; } @@ -75,7 +75,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< public RedissonSetCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name, RedissonClient redisson) { super(codec, commandExecutor, name); if (evictionScheduler != null) { - evictionScheduler.schedule(getName()); + evictionScheduler.schedule(getName(), 0); } this.redisson = redisson; } diff --git a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java index 54241d691..7517f64bf 100644 --- a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java +++ b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java @@ -25,8 +25,65 @@ import java.util.concurrent.TimeUnit; */ public class LocalCachedMapOptions { - public enum InvalidationPolicy {NONE, ON_CHANGE, ON_CHANGE_WITH_CLEAR_ON_RECONNECT} - public enum EvictionPolicy {NONE, LRU, LFU, SOFT, WEAK}; + public enum InvalidationPolicy { + + /** + * No invalidation on map changes + */ + NONE, + + /** + * Invalidate cache entry across all LocalCachedMap instances on map entry change. + */ + ON_CHANGE, + + /** + * Invalidate cache entry across all LocalCachedMap instances on map entry change. + *

+ * Clear cache if LocalCachedMap instance has been disconnected for a while. + */ + ON_CHANGE_WITH_CLEAR_ON_RECONNECT, + + /** + * Invalidate cache entry across all LocalCachedMap instances on map entry change. + *

+ * Store invalidated entry hash in invalidation log for 10 minutes. + * Cache keys for stored invalidated entry hashes will be removed + * if LocalCachedMap instance has been disconnected less than 10 minutes + * or whole cache will be cleaned otherwise. + */ + ON_CHANGE_WITH_LOAD_ON_RECONNECT + } + + public enum EvictionPolicy { + + /** + * Cache without eviction. + */ + NONE, + + /** + * Least Recently Used cache. + */ + LRU, + + /** + * Least Frequently Used cache. + */ + LFU, + + /** + * Cache with Soft Reference used for values. + * All references will be collected by GC + */ + SOFT, + + /** + * Cache with Weak Reference used for values. + * All references will be collected by GC + */ + WEAK + }; private InvalidationPolicy invalidationPolicy; private EvictionPolicy evictionPolicy; @@ -63,7 +120,7 @@ public class LocalCachedMapOptions { return new LocalCachedMapOptions() .cacheSize(0).timeToLive(0).maxIdle(0) .evictionPolicy(EvictionPolicy.NONE) - .invalidateEntryOnChange(true); + .invalidationPolicy(InvalidationPolicy.ON_CHANGE); } public EvictionPolicy getEvictionPolicy() { diff --git a/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java b/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java index d51cd8042..6f229ff90 100644 --- a/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java +++ b/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java @@ -55,8 +55,8 @@ public class EvictionScheduler { } } - public void schedule(String name) { - EvictionTask task = new SetCacheEvictionTask(name, executor); + public void schedule(String name, long shiftInMilliseconds) { + EvictionTask task = new ScoredSetEvictionTask(name, executor, shiftInMilliseconds); EvictionTask prevTask = tasks.putIfAbsent(name, task); if (prevTask == null) { task.schedule(); diff --git a/redisson/src/main/java/org/redisson/eviction/SetCacheEvictionTask.java b/redisson/src/main/java/org/redisson/eviction/ScoredSetEvictionTask.java similarity index 79% rename from redisson/src/main/java/org/redisson/eviction/SetCacheEvictionTask.java rename to redisson/src/main/java/org/redisson/eviction/ScoredSetEvictionTask.java index 826c879bc..f7ea0e8b3 100644 --- a/redisson/src/main/java/org/redisson/eviction/SetCacheEvictionTask.java +++ b/redisson/src/main/java/org/redisson/eviction/ScoredSetEvictionTask.java @@ -25,18 +25,20 @@ import org.redisson.command.CommandAsyncExecutor; * @author Nikita Koksharov * */ -public class SetCacheEvictionTask extends EvictionTask { +public class ScoredSetEvictionTask extends EvictionTask { private final String name; + private final long shiftInMilliseconds; - public SetCacheEvictionTask(String name, CommandAsyncExecutor executor) { + public ScoredSetEvictionTask(String name, CommandAsyncExecutor executor, long shiftInMilliseconds) { super(executor); this.name = name; + this.shiftInMilliseconds = shiftInMilliseconds; } @Override RFuture execute() { - return executor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.ZREMRANGEBYSCORE, name, 0, System.currentTimeMillis()); + return executor.writeAsync(name, LongCodec.INSTANCE, RedisCommands.ZREMRANGEBYSCORE, name, 0, System.currentTimeMillis() - shiftInMilliseconds); } } diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index 67ecee903..de663a22c 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -6,7 +6,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -18,10 +17,10 @@ import org.redisson.RedissonMapTest.SimpleKey; import org.redisson.RedissonMapTest.SimpleValue; import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; -import org.redisson.cache.Cache; +import org.redisson.api.LocalCachedMapOptions.InvalidationPolicy; import org.redisson.api.RLocalCachedMap; import org.redisson.api.RMap; -import org.redisson.api.RedissonClient; +import org.redisson.cache.Cache; import mockit.Deencapsulation; @@ -62,7 +61,7 @@ public class RedissonLocalCachedMapTest extends BaseTest { expectedValuesSet.add(1); expectedValuesSet.add(2); expectedValuesSet.add(3); - HashSet actualValuesSet = new HashSet<>(m.readAllValues()); + Set actualValuesSet = new HashSet<>(m.readAllValues()); Assert.assertEquals(expectedValuesSet, actualValuesSet); Map expectedMap = new HashMap<>(); expectedMap.put("a", 1); @@ -91,7 +90,10 @@ public class RedissonLocalCachedMapTest extends BaseTest { @Test public void testInvalidationOnClear() throws InterruptedException { - LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(true); + LocalCachedMapOptions options = LocalCachedMapOptions.defaults() + .evictionPolicy(EvictionPolicy.LFU) + .cacheSize(5); + RLocalCachedMap map1 = redisson.getLocalCachedMap("test", options); Cache cache1 = Deencapsulation.getField(map1, "cache"); @@ -125,7 +127,10 @@ public class RedissonLocalCachedMapTest extends BaseTest { @Test public void testInvalidationOnUpdate() throws InterruptedException { - LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(true); + LocalCachedMapOptions options = LocalCachedMapOptions.defaults() + .evictionPolicy(EvictionPolicy.LFU) + .cacheSize(5); + RLocalCachedMap map1 = redisson.getLocalCachedMap("test", options); Cache cache1 = Deencapsulation.getField(map1, "cache"); @@ -151,7 +156,11 @@ public class RedissonLocalCachedMapTest extends BaseTest { @Test public void testNoInvalidationOnUpdate() throws InterruptedException { - LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(false); + LocalCachedMapOptions options = LocalCachedMapOptions.defaults() + .evictionPolicy(EvictionPolicy.LFU) + .cacheSize(5) + .invalidationPolicy(InvalidationPolicy.NONE); + RLocalCachedMap map1 = redisson.getLocalCachedMap("test", options); Cache cache1 = Deencapsulation.getField(map1, "cache"); @@ -177,7 +186,11 @@ public class RedissonLocalCachedMapTest extends BaseTest { @Test public void testNoInvalidationOnRemove() throws InterruptedException { - LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(false); + LocalCachedMapOptions options = LocalCachedMapOptions.defaults() + .evictionPolicy(EvictionPolicy.LFU) + .cacheSize(5) + .invalidationPolicy(InvalidationPolicy.NONE); + RLocalCachedMap map1 = redisson.getLocalCachedMap("test", options); Cache cache1 = Deencapsulation.getField(map1, "cache"); @@ -200,10 +213,10 @@ public class RedissonLocalCachedMapTest extends BaseTest { assertThat(cache1.size()).isEqualTo(1); assertThat(cache2.size()).isEqualTo(1); } - + @Test public void testInvalidationOnRemove() throws InterruptedException { - LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5).invalidateEntryOnChange(true); + LocalCachedMapOptions options = LocalCachedMapOptions.defaults().evictionPolicy(EvictionPolicy.LFU).cacheSize(5); RLocalCachedMap map1 = redisson.getLocalCachedMap("test", options); Cache cache1 = Deencapsulation.getField(map1, "cache"); @@ -576,7 +589,7 @@ public class RedissonLocalCachedMapTest extends BaseTest { LocalCachedMapOptions options = LocalCachedMapOptions.defaults() .evictionPolicy(EvictionPolicy.NONE) .cacheSize(3) - .invalidateEntryOnChange(false); + .invalidationPolicy(InvalidationPolicy.NONE); RLocalCachedMap map = redisson.getLocalCachedMap("test", options); assertThat(map.fastRemove("test")).isZero(); }