feature: implement oldValue in jcache

Signed-off-by: Tom Erik Støwer <testower@gmail.com>
pull/3531/head
Tom Erik Støwer 4 years ago
parent 549c310204
commit b25313bcd0

@ -388,10 +388,10 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
if (containsKey(key)) {
Long updateTimeout = getUpdateTimeout();
List<Object> res = evalWrite(getRawName(), codec, RedisCommands.EVAL_LIST,
"if ARGV[2] == '0' then "
"local value = redis.call('hget', KEYS[1], ARGV[4]);"
+"if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[6]); "
@ -400,16 +400,16 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "end; ",
@ -482,10 +482,10 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
"exists = false;" +
"end;" +
"if exists then "
+ "local value = redis.call('hget', KEYS[1], ARGV[i]);"
+ "if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[i]); "
+ "redis.call('zrem', KEYS[2], ARGV[i]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[i]);"
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value), ARGV[4]); "
@ -493,16 +493,16 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], ARGV[4]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value), ARGV[4]); "
+ "syncs = syncs + redis.call('publish', KEYS[8], syncMsg); "
+ "added = added + 1;"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], ARGV[4]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value), ARGV[4]); "
+ "syncs = syncs + redis.call('publish', KEYS[8], syncMsg); "
+ "added = added + 1;"
+ "end; "
@ -595,10 +595,10 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
"exists = false;" +
"end;" +
"if exists then "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[6]); "
@ -607,16 +607,16 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "end; "
@ -1229,16 +1229,16 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, value, syncs};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, value, syncs};"
+ "end; ",
@ -1302,16 +1302,16 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, value, syncs};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, value, syncs};"
+ "end; "
@ -2208,10 +2208,10 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
Long updateTimeout = getUpdateTimeout();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[2] == '0' then "
"local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[7]); "
@ -2219,9 +2219,9 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], ARGV[7]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value), ARGV[7]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
@ -2296,11 +2296,11 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "return 1;"
@ -2403,11 +2403,11 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
if (containsKey(key)) {
double syncId = ThreadLocalRandom.current().nextDouble();
Long updateTimeout = getUpdateTimeout();
Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value), ARGV[5]); "
@ -2415,15 +2415,15 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(),
@ -2462,11 +2462,11 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "return 1;",
@ -2497,11 +2497,11 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "return value;",
@ -2529,8 +2529,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
Long updateTimeout = getUpdateTimeout();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); "
@ -2540,15 +2540,15 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); "
+ "local syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(),
@ -2992,14 +2992,14 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.REMOVED, msg.get(0), msg.get(1));
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.REMOVED, msg.get(0), msg.get(1), msg.get(1));
try {
if (filter == null || filter.evaluate(event)) {
List<CacheEntryEvent<? extends K, ? extends V>> events = Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
((CacheEntryRemovedListener<K, V>) listener).onRemoved(events);
}
} finally {
sendSync(sync, msg);
sendSync(sync, msg.get(2));
}
}
});
@ -3022,7 +3022,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
((CacheEntryCreatedListener<K, V>) listener).onCreated(events);
}
} finally {
sendSync(sync, msg);
sendSync(sync, msg.get(2));
}
}
});
@ -3034,18 +3034,18 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
channelName = getUpdatedSyncChannelName();
}
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync));
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync, true));
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1));
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1), msg.get(2));
try {
if (filter == null || filter.evaluate(event)) {
List<CacheEntryEvent<? extends K, ? extends V>> events = Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
((CacheEntryUpdatedListener<K, V>) listener).onUpdated(events);
}
} finally {
sendSync(sync, msg);
sendSync(sync, msg.get(3));
}
}
});
@ -3058,7 +3058,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1));
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1), msg.get(1));
if (filter == null || filter.evaluate(event)) {
List<CacheEntryEvent<? extends K, ? extends V>> events = Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
((CacheEntryExpiredListener<K, V>) listener).onExpired(events);
@ -3073,9 +3073,9 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
}
private void sendSync(boolean sync, List<Object> msg) {
private void sendSync(boolean sync, Object syncId) {
if (sync) {
RSemaphore semaphore = redisson.getSemaphore(getSyncName(msg.get(2)));
RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId));
semaphore.release();
}
}

