diff --git a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java index c36e52b44..7ba2fd86d 100644 --- a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java +++ b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java @@ -212,7 +212,6 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit + "if tonumber(currentValue) < tonumber(ARGV[1]) then " + "local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), '+inf', 'withscores', 'limit', 0, 1); " - + "local random, permits = struct.unpack('fI', nearest[1]);" + "return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);" + "else " + "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); " diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 8b1fb5fca..df82fac09 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -157,6 +157,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs return "jcache_removed_channel:{" + getRawName() + "}"; } + String getOldValueListenerCounter() { + return "jcache_old_value_listeners:{" + getRawName() + "}"; + } + long currentNanoTime() { if (config.isStatisticsEnabled()) { return System.nanoTime(); @@ -388,7 +392,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (containsKey(key)) { Long updateTimeout = getUpdateTimeout(); List res = evalWrite(getRawName(), codec, RedisCommands.EVAL_LIST, - "if ARGV[2] == '0' then " + "if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " + "redis.call('zrem', KEYS[2], ARGV[4]); " + "local value = redis.call('hget', KEYS[1], ARGV[4]);" @@ -398,23 +402,42 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncs = redis.call('publish', KEYS[7], syncMsg); " + "return {0, syncs};" + "elseif ARGV[2] ~= '-1' then " - + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[4]);" + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, syncs};" + "else " - + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[4]);" + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, syncs};" + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), - getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), + getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); res.add(syncId); @@ -491,18 +514,37 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value), ARGV[4]); " + "syncs = syncs + redis.call('publish', KEYS[7], syncMsg); " + "elseif ARGV[2] ~= '-1' then " - + "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); " - + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], -1, ARGV[4]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[i]);" + + "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value), ARGV[4]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], ARGV[4]); " + "syncs = syncs + redis.call('publish', KEYS[8], syncMsg); " + "added = added + 1;" + "else " - + "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], -1, ARGV[4]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[i]);" + + "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value), ARGV[4]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], ARGV[4]); " + "syncs = syncs + redis.call('publish', KEYS[8], syncMsg); " + "added = added + 1;" + "end; " @@ -528,7 +570,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "end; " + "return {added, syncs};", Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), - getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), + getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), params.toArray()); RPromise result = new RedissonPromise<>(); @@ -605,18 +647,37 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncs = redis.call('publish', KEYS[7], syncMsg); " + "return {0, syncs};" + "elseif ARGV[2] ~= '-1' then " - + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[4]);" + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, syncs};" + "else " - + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[4]);" + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, syncs};" + "end; " @@ -641,7 +702,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "end; " + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), - getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), + getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); RPromise result = waitSync(syncId, res); @@ -1229,21 +1290,35 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, value, syncs};" + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, value, syncs};" + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(), - getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName()), + getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); result.add(syncId); @@ -1299,19 +1374,33 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[6], syncMsg); " + "return {0, value, syncs};" - + "elseif ARGV[2] ~= '-1' then " + + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, value, syncs};" - + "else " + + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + + "end; " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, value, syncs};" + "end; " @@ -1336,7 +1425,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "end; " + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(), - getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName()), + getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); future.onComplete((r, e) -> { @@ -2208,7 +2297,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs Long updateTimeout = getUpdateTimeout(); double syncId = ThreadLocalRandom.current().nextDouble(); Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG, - "if ARGV[2] == '0' then " + "if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " + "redis.call('zrem', KEYS[2], ARGV[4]); " + "local value = redis.call('hget', KEYS[1], ARGV[4]); " @@ -2216,14 +2305,24 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "redis.call('publish', KEYS[3], msg); " + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[7]); " + "return redis.call('publish', KEYS[5], syncMsg); " - + "elseif ARGV[2] ~= '-1' then " - + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " - + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); " + + "elseif ARGV[2] ~= '-1' then " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], -1, ARGV[7]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[4]); " + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " + + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value), ARGV[7]); " + + "end; " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], ARGV[7]); " + "return redis.call('publish', KEYS[6], syncMsg); " - + "else " + + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); " + "redis.call('publish', KEYS[4], msg); " @@ -2231,7 +2330,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return redis.call('publish', KEYS[6], syncMsg); " + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), - getRemovedSyncChannelName(), getUpdatedSyncChannelName()), + getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId); List result = Arrays.asList(syncs, syncId); @@ -2296,11 +2395,23 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " + + "local msg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], -1); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); " + + "end; " + "redis.call('publish', KEYS[4], msg); " + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " + + "local msg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], -1); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); " + + "end; " + "redis.call('publish', KEYS[4], msg); " + "end; " + "return 1;" @@ -2316,7 +2427,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return 0;" + "end; " + "return -1; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), + Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()), accessTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue)); } @@ -2403,8 +2514,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (containsKey(key)) { double syncId = ThreadLocalRandom.current().nextDouble(); Long updateTimeout = getUpdateTimeout(); - Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG, - "if ARGV[1] == '0' then " + Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG, + "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " + "local value = redis.call('hget', KEYS[1], ARGV[3]); " @@ -2413,21 +2524,40 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value), ARGV[5]); " + "return redis.call('publish', KEYS[5], syncMsg); " + "elseif ARGV[1] ~= '-1' then " - + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1, ARGV[5]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); " + + "end; " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); " + "return redis.call('publish', KEYS[6], syncMsg); " - + "else " - + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "else " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1, ARGV[5]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); " + + "end; " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); " + "return redis.call('publish', KEYS[6], syncMsg); " + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), - getRemovedSyncChannelName(), getUpdatedSyncChannelName()), + getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); List result = Arrays.asList(syncs, syncId); @@ -2462,15 +2592,27 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[1] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " + + "local msg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + + "end; " + "redis.call('publish', KEYS[4], msg); " + "else " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " + + "local msg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + + "end; " + "redis.call('publish', KEYS[4], msg); " + "end; " + "return 1;", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), + Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()), updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } @@ -2497,17 +2639,29 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[1] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " + + "local msg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + + "end; " + "redis.call('publish', KEYS[4], msg); " + "else " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " + + "local msg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); " + + "else " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + + "end; " + "redis.call('publish', KEYS[4], msg); " + "end; " + "return value;", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), + Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()), updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); - + } private V getAndReplaceValueLocked(K key, V value) { @@ -2529,30 +2683,49 @@ public class JCache extends RedissonObject implements Cache, CacheAs Long updateTimeout = getUpdateTimeout(); double syncId = ThreadLocalRandom.current().nextDouble(); Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG, - "if ARGV[1] == '0' then " - + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " + + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[3], msg); " + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value), ARGV[5]); " + "return redis.call('publish', KEYS[5], msg); " - + "elseif ARGV[1] ~= '-1' then " - + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "elseif ARGV[1] ~= '-1' then " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1, ARGV[5]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); " + + "end; " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); " + "return redis.call('publish', KEYS[6], syncMsg); " - + "else " - + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "else " + + "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); " + + "local msg, syncMsg; " + + "if oldValueRequired == nil or oldValueRequired < 1 then " + + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); " + + "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1, ARGV[5]); " + + "else " + + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + + "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); " + + "end; " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); " + "return redis.call('publish', KEYS[6], syncMsg); " + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), - getRemovedSyncChannelName(), getUpdatedSyncChannelName()), + getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); List result = Arrays.asList(syncs, syncId); @@ -2561,7 +2734,23 @@ public class JCache extends RedissonObject implements Cache, CacheAs return oldValue; } + private void incrementOldValueListenerCounter(String counterName) { + evalWrite(getRawName(), codec, RedisCommands.EVAL_INTEGER, + "return redis.call('incr', KEYS[1]);", + Arrays.asList(counterName)); + } + private void decrementOldValueListenerCounter(String counterName) { + evalWrite(getRawName(), codec, RedisCommands.EVAL_INTEGER, + "return redis.call('decr', KEYS[1]);", + Arrays.asList(counterName)); + } + + private Integer getOldValueListenerCount(String counterName) { + return evalWrite(getRawName(), codec, RedisCommands.EVAL_INTEGER, + "return tonumber(redis.call('get', KEYS[1]));", + Arrays.asList(counterName)); + } @Override public boolean replace(K key, V value) { @@ -2992,7 +3181,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { - JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.REMOVED, msg.get(0), msg.get(1)); + JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.REMOVED, msg.get(0), msg.get(1), msg.get(1)); try { if (filter == null || filter.evaluate(event)) { List> events = Collections.>singletonList(event); @@ -3034,11 +3223,16 @@ public class JCache extends RedissonObject implements Cache, CacheAs channelName = getUpdatedSyncChannelName(); } - RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync)); + if (cacheEntryListenerConfiguration.isOldValueRequired()) { + incrementOldValueListenerCounter(getOldValueListenerCounter()); + } + + RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync, true)); int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { - JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1)); + JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1), msg.get(2)); + try { if (filter == null || filter.evaluate(event)) { List> events = Collections.>singletonList(event); @@ -3058,7 +3252,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { - JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1)); + JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1), msg.get(1)); if (filter == null || filter.evaluate(event)) { List> events = Collections.>singletonList(event); ((CacheEntryExpiredListener) listener).onExpired(events); @@ -3075,7 +3269,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs private void sendSync(boolean sync, List msg) { if (sync) { - RSemaphore semaphore = redisson.getSemaphore(getSyncName(msg.get(2))); + Object syncId = msg.get(msg.size() - 1); + RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); semaphore.release(); } } @@ -3088,6 +3283,15 @@ public class JCache extends RedissonObject implements Cache, CacheAs redisson.getTopic(entry.getValue()).removeListener(entry.getKey()); } } + + if (cacheEntryListenerConfiguration.isOldValueRequired()) { + final CacheEntryListener listener = cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create(); + + if (CacheEntryUpdatedListener.class.isAssignableFrom(listener.getClass())) { + decrementOldValueListenerCounter(getOldValueListenerCounter()); + } + } + config.removeCacheEntryListenerConfiguration(cacheEntryListenerConfiguration); } diff --git a/redisson/src/main/java/org/redisson/jcache/JCacheEntryEvent.java b/redisson/src/main/java/org/redisson/jcache/JCacheEntryEvent.java index 10e6dc8a3..32dda8654 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCacheEntryEvent.java +++ b/redisson/src/main/java/org/redisson/jcache/JCacheEntryEvent.java @@ -33,11 +33,20 @@ public class JCacheEntryEvent extends CacheEntryEvent { private final Object key; private final Object value; - + private final Object oldValue; + public JCacheEntryEvent(Cache source, EventType eventType, Object key, Object value) { super(source, eventType); this.key = key; this.value = value; + this.oldValue = null; + } + + public JCacheEntryEvent(Cache source, EventType eventType, Object key, Object value, Object oldValue) { + super(source, eventType); + this.key = key; + this.value = value; + this.oldValue = oldValue; } @Override @@ -61,14 +70,12 @@ public class JCacheEntryEvent extends CacheEntryEvent { @Override public V getOldValue() { - // TODO Auto-generated method stub - return null; + return (V) oldValue; } @Override public boolean isOldValueAvailable() { - // TODO Auto-generated method stub - return false; + return oldValue != null; } } diff --git a/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java b/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java index e9a1ec000..9a2161b66 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java +++ b/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java @@ -33,6 +33,7 @@ import java.util.List; public class JCacheEventCodec extends BaseEventCodec { private final boolean sync; + private final boolean expectOldValueInMsg; private final Decoder decoder = new Decoder() { @Override @@ -44,6 +45,17 @@ public class JCacheEventCodec extends BaseEventCodec { Object value = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder()); result.add(value); + + if (expectOldValueInMsg) { + ByteBuf copy = buf.copy(); + if (copy.readShortLE() != -1) { + Object oldValue = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder()); + result.add(oldValue); + } else { + buf.readShortLE(); + result.add(null); + } + } if (sync) { double syncId = buf.readDoubleLE(); @@ -57,6 +69,13 @@ public class JCacheEventCodec extends BaseEventCodec { public JCacheEventCodec(Codec codec, OSType osType, boolean sync) { super(codec, osType); this.sync = sync; + this.expectOldValueInMsg = false; + } + + public JCacheEventCodec(Codec codec, OSType osType, boolean sync, boolean expectOldValueInMsg) { + super(codec, osType); + this.sync = sync; + this.expectOldValueInMsg = expectOldValueInMsg; } @Override diff --git a/redisson/src/test/java/org/redisson/jcache/JCacheTest.java b/redisson/src/test/java/org/redisson/jcache/JCacheTest.java index a1fa70d9b..ac37e0479 100644 --- a/redisson/src/test/java/org/redisson/jcache/JCacheTest.java +++ b/redisson/src/test/java/org/redisson/jcache/JCacheTest.java @@ -26,6 +26,8 @@ import javax.cache.configuration.MutableConfiguration; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryExpiredListener; import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryRemovedListener; +import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.expiry.CreatedExpiryPolicy; import javax.cache.expiry.Duration; @@ -397,6 +399,161 @@ public class JCacheTest extends BaseTest { cache.close(); runner.stop(); } + + @Test + public void testUpdate() throws IOException, InterruptedException, URISyntaxException { + RedisProcess runner = new RedisRunner() + .nosave() + .randomDir() + .port(6311) + .run(); + + MutableConfiguration config = new MutableConfiguration<>(); + config.setStoreByValue(true); + + URI configUri = getClass().getResource("redisson-jcache.json").toURI(); + Cache cache = Caching.getCachingProvider().getCacheManager(configUri, null) + .createCache("test", config); + + CountDownLatch latch = new CountDownLatch(1); + + String key = "123"; + + UpdatedListener clientListener = new UpdatedListener(latch, key, "80", "90"); + MutableCacheEntryListenerConfiguration listenerConfiguration = + new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, true); + cache.registerCacheEntryListener(listenerConfiguration); + + cache.put(key, "80"); + Assert.assertNotNull(cache.get(key)); + + cache.put(key, "90"); + + latch.await(); + + Assert.assertNotNull(cache.get(key)); + + cache.close(); + runner.stop(); + } + + @Test + public void testUpdateAsync() throws IOException, InterruptedException, URISyntaxException { + RedisProcess runner = new RedisRunner() + .nosave() + .randomDir() + .port(6311) + .run(); + + MutableConfiguration config = new MutableConfiguration<>(); + config.setStoreByValue(true); + + URI configUri = getClass().getResource("redisson-jcache.json").toURI(); + Cache cache = Caching.getCachingProvider().getCacheManager(configUri, null) + .createCache("test", config); + + CountDownLatch latch = new CountDownLatch(2); + + String key = "123"; + + UpdatedListener clientListener = new UpdatedListener(latch, key, "80", "90"); + MutableCacheEntryListenerConfiguration listenerConfiguration = + new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, false); + cache.registerCacheEntryListener(listenerConfiguration); + + UpdatedListener secondClientListener = new UpdatedListener(latch, key, "80", "90"); + MutableCacheEntryListenerConfiguration secondListenerConfiguration = + new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(secondClientListener), null, false, false); + cache.registerCacheEntryListener(secondListenerConfiguration); + + + + cache.put(key, "80"); + Assert.assertNotNull(cache.get(key)); + + cache.put(key, "90"); + + latch.await(); + + Assert.assertNotNull(cache.get(key)); + + cache.close(); + runner.stop(); + } + + @Test + public void testUpdateWithoutOldValue() throws IOException, InterruptedException, URISyntaxException { + RedisProcess runner = new RedisRunner() + .nosave() + .randomDir() + .port(6311) + .run(); + + MutableConfiguration config = new MutableConfiguration<>(); + config.setStoreByValue(true); + + URI configUri = getClass().getResource("redisson-jcache.json").toURI(); + Cache cache = Caching.getCachingProvider().getCacheManager(configUri, null) + .createCache("test", config); + + CountDownLatch latch = new CountDownLatch(1); + + String key = "123"; + + UpdatedListener secondClientListener = new UpdatedListener(latch, key, null, "90"); + MutableCacheEntryListenerConfiguration secondListenerConfiguration = + new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(secondClientListener), null, false, true); + cache.registerCacheEntryListener(secondListenerConfiguration); + + cache.put(key, "80"); + Assert.assertNotNull(cache.get(key)); + + cache.put(key, "90"); + + latch.await(); + + Assert.assertNotNull(cache.get(key)); + + cache.close(); + runner.stop(); + } + + @Test + public void testRemoveListener() throws IOException, InterruptedException, URISyntaxException { + RedisProcess runner = new RedisRunner() + .nosave() + .randomDir() + .port(6311) + .run(); + + MutableConfiguration config = new MutableConfiguration<>(); + config.setStoreByValue(true); + + URI configUri = getClass().getResource("redisson-jcache.json").toURI(); + Cache cache = Caching.getCachingProvider().getCacheManager(configUri, null) + .createCache("test", config); + + CountDownLatch latch = new CountDownLatch(1); + + String key = "123"; + + RemovedListener clientListener = new RemovedListener(latch, key, "80"); + MutableCacheEntryListenerConfiguration listenerConfiguration = + new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, true); + cache.registerCacheEntryListener(listenerConfiguration); + + cache.put(key, "80"); + Assert.assertNotNull(cache.get(key)); + + cache.remove(key); + + latch.await(); + + Assert.assertNull(cache.get(key)); + + cache.close(); + runner.stop(); + } public static class ExpiredListener implements CacheEntryExpiredListener, Serializable { @@ -420,10 +577,63 @@ public class JCacheTest extends BaseTest { assertThat(entry.getKey()).isEqualTo(key); assertThat(entry.getValue()).isEqualTo(value); + assertThat(entry.getOldValue()).isEqualTo(value); + latch.countDown(); } } + public static class UpdatedListener implements CacheEntryUpdatedListener, Serializable { + private Object key; + private Object oldValue; + private Object value; + private CountDownLatch latch; + + public UpdatedListener(CountDownLatch latch, Object key, Object oldValue, Object value) { + super(); + this.latch = latch; + this.key = key; + this.oldValue = oldValue; + this.value = value; + } + + @Override + public void onUpdated(Iterable> events) + throws CacheEntryListenerException { + CacheEntryEvent entry = events.iterator().next(); + + assertThat(entry.getKey()).isEqualTo(key); + assertThat(entry.getOldValue()).isEqualTo(oldValue); + assertThat(entry.getValue()).isEqualTo(value); + + latch.countDown(); + } + } + + public static class RemovedListener implements CacheEntryRemovedListener, Serializable { + private Object key; + private Object value; + private CountDownLatch latch; + + public RemovedListener(CountDownLatch latch, Object key, Object value) { + super(); + this.latch = latch; + this.key = key; + this.value = value; + } + + @Override + public void onRemoved(Iterable> events) + throws CacheEntryListenerException { + CacheEntryEvent entry = events.iterator().next(); + + assertThat(entry.getKey()).isEqualTo(key); + assertThat(entry.getValue()).isEqualTo(value); + assertThat(entry.getOldValue()).isEqualTo(value); + + latch.countDown(); + } + } }