From 96e7baad8406ff684b744f8881169541a31ddadf Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 12 May 2017 15:49:25 +0300 Subject: [PATCH] JSR107 cache implementation should throw javax.cache.CacheException #850 --- .../main/java/org/redisson/jcache/JCache.java | 209 ++++++++++-------- 1 file changed, 111 insertions(+), 98 deletions(-) diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index cf898931d..46b3364ac 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.cache.Cache; +import javax.cache.CacheException; import javax.cache.CacheManager; import javax.cache.configuration.CacheEntryListenerConfiguration; import javax.cache.configuration.Configuration; @@ -57,6 +58,7 @@ import org.redisson.api.RLock; import org.redisson.api.RSemaphore; import org.redisson.api.RTopic; import org.redisson.api.listener.MessageListener; +import org.redisson.client.codec.Codec; import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -209,7 +211,7 @@ public class JCache extends RedissonObject implements Cache { V getValueLocked(K key) { - V value = (V) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL, + V value = evalWrite(getName(), codec, EVAL_GET_TTL, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return nil; " @@ -226,7 +228,7 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return value; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName()), - 0, System.currentTimeMillis(), key)); + 0, System.currentTimeMillis(), key); if (value != null) { List result = new ArrayList(3); @@ -234,7 +236,7 @@ public class JCache extends RedissonObject implements Cache { Long accessTimeout = getAccessTimeout(); double syncId = ThreadLocalRandom.current().nextDouble(); - Long syncs = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, + Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG, "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " @@ -250,7 +252,7 @@ public class JCache extends RedissonObject implements Cache { + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()), - accessTimeout, System.currentTimeMillis(), encodeMapKey(key), syncId)); + accessTimeout, System.currentTimeMillis(), encodeMapKey(key), syncId); result.add(syncs); result.add(syncId); @@ -265,7 +267,7 @@ public class JCache extends RedissonObject implements Cache { private V getValue(K key) { Long accessTimeout = getAccessTimeout(); - V value = (V) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL, + V value = evalWrite(getName(), codec, EVAL_GET_TTL, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return nil; " @@ -292,7 +294,7 @@ public class JCache extends RedissonObject implements Cache { + "return value; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName()), - accessTimeout, System.currentTimeMillis(), key)); + accessTimeout, System.currentTimeMillis(), key); return value; } @@ -311,8 +313,7 @@ public class JCache extends RedissonObject implements Cache { } V load(K key) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { V value = getValueLocked(key); if (value == null) { @@ -339,12 +340,30 @@ public class JCache extends RedissonObject implements Cache { return value; } + private R write(String key, RedisCommand command, Object ... params) { + RFuture future = commandExecutor.writeAsync(key, command, params); + try { + return get(future); + } catch (Exception e) { + throw new CacheException(e); + } + } + + private R evalWrite(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params) { + RFuture future = commandExecutor.evalWriteAsync(key, codec, evalCommandType, script, keys, params); + try { + return get(future); + } catch (Exception e) { + throw new CacheException(e); + } + } + private boolean putValueLocked(K key, Object value) { double syncId = ThreadLocalRandom.current().nextDouble(); if (containsKey(key)) { Long updateTimeout = getUpdateTimeout(); - List res = (List) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + List res = evalWrite(getName(), codec, RedisCommands.EVAL_LIST, "if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " + "redis.call('zrem', KEYS[2], ARGV[4]); " @@ -372,7 +391,7 @@ public class JCache extends RedissonObject implements Cache { + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), - 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId)); + 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); res.add(syncId); waitSync(res); @@ -381,7 +400,7 @@ public class JCache extends RedissonObject implements Cache { } Long creationTimeout = getCreationTimeout(); - List res = (List) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + List res = evalWrite(getName(), codec, RedisCommands.EVAL_LIST, "if ARGV[1] == '0' then " + "return {0};" + "elseif ARGV[1] ~= '-1' then " @@ -402,7 +421,7 @@ public class JCache extends RedissonObject implements Cache { + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), - creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId)); + creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); res.add(syncId); waitSync(res); @@ -417,7 +436,7 @@ public class JCache extends RedissonObject implements Cache { Long creationTimeout = getCreationTimeout(); Long updateTimeout = getUpdateTimeout(); - List res = (List) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + List res = evalWrite(getName(), codec, RedisCommands.EVAL_LIST, "if redis.call('hexists', KEYS[1], ARGV[4]) == 1 then " + "if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " @@ -466,7 +485,7 @@ public class JCache extends RedissonObject implements Cache { + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), - creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId)); + creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); res.add(syncId); waitSync(res); @@ -504,7 +523,7 @@ public class JCache extends RedissonObject implements Cache { private boolean putIfAbsentValue(K key, Object value) { Long creationTimeout = getCreationTimeout(); - return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_IF_ABSENT, + return evalWrite(getName(), codec, EVAL_PUT_IF_ABSENT, "if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " + "return 0; " + "else " @@ -524,7 +543,7 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getCreatedChannelName()), - creationTimeout, key, value)); + creationTimeout, key, value); } private boolean putIfAbsentValueLocked(K key, Object value) { @@ -533,7 +552,7 @@ public class JCache extends RedissonObject implements Cache { } Long creationTimeout = getCreationTimeout(); - return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_IF_ABSENT, + return evalWrite(getName(), codec, EVAL_PUT_IF_ABSENT, "if ARGV[1] == '0' then " + "return 0;" + "elseif ARGV[1] ~= '-1' then " @@ -549,7 +568,7 @@ public class JCache extends RedissonObject implements Cache { + "return 1;" + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getCreatedChannelName()), - creationTimeout, key, value)); + creationTimeout, key, value); } @@ -590,7 +609,7 @@ public class JCache extends RedissonObject implements Cache { args.add(System.currentTimeMillis()); args.addAll(keys); - Map res = (Map) get(commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand>("EVAL", new MapGetAllDecoder(args, 2, true), 8, ValueType.MAP_KEY, ValueType.MAP_VALUE), + Map res = evalWrite(getName(), codec, new RedisCommand>("EVAL", new MapGetAllDecoder(args, 2, true), 8, ValueType.MAP_KEY, ValueType.MAP_VALUE), "local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" + "local accessTimeout = ARGV[1]; " + "local currentTime = tonumber(ARGV[2]); " @@ -625,7 +644,7 @@ public class JCache extends RedissonObject implements Cache { + "table.insert(result, value); " + "end; " + "return result;", - Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName()), args.toArray())); + Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName()), args.toArray()); Map result = new HashMap(); for (Map.Entry entry : res.entrySet()) { @@ -655,7 +674,7 @@ public class JCache extends RedissonObject implements Cache { throw new NullPointerException(); } - return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_KEY, + return evalWrite(getName(), codec, EVAL_CONTAINS_KEY, "if redis.call('hexists', KEYS[1], ARGV[2]) == 0 then " + "return 0;" + "end;" @@ -671,7 +690,7 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return 1;", Arrays.asList(getName(), getTimeoutSetName()), - System.currentTimeMillis(), key)); + System.currentTimeMillis(), key); } @Override @@ -700,8 +719,7 @@ public class JCache extends RedissonObject implements Cache { for (K key : keys) { try { if (!containsKey(key) || replaceExistingValues) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { if (!containsKey(key)|| replaceExistingValues) { V value; @@ -732,16 +750,14 @@ public class JCache extends RedissonObject implements Cache { }); } - private RLock getLock(K key) { - String lockName = getLockName(key); - RLock lock = redisson.getLock(lockName); - return lock; - } - private RLock getLockedLock(K key) { String lockName = getLockName(key); RLock lock = redisson.getLock(lockName); - lock.lock(); + try { + lock.lock(); + } catch (Exception e) { + throw new CacheException(e); + } return lock; } @@ -758,8 +774,7 @@ public class JCache extends RedissonObject implements Cache { long startTime = currentNanoTime(); if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { List result = getAndPutValueLocked(key, value); if (result.isEmpty()) { @@ -818,17 +833,17 @@ public class JCache extends RedissonObject implements Cache { } private long removeValues(Object... keys) { - return (Long) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUES, + return evalWrite(getName(), codec, EVAL_REMOVE_VALUES, "redis.call('zrem', KEYS[2], unpack(ARGV)); " + "return redis.call('hdel', KEYS[1], unpack(ARGV)); ", - Arrays.asList(getName(), getTimeoutSetName()), keys)); + Arrays.asList(getName(), getTimeoutSetName()), keys); } private List getAndPutValueLocked(K key, V value) { double syncId = ThreadLocalRandom.current().nextDouble(); if (containsKey(key)) { Long updateTimeout = getUpdateTimeout(); - List result = (List) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + List result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST, "local value = redis.call('hget', KEYS[1], ARGV[4]);" + "if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " @@ -856,7 +871,7 @@ public class JCache extends RedissonObject implements Cache { + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(), getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName()), - 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId)); + 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); result.add(syncId); waitSync(result); @@ -864,7 +879,7 @@ public class JCache extends RedissonObject implements Cache { } Long creationTimeout = getCreationTimeout(); - List result = (List) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + List result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST, "if ARGV[1] == '0' then " + "return {nil};" + "elseif ARGV[1] ~= '-1' then " @@ -884,7 +899,7 @@ public class JCache extends RedissonObject implements Cache { + "return {1, syncs};" + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getCreatedSyncChannelName()), - creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId)); + creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); result.add(syncId); waitSync(result); @@ -898,7 +913,7 @@ public class JCache extends RedissonObject implements Cache { double syncId = ThreadLocalRandom.current().nextDouble(); - List result = (List) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + List result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST, "local value = redis.call('hget', KEYS[1], ARGV[4]);" + "if value ~= false then " + "if ARGV[2] == '0' then " @@ -947,7 +962,7 @@ public class JCache extends RedissonObject implements Cache { + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(), getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName()), - creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId)); + creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); if (!result.isEmpty()) { result.add(syncId); @@ -968,8 +983,7 @@ public class JCache extends RedissonObject implements Cache { long startTime = currentNanoTime(); if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { List result = getAndPutValueLocked(key, value); if (result.isEmpty()) { @@ -1060,14 +1074,15 @@ public class JCache extends RedissonObject implements Cache { } } + List lockedLocks = new ArrayList(); for (Map.Entry entry : map.entrySet()) { K key = entry.getKey(); V value = entry.getValue(); long startTime = currentNanoTime(); if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); + lockedLocks.add(lock); List result = getAndPutValue(key, value); if (result.isEmpty()) { @@ -1133,8 +1148,8 @@ public class JCache extends RedissonObject implements Cache { throw new CacheWriterException(e); } } finally { - for (Map.Entry entry : map.entrySet()) { - getLock(entry.getKey()).unlock(); + for (RLock lock : lockedLocks) { + lock.unlock(); } } } @@ -1170,8 +1185,7 @@ public class JCache extends RedissonObject implements Cache { long startTime = currentNanoTime(); if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { boolean result = putIfAbsentValueLocked(key, value); if (result) { @@ -1209,7 +1223,7 @@ public class JCache extends RedissonObject implements Cache { private boolean removeValue(K key) { double syncId = ThreadLocalRandom.current().nextDouble(); - List res = (List) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + List res = evalWrite(getName(), codec, RedisCommands.EVAL_LIST, "local value = redis.call('hexists', KEYS[1], ARGV[2]); " + "if value == 0 then " + "return {0}; " @@ -1234,7 +1248,7 @@ public class JCache extends RedissonObject implements Cache { + "local syncs = redis.call('publish', KEYS[4], syncMsg); " + "return {1, syncs};", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()), - System.currentTimeMillis(), encodeMapKey(key), syncId)); + System.currentTimeMillis(), encodeMapKey(key), syncId); res.add(syncId); waitSync(res); @@ -1252,8 +1266,7 @@ public class JCache extends RedissonObject implements Cache { long startTime = System.currentTimeMillis(); if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { V oldValue = getValue(key); boolean result = removeValue(key); @@ -1291,7 +1304,7 @@ public class JCache extends RedissonObject implements Cache { private boolean removeValueLocked(K key, V value) { - Boolean result = (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_KEY_VALUE, + Boolean result = evalWrite(getName(), codec, EVAL_REMOVE_KEY_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return 0; " @@ -1316,12 +1329,12 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return nil;", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName()), - 0, System.currentTimeMillis(), key, value)); + 0, System.currentTimeMillis(), key, value); if (result == null) { Long accessTimeout = getAccessTimeout(); - return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_KEY_VALUE, + return evalWrite(getName(), codec, EVAL_REMOVE_KEY_VALUE, "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " @@ -1333,7 +1346,7 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return 0; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName()), - accessTimeout, System.currentTimeMillis(), key, value)); + accessTimeout, System.currentTimeMillis(), key, value); } return result; @@ -1342,7 +1355,7 @@ public class JCache extends RedissonObject implements Cache { private boolean removeValue(K key, V value) { Long accessTimeout = getAccessTimeout(); - return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_KEY_VALUE, + return evalWrite(getName(), codec, EVAL_REMOVE_KEY_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return 0; " @@ -1376,7 +1389,7 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return 0; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName()), - accessTimeout, System.currentTimeMillis(), key, value)); + accessTimeout, System.currentTimeMillis(), key, value); } @@ -1393,8 +1406,7 @@ public class JCache extends RedissonObject implements Cache { long startTime = currentNanoTime(); boolean result; if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { result = removeValueLocked(key, value); if (result) { @@ -1439,7 +1451,7 @@ public class JCache extends RedissonObject implements Cache { private V getAndRemoveValue(K key) { double syncId = ThreadLocalRandom.current().nextDouble(); - List result = (List) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_REMOVE_VALUE_LIST, + List result = evalWrite(getName(), codec, EVAL_GET_REMOVE_VALUE_LIST, "local value = redis.call('hget', KEYS[1], ARGV[2]); " + "if value == false then " + "return {nil}; " @@ -1463,7 +1475,7 @@ public class JCache extends RedissonObject implements Cache { + "local syncs = redis.call('publish', KEYS[4], syncMsg); " + "return {value, syncs}; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()), - System.currentTimeMillis(), encodeMapKey(key), syncId)); + System.currentTimeMillis(), encodeMapKey(key), syncId); if (result.isEmpty()) { return null; @@ -1485,8 +1497,7 @@ public class JCache extends RedissonObject implements Cache { long startTime = currentNanoTime(); if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { Object value = getAndRemoveValue(key); if (value != null) { @@ -1530,7 +1541,7 @@ public class JCache extends RedissonObject implements Cache { } private long replaceValueLocked(K key, V oldValue, V newValue) { - Long res = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE, + Long res = evalWrite(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[4]); " + "if value == false then " + "return 0; " @@ -1551,12 +1562,12 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return -1;", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), - 0, 0, System.currentTimeMillis(), key, oldValue, newValue)); + 0, 0, System.currentTimeMillis(), key, oldValue, newValue); if (res == 1) { Long updateTimeout = getUpdateTimeout(); double syncId = ThreadLocalRandom.current().nextDouble(); - Long syncs = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, + Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG, "if ARGV[2] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " + "redis.call('zrem', KEYS[2], ARGV[4]); " @@ -1581,7 +1592,7 @@ public class JCache extends RedissonObject implements Cache { + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), - 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId)); + 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId); List result = Arrays.asList(syncs, syncId); waitSync(result); @@ -1594,7 +1605,7 @@ public class JCache extends RedissonObject implements Cache { Long accessTimeout = getAccessTimeout(); double syncId = ThreadLocalRandom.current().nextDouble(); - List result = (List) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST, + List result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST, "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " + "redis.call('zrem', KEYS[2], ARGV[4]); " @@ -1610,7 +1621,7 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return {-1}; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()), - accessTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId)); + accessTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId); result.add(syncId); waitSync(result); @@ -1623,7 +1634,7 @@ public class JCache extends RedissonObject implements Cache { Long updateTimeout = getUpdateTimeout(); - return (Long) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE, + return evalWrite(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[4]); " + "if value == false then " + "return 0; " @@ -1669,7 +1680,7 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return -1; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), - accessTimeout, updateTimeout, System.currentTimeMillis(), key, oldValue, newValue)); + accessTimeout, updateTimeout, System.currentTimeMillis(), key, oldValue, newValue); } @@ -1688,8 +1699,7 @@ public class JCache extends RedissonObject implements Cache { long startTime = currentNanoTime(); if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { long result = replaceValueLocked(key, oldValue, newValue); if (result == 1) { @@ -1746,7 +1756,7 @@ public class JCache extends RedissonObject implements Cache { if (containsKey(key)) { double syncId = ThreadLocalRandom.current().nextDouble(); Long updateTimeout = getUpdateTimeout(); - Long syncs = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, + Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG, "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " @@ -1771,7 +1781,7 @@ public class JCache extends RedissonObject implements Cache { + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), - updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId)); + updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); List result = Arrays.asList(syncs, syncId); waitSync(result); @@ -1786,7 +1796,7 @@ public class JCache extends RedissonObject implements Cache { private boolean replaceValue(K key, V value) { Long updateTimeout = getUpdateTimeout(); - return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_VALUE, + return evalWrite(getName(), codec, EVAL_REPLACE_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return 0; " @@ -1819,14 +1829,14 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return 1;", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), - updateTimeout, System.currentTimeMillis(), key, value)); + updateTimeout, System.currentTimeMillis(), key, value); } private V getAndReplaceValue(K key, V value) { Long updateTimeout = getUpdateTimeout(); - return (V) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_REPLACE, + return evalWrite(getName(), codec, EVAL_GET_REPLACE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return nil; " @@ -1859,12 +1869,12 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return value;", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), - updateTimeout, System.currentTimeMillis(), key, value)); + updateTimeout, System.currentTimeMillis(), key, value); } private V getAndReplaceValueLocked(K key, V value) { - V oldValue = (V) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_REPLACE, + V oldValue = evalWrite(getName(), codec, EVAL_GET_REPLACE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return nil; " @@ -1881,12 +1891,12 @@ public class JCache extends RedissonObject implements Cache { + "end; " + "return value;", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), - 0, System.currentTimeMillis(), key, value)); + 0, System.currentTimeMillis(), key, value); if (oldValue != null) { Long updateTimeout = getUpdateTimeout(); double syncId = ThreadLocalRandom.current().nextDouble(); - Long syncs = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, + Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG, "if ARGV[1] == '0' then " + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "redis.call('hdel', KEYS[1], ARGV[3]); " @@ -1911,7 +1921,7 @@ public class JCache extends RedissonObject implements Cache { + "end; ", Arrays.asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), - updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId)); + updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); List result = Arrays.asList(syncs, syncId); waitSync(result); @@ -1932,8 +1942,7 @@ public class JCache extends RedissonObject implements Cache { long startTime = currentNanoTime(); if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { boolean result = replaceValueLocked(key, value); if (result) { @@ -1986,8 +1995,7 @@ public class JCache extends RedissonObject implements Cache { long startTime = currentNanoTime(); if (config.isWriteThrough()) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); try { V result = getAndReplaceValueLocked(key, value); if (result != null) { @@ -2041,11 +2049,12 @@ public class JCache extends RedissonObject implements Cache { } } + List lockedLocks = new ArrayList(); long startTime = currentNanoTime(); if (config.isWriteThrough()) { for (K key : keys) { - RLock lock = getLock(key); - lock.lock(); + RLock lock = getLockedLock(key); + lockedLocks.add(lock); V result = getAndRemoveValue(key); if (result != null) { deletedKeys.put(key, result); @@ -2072,8 +2081,8 @@ public class JCache extends RedissonObject implements Cache { } cacheManager.getStatBean(this).addRemovals(deletedKeys.size()); } finally { - for (K key : keys) { - getLock(key).unlock(); + for (RLock lock : lockedLocks) { + lock.unlock(); } } } else { @@ -2086,7 +2095,11 @@ public class JCache extends RedissonObject implements Cache { MapScanResult scanIterator(String name, InetSocketAddress client, long startPos) { RFuture> f = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); - return get(f); + try { + return get(f); + } catch (Exception e) { + throw new CacheException(e); + } } protected Iterator keyIterator() { @@ -2123,7 +2136,7 @@ public class JCache extends RedissonObject implements Cache { } } else { long startTime = currentNanoTime(); - long removedObjects = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG, + long removedObjects = evalWrite(getName(), codec, RedisCommands.EVAL_LONG, "local expiredEntriesCount = redis.call('zcount', KEYS[2], 0, ARGV[1]); " + "local result = 0; " + "if expiredEntriesCount > 0 then " @@ -2134,7 +2147,7 @@ public class JCache extends RedissonObject implements Cache { + "redis.call('del', KEYS[1], KEYS[2]); " + "return result; ", Arrays.asList(getName(), getTimeoutSetName()), - System.currentTimeMillis())); + System.currentTimeMillis()); cacheManager.getStatBean(this).addRemovals(removedObjects); cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime); } @@ -2143,7 +2156,7 @@ public class JCache extends RedissonObject implements Cache { @Override public void clear() { checkNotClosed(); - get(commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName())); + write(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()); } @Override @@ -2414,7 +2427,7 @@ public class JCache extends RedissonObject implements Cache { if (accessTimeout == 0) { remove(); } else if (accessTimeout != -1) { - get(commandExecutor.writeAsync(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, entry.getKey().getObj())); + write(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, entry.getKey().getObj()); } return je; }