@ -33,11 +33,20 @@ public class JCacheEntryEvent<K, V> extends CacheEntryEvent<K, V> {
private final Object key;
private final Object value;
private final Object oldValue;
public JCacheEntryEvent(Cache<K, V> source, EventType eventType, Object key, Object value) {
super(source, eventType);
this.key = key;
this.value = value;
this.oldValue = null;
}
public JCacheEntryEvent(Cache<K, V> source, EventType eventType, Object key, Object value, Object oldValue) {
super(source, eventType);
this.key = key;
this.value = value;
this.oldValue = oldValue;
}
@Override
@ -61,14 +70,12 @@ public class JCacheEntryEvent<K, V> extends CacheEntryEvent<K, V> {
@Override
public V getOldValue() {
// TODO Auto-generated method stub
return null;
return (V) oldValue;
}
@Override
public boolean isOldValueAvailable() {
// TODO Auto-generated method stub
return false;
return oldValue != null;
}
}

@ -33,6 +33,7 @@ import java.util.List;
public class JCacheEventCodec extends BaseEventCodec {
private final boolean sync;
private final boolean expectOldValueInMsg;
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
@ -45,6 +46,11 @@ public class JCacheEventCodec extends BaseEventCodec {
Object value = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder());
result.add(value);
if (expectOldValueInMsg) {
Object oldValue = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder());
result.add(oldValue);
}
if (sync) {
double syncId = buf.readDoubleLE();
result.add(syncId);
@ -57,6 +63,13 @@ public class JCacheEventCodec extends BaseEventCodec {
public JCacheEventCodec(Codec codec, OSType osType, boolean sync) {
super(codec, osType);
this.sync = sync;
this.expectOldValueInMsg = false;
}
public JCacheEventCodec(Codec codec, OSType osType, boolean sync, boolean expectOldValueInMsg) {
super(codec, osType);
this.sync = sync;
this.expectOldValueInMsg = expectOldValueInMsg;
}
@Override

@ -26,6 +26,8 @@ import javax.cache.configuration.MutableConfiguration;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
@ -398,6 +400,80 @@ public class JCacheTest extends BaseTest {
runner.stop();
}
@Test
public void testUpdate() throws IOException, InterruptedException, URISyntaxException {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.port(6311)
.run();
MutableConfiguration<String, String> config = new MutableConfiguration<>();
config.setStoreByValue(true);
URI configUri = getClass().getResource("redisson-jcache.json").toURI();
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(configUri, null)
.createCache("test", config);
CountDownLatch latch = new CountDownLatch(1);
String key = "123";
UpdatedListener clientListener = new UpdatedListener(latch, key, "80", "90");
MutableCacheEntryListenerConfiguration<String, String> listenerConfiguration =
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, true);
cache.registerCacheEntryListener(listenerConfiguration);
cache.put(key, "80");
Assert.assertNotNull(cache.get(key));
cache.put(key, "90");
latch.await();
//Assert.assertNotNull(cache.get(key));
cache.close();
runner.stop();
}
@Test
public void testRemoveListener() throws IOException, InterruptedException, URISyntaxException {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.port(6311)
.run();
MutableConfiguration<String, String> config = new MutableConfiguration<>();
config.setStoreByValue(true);
URI configUri = getClass().getResource("redisson-jcache.json").toURI();
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(configUri, null)
.createCache("test", config);
CountDownLatch latch = new CountDownLatch(1);
String key = "123";
RemovedListener clientListener = new RemovedListener(latch, key, "80");
MutableCacheEntryListenerConfiguration<String, String> listenerConfiguration =
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, true);
cache.registerCacheEntryListener(listenerConfiguration);
cache.put(key, "80");
Assert.assertNotNull(cache.get(key));
cache.remove(key);
latch.await();
Assert.assertNull(cache.get(key));
cache.close();
runner.stop();
}
public static class ExpiredListener implements CacheEntryExpiredListener<String, String>, Serializable {
private Object key;
@ -420,10 +496,63 @@ public class JCacheTest extends BaseTest {
assertThat(entry.getKey()).isEqualTo(key);
assertThat(entry.getValue()).isEqualTo(value);
assertThat(entry.getOldValue()).isEqualTo(value);
latch.countDown();
}
}
public static class UpdatedListener implements CacheEntryUpdatedListener<String, String>, Serializable {
private Object key;
private Object oldValue;
private Object value;
private CountDownLatch latch;
public UpdatedListener(CountDownLatch latch, Object key, Object oldValue, Object value) {
super();
this.latch = latch;
this.key = key;
this.oldValue = oldValue;
this.value = value;
}
@Override
public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends String>> events)
throws CacheEntryListenerException {
CacheEntryEvent<? extends String, ? extends String> entry = events.iterator().next();
assertThat(entry.getKey()).isEqualTo(key);
assertThat(entry.getOldValue()).isEqualTo(oldValue);
assertThat(entry.getValue()).isEqualTo(value);
latch.countDown();
}
}
public static class RemovedListener implements CacheEntryRemovedListener<String, String>, Serializable {
private Object key;
private Object value;
private CountDownLatch latch;
public RemovedListener(CountDownLatch latch, Object key, Object value) {
super();
this.latch = latch;
this.key = key;
this.value = value;
}
@Override
public void onRemoved(Iterable<CacheEntryEvent<? extends String, ? extends String>> events)
throws CacheEntryListenerException {
CacheEntryEvent<? extends String, ? extends String> entry = events.iterator().next();
assertThat(entry.getKey()).isEqualTo(key);
assertThat(entry.getValue()).isEqualTo(value);
assertThat(entry.getOldValue()).isEqualTo(value);
latch.countDown();
}
}
}

Loading…
Cancel
Save