From b25313bcd0ddc7e82d2be6e22131c09943fbb0a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=20Erik=20St=C3=B8wer?= Date: Wed, 31 Mar 2021 14:33:05 +0200 Subject: [PATCH] feature: implement oldValue in jcache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tom Erik Støwer --- .../main/java/org/redisson/jcache/JCache.java | 112 +++++++-------- .../org/redisson/jcache/JCacheEntryEvent.java | 17 ++- .../org/redisson/jcache/JCacheEventCodec.java | 13 ++ .../java/org/redisson/jcache/JCacheTest.java | 129 ++++++++++++++++++ 4 files changed, 210 insertions(+), 61 deletions(-) diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 8b1fb5fca..f0cafc48a 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -388,10 +388,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (containsKey(key)) { Long updateTimeout = getUpdateTimeout(); List res = evalWrite(getRawName(), codec, RedisCommands.EVAL_LIST, - "if ARGV[2] == '0' then " + "local value = redis.call('hget', KEYS[1], ARGV[4]);" + +"if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " + "redis.call('zrem', KEYS[2], ARGV[4]); " - + "local value = redis.call('hget', KEYS[1], ARGV[4]);" + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[6]); " @@ -400,16 +400,16 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, syncs};" + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, syncs};" + "end; ", @@ -482,10 +482,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs "exists = false;" + "end;" + "if exists then " + + "local value = redis.call('hget', KEYS[1], ARGV[i]);" + "if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[i]); " + "redis.call('zrem', KEYS[2], ARGV[i]); " - + "local value = redis.call('hget', KEYS[1], ARGV[i]);" + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value), ARGV[4]); " @@ -493,16 +493,16 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], ARGV[4]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value), ARGV[4]); " + "syncs = syncs + redis.call('publish', KEYS[8], syncMsg); " + "added = added + 1;" + "else " + "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], ARGV[4]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value), ARGV[4]); " + "syncs = syncs + redis.call('publish', KEYS[8], syncMsg); " + "added = added + 1;" + "end; " @@ -595,10 +595,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs "exists = false;" + "end;" + "if exists then " + + "local value = redis.call('hget', KEYS[1], ARGV[4]);" + "if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " + "redis.call('zrem', KEYS[2], ARGV[4]); " - + "local value = redis.call('hget', KEYS[1], ARGV[4]);" + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[6]); " @@ -607,16 +607,16 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, syncs};" + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, syncs};" + "end; " @@ -1229,16 +1229,16 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, value, syncs};" + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, value, syncs};" + "end; ", @@ -1302,16 +1302,16 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, value, syncs};" + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[5], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, value, syncs};" + "end; " @@ -2208,10 +2208,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs Long updateTimeout = getUpdateTimeout(); double syncId = ThreadLocalRandom.current().nextDouble(); Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG, - "if ARGV[2] == '0' then " + "local value = redis.call('hget', KEYS[1], ARGV[4]); " + + "if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " + "redis.call('zrem', KEYS[2], ARGV[4]); " - + "local value = redis.call('hget', KEYS[1], ARGV[4]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[3], msg); " + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[7]); " @@ -2219,9 +2219,9 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], ARGV[7]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value), ARGV[7]); " + "return redis.call('publish', KEYS[6], syncMsg); " + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " @@ -2296,11 +2296,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " + "end; " + "return 1;" @@ -2403,11 +2403,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (containsKey(key)) { double syncId = ThreadLocalRandom.current().nextDouble(); Long updateTimeout = getUpdateTimeout(); - Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG, - "if ARGV[1] == '0' then " + Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG, + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + + "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " - + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[3], msg); " + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value), ARGV[5]); " @@ -2415,15 +2415,15 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[1] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); " + "return redis.call('publish', KEYS[6], syncMsg); " + "else " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); " + "return redis.call('publish', KEYS[6], syncMsg); " + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), @@ -2462,11 +2462,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[1] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " + "else " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " + "end; " + "return 1;", @@ -2497,11 +2497,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[1] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " + "else " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " + "end; " + "return value;", @@ -2529,8 +2529,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs Long updateTimeout = getUpdateTimeout(); double syncId = ThreadLocalRandom.current().nextDouble(); Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG, - "if ARGV[1] == '0' then " - + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + + "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); " @@ -2540,15 +2540,15 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "elseif ARGV[1] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); " + "return redis.call('publish', KEYS[6], syncMsg); " + "else " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); " + + "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[4], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); " + + "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); " + "return redis.call('publish', KEYS[6], syncMsg); " + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), @@ -2992,14 +2992,14 @@ public class JCache extends RedissonObject implements Cache, CacheAs int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { - JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.REMOVED, msg.get(0), msg.get(1)); + JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.REMOVED, msg.get(0), msg.get(1), msg.get(1)); try { if (filter == null || filter.evaluate(event)) { List> events = Collections.>singletonList(event); ((CacheEntryRemovedListener) listener).onRemoved(events); } } finally { - sendSync(sync, msg); + sendSync(sync, msg.get(2)); } } }); @@ -3022,7 +3022,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs ((CacheEntryCreatedListener) listener).onCreated(events); } } finally { - sendSync(sync, msg); + sendSync(sync, msg.get(2)); } } }); @@ -3034,18 +3034,18 @@ public class JCache extends RedissonObject implements Cache, CacheAs channelName = getUpdatedSyncChannelName(); } - RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync)); + RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync, true)); int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { - JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1)); + JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1), msg.get(2)); try { if (filter == null || filter.evaluate(event)) { List> events = Collections.>singletonList(event); ((CacheEntryUpdatedListener) listener).onUpdated(events); } } finally { - sendSync(sync, msg); + sendSync(sync, msg.get(3)); } } }); @@ -3058,7 +3058,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs int listenerId = topic.addListener(List.class, new MessageListener>() { @Override public void onMessage(CharSequence channel, List msg) { - JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1)); + JCacheEntryEvent event = new JCacheEntryEvent(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1), msg.get(1)); if (filter == null || filter.evaluate(event)) { List> events = Collections.>singletonList(event); ((CacheEntryExpiredListener) listener).onExpired(events); @@ -3073,9 +3073,9 @@ public class JCache extends RedissonObject implements Cache, CacheAs } } - private void sendSync(boolean sync, List msg) { + private void sendSync(boolean sync, Object syncId) { if (sync) { - RSemaphore semaphore = redisson.getSemaphore(getSyncName(msg.get(2))); + RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); semaphore.release(); } } diff --git a/redisson/src/main/java/org/redisson/jcache/JCacheEntryEvent.java b/redisson/src/main/java/org/redisson/jcache/JCacheEntryEvent.java index 10e6dc8a3..32dda8654 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCacheEntryEvent.java +++ b/redisson/src/main/java/org/redisson/jcache/JCacheEntryEvent.java @@ -33,11 +33,20 @@ public class JCacheEntryEvent extends CacheEntryEvent { private final Object key; private final Object value; - + private final Object oldValue; + public JCacheEntryEvent(Cache source, EventType eventType, Object key, Object value) { super(source, eventType); this.key = key; this.value = value; + this.oldValue = null; + } + + public JCacheEntryEvent(Cache source, EventType eventType, Object key, Object value, Object oldValue) { + super(source, eventType); + this.key = key; + this.value = value; + this.oldValue = oldValue; } @Override @@ -61,14 +70,12 @@ public class JCacheEntryEvent extends CacheEntryEvent { @Override public V getOldValue() { - // TODO Auto-generated method stub - return null; + return (V) oldValue; } @Override public boolean isOldValueAvailable() { - // TODO Auto-generated method stub - return false; + return oldValue != null; } } diff --git a/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java b/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java index e9a1ec000..ee1cafbab 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java +++ b/redisson/src/main/java/org/redisson/jcache/JCacheEventCodec.java @@ -33,6 +33,7 @@ import java.util.List; public class JCacheEventCodec extends BaseEventCodec { private final boolean sync; + private final boolean expectOldValueInMsg; private final Decoder decoder = new Decoder() { @Override @@ -44,6 +45,11 @@ public class JCacheEventCodec extends BaseEventCodec { Object value = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder()); result.add(value); + + if (expectOldValueInMsg) { + Object oldValue = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder()); + result.add(oldValue); + } if (sync) { double syncId = buf.readDoubleLE(); @@ -57,6 +63,13 @@ public class JCacheEventCodec extends BaseEventCodec { public JCacheEventCodec(Codec codec, OSType osType, boolean sync) { super(codec, osType); this.sync = sync; + this.expectOldValueInMsg = false; + } + + public JCacheEventCodec(Codec codec, OSType osType, boolean sync, boolean expectOldValueInMsg) { + super(codec, osType); + this.sync = sync; + this.expectOldValueInMsg = expectOldValueInMsg; } @Override diff --git a/redisson/src/test/java/org/redisson/jcache/JCacheTest.java b/redisson/src/test/java/org/redisson/jcache/JCacheTest.java index 2dad846c8..92c06f386 100644 --- a/redisson/src/test/java/org/redisson/jcache/JCacheTest.java +++ b/redisson/src/test/java/org/redisson/jcache/JCacheTest.java @@ -26,6 +26,8 @@ import javax.cache.configuration.MutableConfiguration; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryExpiredListener; import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryRemovedListener; +import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.expiry.CreatedExpiryPolicy; import javax.cache.expiry.Duration; @@ -397,6 +399,80 @@ public class JCacheTest extends BaseTest { cache.close(); runner.stop(); } + + @Test + public void testUpdate() throws IOException, InterruptedException, URISyntaxException { + RedisProcess runner = new RedisRunner() + .nosave() + .randomDir() + .port(6311) + .run(); + + MutableConfiguration config = new MutableConfiguration<>(); + config.setStoreByValue(true); + + URI configUri = getClass().getResource("redisson-jcache.json").toURI(); + Cache cache = Caching.getCachingProvider().getCacheManager(configUri, null) + .createCache("test", config); + + CountDownLatch latch = new CountDownLatch(1); + + String key = "123"; + + UpdatedListener clientListener = new UpdatedListener(latch, key, "80", "90"); + MutableCacheEntryListenerConfiguration listenerConfiguration = + new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, true); + cache.registerCacheEntryListener(listenerConfiguration); + + cache.put(key, "80"); + Assert.assertNotNull(cache.get(key)); + + cache.put(key, "90"); + + latch.await(); + + //Assert.assertNotNull(cache.get(key)); + + cache.close(); + runner.stop(); + } + + @Test + public void testRemoveListener() throws IOException, InterruptedException, URISyntaxException { + RedisProcess runner = new RedisRunner() + .nosave() + .randomDir() + .port(6311) + .run(); + + MutableConfiguration config = new MutableConfiguration<>(); + config.setStoreByValue(true); + + URI configUri = getClass().getResource("redisson-jcache.json").toURI(); + Cache cache = Caching.getCachingProvider().getCacheManager(configUri, null) + .createCache("test", config); + + CountDownLatch latch = new CountDownLatch(1); + + String key = "123"; + + RemovedListener clientListener = new RemovedListener(latch, key, "80"); + MutableCacheEntryListenerConfiguration listenerConfiguration = + new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, true); + cache.registerCacheEntryListener(listenerConfiguration); + + cache.put(key, "80"); + Assert.assertNotNull(cache.get(key)); + + cache.remove(key); + + latch.await(); + + Assert.assertNull(cache.get(key)); + + cache.close(); + runner.stop(); + } public static class ExpiredListener implements CacheEntryExpiredListener, Serializable { @@ -420,10 +496,63 @@ public class JCacheTest extends BaseTest { assertThat(entry.getKey()).isEqualTo(key); assertThat(entry.getValue()).isEqualTo(value); + assertThat(entry.getOldValue()).isEqualTo(value); + latch.countDown(); } } + public static class UpdatedListener implements CacheEntryUpdatedListener, Serializable { + private Object key; + private Object oldValue; + private Object value; + private CountDownLatch latch; + + public UpdatedListener(CountDownLatch latch, Object key, Object oldValue, Object value) { + super(); + this.latch = latch; + this.key = key; + this.oldValue = oldValue; + this.value = value; + } + + @Override + public void onUpdated(Iterable> events) + throws CacheEntryListenerException { + CacheEntryEvent entry = events.iterator().next(); + + assertThat(entry.getKey()).isEqualTo(key); + assertThat(entry.getOldValue()).isEqualTo(oldValue); + assertThat(entry.getValue()).isEqualTo(value); + + latch.countDown(); + } + } + + public static class RemovedListener implements CacheEntryRemovedListener, Serializable { + private Object key; + private Object value; + private CountDownLatch latch; + + public RemovedListener(CountDownLatch latch, Object key, Object value) { + super(); + this.latch = latch; + this.key = key; + this.value = value; + } + + @Override + public void onRemoved(Iterable> events) + throws CacheEntryListenerException { + CacheEntryEvent entry = events.iterator().next(); + + assertThat(entry.getKey()).isEqualTo(key); + assertThat(entry.getValue()).isEqualTo(value); + assertThat(entry.getOldValue()).isEqualTo(value); + + latch.countDown(); + } + } }