Merge branch 'master' of github.com:redisson/redisson

pull/3562/head
Nikita Koksharov 4 years ago
commit 87b9e7e604

@ -212,7 +212,6 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "
+ "local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), '+inf', 'withscores', 'limit', 0, 1); "
+ "local random, permits = struct.unpack('fI', nearest[1]);"
+ "return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);"
+ "else "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1])); "

@ -157,6 +157,10 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return "jcache_removed_channel:{" + getRawName() + "}";
}
String getOldValueListenerCounter() {
return "jcache_old_value_listeners:{" + getRawName() + "}";
}
long currentNanoTime() {
if (config.isStatisticsEnabled()) {
return System.nanoTime();
@ -388,7 +392,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
if (containsKey(key)) {
Long updateTimeout = getUpdateTimeout();
List<Object> res = evalWrite(getRawName(), codec, RedisCommands.EVAL_LIST,
"if ARGV[2] == '0' then "
"if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]);"
@ -398,23 +402,42 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "local syncs = redis.call('publish', KEYS[7], syncMsg); "
+ "return {0, syncs};"
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
res.add(syncId);
@ -491,18 +514,37 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value), ARGV[4]); "
+ "syncs = syncs + redis.call('publish', KEYS[7], syncMsg); "
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], -1, ARGV[4]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[i]);"
+ "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[i]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value), ARGV[4]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], ARGV[4]); "
+ "syncs = syncs + redis.call('publish', KEYS[8], syncMsg); "
+ "added = added + 1;"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], -1, ARGV[4]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[i]);"
+ "redis.call('hset', KEYS[1], ARGV[i], ARGV[i+1]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], string.len(tostring(value)), tostring(value), ARGV[4]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(ARGV[i+1]), ARGV[i+1], ARGV[4]); "
+ "syncs = syncs + redis.call('publish', KEYS[8], syncMsg); "
+ "added = added + 1;"
+ "end; "
@ -528,7 +570,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "end; "
+ "return {added, syncs};",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
params.toArray());
RPromise<Long> result = new RedissonPromise<>();
@ -605,18 +647,37 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "local syncs = redis.call('publish', KEYS[7], syncMsg); "
+ "return {0, syncs};"
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, syncs};"
+ "end; "
@ -641,7 +702,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "end; "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
RPromise<Boolean> result = waitSync(syncId, res);
@ -1229,21 +1290,35 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, value, syncs};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, value, syncs};"
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName()),
getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
result.add(syncId);
@ -1299,19 +1374,33 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[6], syncMsg); "
+ "return {0, value, syncs};"
+ "elseif ARGV[2] ~= '-1' then "
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, value, syncs};"
+ "else "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], -1, ARGV[6]); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], string.len(tostring(value)), tostring(value), ARGV[6]); "
+ "end; "
+ "redis.call('publish', KEYS[5], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); "
+ "local syncs = redis.call('publish', KEYS[8], syncMsg); "
+ "return {1, value, syncs};"
+ "end; "
@ -1336,7 +1425,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "end; "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName()),
getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
future.onComplete((r, e) -> {
@ -2208,7 +2297,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
Long updateTimeout = getUpdateTimeout();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[2] == '0' then "
"if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]); "
@ -2216,14 +2305,24 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "redis.call('publish', KEYS[3], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[7]); "
+ "return redis.call('publish', KEYS[5], syncMsg); "
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "elseif ARGV[2] ~= '-1' then "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], -1, ARGV[7]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value), ARGV[7]); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], ARGV[7]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "else "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('publish', KEYS[4], msg); "
@ -2231,7 +2330,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId);
List<Object> result = Arrays.<Object>asList(syncs, syncId);
@ -2296,11 +2395,23 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); "
+ "local msg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], -1); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); "
+ "local msg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], -1); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "return 1;"
@ -2316,7 +2427,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "return 0;"
+ "end; "
+ "return -1; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()),
accessTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue));
}
@ -2403,8 +2514,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
if (containsKey(key)) {
double syncId = ThreadLocalRandom.current().nextDouble();
Long updateTimeout = getUpdateTimeout();
Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
@ -2413,21 +2524,40 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "return redis.call('publish', KEYS[5], syncMsg); "
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1, ARGV[5]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "else "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1, ARGV[5]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
List<Object> result = Arrays.<Object>asList(syncs, syncId);
@ -2462,15 +2592,27 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); "
+ "local msg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); "
+ "local msg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "return 1;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()),
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@ -2497,17 +2639,29 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); "
+ "local msg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); "
+ "local msg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); "
+ "else "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "return value;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()),
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
private V getAndReplaceValueLocked(K key, V value) {
@ -2529,30 +2683,49 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
Long updateTimeout = getUpdateTimeout();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = evalWrite(getRawName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "return redis.call('publish', KEYS[5], msg); "
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "elseif ARGV[1] ~= '-1' then "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1, ARGV[5]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "else "
+ "local oldValueRequired = tonumber(redis.call('get', KEYS[7])); "
+ "local msg, syncMsg; "
+ "if oldValueRequired == nil or oldValueRequired < 1 then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0h', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1); "
+ "syncMsg = struct.pack('Lc0Lc0hd', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], -1, ARGV[5]); "
+ "else "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "syncMsg = struct.pack('Lc0Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[5]); "
+ "end; "
+ "redis.call('publish', KEYS[4], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], ARGV[5]); "
+ "return redis.call('publish', KEYS[6], syncMsg); "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
List<Object> result = Arrays.<Object>asList(syncs, syncId);
@ -2561,7 +2734,23 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return oldValue;
}
private void incrementOldValueListenerCounter(String counterName) {
evalWrite(getRawName(), codec, RedisCommands.EVAL_INTEGER,
"return redis.call('incr', KEYS[1]);",
Arrays.<Object>asList(counterName));
}
private void decrementOldValueListenerCounter(String counterName) {
evalWrite(getRawName(), codec, RedisCommands.EVAL_INTEGER,
"return redis.call('decr', KEYS[1]);",
Arrays.<Object>asList(counterName));
}
private Integer getOldValueListenerCount(String counterName) {
return evalWrite(getRawName(), codec, RedisCommands.EVAL_INTEGER,
"return tonumber(redis.call('get', KEYS[1]));",
Arrays.<Object>asList(counterName));
}
@Override
public boolean replace(K key, V value) {
@ -2992,7 +3181,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.REMOVED, msg.get(0), msg.get(1));
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.REMOVED, msg.get(0), msg.get(1), msg.get(1));
try {
if (filter == null || filter.evaluate(event)) {
List<CacheEntryEvent<? extends K, ? extends V>> events = Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
@ -3034,11 +3223,16 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
channelName = getUpdatedSyncChannelName();
}
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync));
if (cacheEntryListenerConfiguration.isOldValueRequired()) {
incrementOldValueListenerCounter(getOldValueListenerCounter());
}
RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync, true));
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1));
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.UPDATED, msg.get(0), msg.get(1), msg.get(2));
try {
if (filter == null || filter.evaluate(event)) {
List<CacheEntryEvent<? extends K, ? extends V>> events = Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
@ -3058,7 +3252,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
int listenerId = topic.addListener(List.class, new MessageListener<List<Object>>() {
@Override
public void onMessage(CharSequence channel, List<Object> msg) {
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1));
JCacheEntryEvent<K, V> event = new JCacheEntryEvent<K, V>(JCache.this, EventType.EXPIRED, msg.get(0), msg.get(1), msg.get(1));
if (filter == null || filter.evaluate(event)) {
List<CacheEntryEvent<? extends K, ? extends V>> events = Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
((CacheEntryExpiredListener<K, V>) listener).onExpired(events);
@ -3075,7 +3269,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
private void sendSync(boolean sync, List<Object> msg) {
if (sync) {
RSemaphore semaphore = redisson.getSemaphore(getSyncName(msg.get(2)));
Object syncId = msg.get(msg.size() - 1);
RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId));
semaphore.release();
}
}
@ -3088,6 +3283,15 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
redisson.getTopic(entry.getValue()).removeListener(entry.getKey());
}
}
if (cacheEntryListenerConfiguration.isOldValueRequired()) {
final CacheEntryListener<? super K, ? super V> listener = cacheEntryListenerConfiguration.getCacheEntryListenerFactory().create();
if (CacheEntryUpdatedListener.class.isAssignableFrom(listener.getClass())) {
decrementOldValueListenerCounter(getOldValueListenerCounter());
}
}
config.removeCacheEntryListenerConfiguration(cacheEntryListenerConfiguration);
}

@ -33,11 +33,20 @@ public class JCacheEntryEvent<K, V> extends CacheEntryEvent<K, V> {
private final Object key;
private final Object value;
private final Object oldValue;
public JCacheEntryEvent(Cache<K, V> source, EventType eventType, Object key, Object value) {
super(source, eventType);
this.key = key;
this.value = value;
this.oldValue = null;
}
public JCacheEntryEvent(Cache<K, V> source, EventType eventType, Object key, Object value, Object oldValue) {
super(source, eventType);
this.key = key;
this.value = value;
this.oldValue = oldValue;
}
@Override
@ -61,14 +70,12 @@ public class JCacheEntryEvent<K, V> extends CacheEntryEvent<K, V> {
@Override
public V getOldValue() {
// TODO Auto-generated method stub
return null;
return (V) oldValue;
}
@Override
public boolean isOldValueAvailable() {
// TODO Auto-generated method stub
return false;
return oldValue != null;
}
}

@ -33,6 +33,7 @@ import java.util.List;
public class JCacheEventCodec extends BaseEventCodec {
private final boolean sync;
private final boolean expectOldValueInMsg;
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
@ -44,6 +45,17 @@ public class JCacheEventCodec extends BaseEventCodec {
Object value = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder());
result.add(value);
if (expectOldValueInMsg) {
ByteBuf copy = buf.copy();
if (copy.readShortLE() != -1) {
Object oldValue = JCacheEventCodec.this.decode(buf, state, codec.getMapValueDecoder());
result.add(oldValue);
} else {
buf.readShortLE();
result.add(null);
}
}
if (sync) {
double syncId = buf.readDoubleLE();
@ -57,6 +69,13 @@ public class JCacheEventCodec extends BaseEventCodec {
public JCacheEventCodec(Codec codec, OSType osType, boolean sync) {
super(codec, osType);
this.sync = sync;
this.expectOldValueInMsg = false;
}
public JCacheEventCodec(Codec codec, OSType osType, boolean sync, boolean expectOldValueInMsg) {
super(codec, osType);
this.sync = sync;
this.expectOldValueInMsg = expectOldValueInMsg;
}
@Override

@ -26,6 +26,8 @@ import javax.cache.configuration.MutableConfiguration;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
@ -397,6 +399,161 @@ public class JCacheTest extends BaseTest {
cache.close();
runner.stop();
}
@Test
public void testUpdate() throws IOException, InterruptedException, URISyntaxException {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.port(6311)
.run();
MutableConfiguration<String, String> config = new MutableConfiguration<>();
config.setStoreByValue(true);
URI configUri = getClass().getResource("redisson-jcache.json").toURI();
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(configUri, null)
.createCache("test", config);
CountDownLatch latch = new CountDownLatch(1);
String key = "123";
UpdatedListener clientListener = new UpdatedListener(latch, key, "80", "90");
MutableCacheEntryListenerConfiguration<String, String> listenerConfiguration =
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, true);
cache.registerCacheEntryListener(listenerConfiguration);
cache.put(key, "80");
Assert.assertNotNull(cache.get(key));
cache.put(key, "90");
latch.await();
Assert.assertNotNull(cache.get(key));
cache.close();
runner.stop();
}
@Test
public void testUpdateAsync() throws IOException, InterruptedException, URISyntaxException {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.port(6311)
.run();
MutableConfiguration<String, String> config = new MutableConfiguration<>();
config.setStoreByValue(true);
URI configUri = getClass().getResource("redisson-jcache.json").toURI();
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(configUri, null)
.createCache("test", config);
CountDownLatch latch = new CountDownLatch(2);
String key = "123";
UpdatedListener clientListener = new UpdatedListener(latch, key, "80", "90");
MutableCacheEntryListenerConfiguration<String, String> listenerConfiguration =
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, false);
cache.registerCacheEntryListener(listenerConfiguration);
UpdatedListener secondClientListener = new UpdatedListener(latch, key, "80", "90");
MutableCacheEntryListenerConfiguration<String, String> secondListenerConfiguration =
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(secondClientListener), null, false, false);
cache.registerCacheEntryListener(secondListenerConfiguration);
cache.put(key, "80");
Assert.assertNotNull(cache.get(key));
cache.put(key, "90");
latch.await();
Assert.assertNotNull(cache.get(key));
cache.close();
runner.stop();
}
@Test
public void testUpdateWithoutOldValue() throws IOException, InterruptedException, URISyntaxException {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.port(6311)
.run();
MutableConfiguration<String, String> config = new MutableConfiguration<>();
config.setStoreByValue(true);
URI configUri = getClass().getResource("redisson-jcache.json").toURI();
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(configUri, null)
.createCache("test", config);
CountDownLatch latch = new CountDownLatch(1);
String key = "123";
UpdatedListener secondClientListener = new UpdatedListener(latch, key, null, "90");
MutableCacheEntryListenerConfiguration<String, String> secondListenerConfiguration =
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(secondClientListener), null, false, true);
cache.registerCacheEntryListener(secondListenerConfiguration);
cache.put(key, "80");
Assert.assertNotNull(cache.get(key));
cache.put(key, "90");
latch.await();
Assert.assertNotNull(cache.get(key));
cache.close();
runner.stop();
}
@Test
public void testRemoveListener() throws IOException, InterruptedException, URISyntaxException {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.port(6311)
.run();
MutableConfiguration<String, String> config = new MutableConfiguration<>();
config.setStoreByValue(true);
URI configUri = getClass().getResource("redisson-jcache.json").toURI();
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(configUri, null)
.createCache("test", config);
CountDownLatch latch = new CountDownLatch(1);
String key = "123";
RemovedListener clientListener = new RemovedListener(latch, key, "80");
MutableCacheEntryListenerConfiguration<String, String> listenerConfiguration =
new MutableCacheEntryListenerConfiguration<>(FactoryBuilder.factoryOf(clientListener), null, true, true);
cache.registerCacheEntryListener(listenerConfiguration);
cache.put(key, "80");
Assert.assertNotNull(cache.get(key));
cache.remove(key);
latch.await();
Assert.assertNull(cache.get(key));
cache.close();
runner.stop();
}
public static class ExpiredListener implements CacheEntryExpiredListener<String, String>, Serializable {
@ -420,10 +577,63 @@ public class JCacheTest extends BaseTest {
assertThat(entry.getKey()).isEqualTo(key);
assertThat(entry.getValue()).isEqualTo(value);
assertThat(entry.getOldValue()).isEqualTo(value);
latch.countDown();
}
}
public static class UpdatedListener implements CacheEntryUpdatedListener<String, String>, Serializable {
private Object key;
private Object oldValue;
private Object value;
private CountDownLatch latch;
public UpdatedListener(CountDownLatch latch, Object key, Object oldValue, Object value) {
super();
this.latch = latch;
this.key = key;
this.oldValue = oldValue;
this.value = value;
}
@Override
public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends String>> events)
throws CacheEntryListenerException {
CacheEntryEvent<? extends String, ? extends String> entry = events.iterator().next();
assertThat(entry.getKey()).isEqualTo(key);
assertThat(entry.getOldValue()).isEqualTo(oldValue);
assertThat(entry.getValue()).isEqualTo(value);
latch.countDown();
}
}
public static class RemovedListener implements CacheEntryRemovedListener<String, String>, Serializable {
private Object key;
private Object value;
private CountDownLatch latch;
public RemovedListener(CountDownLatch latch, Object key, Object value) {
super();
this.latch = latch;
this.key = key;
this.value = value;
}
@Override
public void onRemoved(Iterable<CacheEntryEvent<? extends String, ? extends String>> events)
throws CacheEntryListenerException {
CacheEntryEvent<? extends String, ? extends String> entry = events.iterator().next();
assertThat(entry.getKey()).isEqualTo(key);
assertThat(entry.getValue()).isEqualTo(value);
assertThat(entry.getOldValue()).isEqualTo(value);
latch.countDown();
}
}
}

Loading…
Cancel
Save