JSR107 cache implementation should throw javax.cache.CacheException #850

pull/672/merge
Nikita 8 years ago
parent 5b3bb84712
commit 96e7baad84

@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.CacheManager;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Configuration;
@ -57,6 +58,7 @@ import org.redisson.api.RLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -209,7 +211,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
V getValueLocked(K key) {
V value = (V) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL,
V value = evalWrite(getName(), codec, EVAL_GET_TTL,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return nil; "
@ -226,7 +228,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return value; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()),
0, System.currentTimeMillis(), key));
0, System.currentTimeMillis(), key);
if (value != null) {
List<Object> result = new ArrayList<Object>(3);
@ -234,7 +236,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
Long accessTimeout = getAccessTimeout();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
@ -250,7 +252,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(),
getRemovedSyncChannelName()),
accessTimeout, System.currentTimeMillis(), encodeMapKey(key), syncId));
accessTimeout, System.currentTimeMillis(), encodeMapKey(key), syncId);
result.add(syncs);
result.add(syncId);
@ -265,7 +267,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
private V getValue(K key) {
Long accessTimeout = getAccessTimeout();
V value = (V) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL,
V value = evalWrite(getName(), codec, EVAL_GET_TTL,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return nil; "
@ -292,7 +294,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "return value; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()),
accessTimeout, System.currentTimeMillis(), key));
accessTimeout, System.currentTimeMillis(), key);
return value;
}
@ -311,8 +313,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
V load(K key) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
V value = getValueLocked(key);
if (value == null) {
@ -339,12 +340,30 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
return value;
}
private <T, R> R write(String key, RedisCommand<T> command, Object ... params) {
RFuture<R> future = commandExecutor.writeAsync(key, command, params);
try {
return get(future);
} catch (Exception e) {
throw new CacheException(e);
}
}
private <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
RFuture<R> future = commandExecutor.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
try {
return get(future);
} catch (Exception e) {
throw new CacheException(e);
}
}
private boolean putValueLocked(K key, Object value) {
double syncId = ThreadLocalRandom.current().nextDouble();
if (containsKey(key)) {
Long updateTimeout = getUpdateTimeout();
List<Object> res = (List<Object>) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
List<Object> res = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
@ -372,7 +391,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId));
0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
res.add(syncId);
waitSync(res);
@ -381,7 +400,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
Long creationTimeout = getCreationTimeout();
List<Object> res = (List<Object>) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
List<Object> res = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"if ARGV[1] == '0' then "
+ "return {0};"
+ "elseif ARGV[1] ~= '-1' then "
@ -402,7 +421,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId));
creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
res.add(syncId);
waitSync(res);
@ -417,7 +436,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
Long creationTimeout = getCreationTimeout();
Long updateTimeout = getUpdateTimeout();
List<Object> res = (List<Object>) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
List<Object> res = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"if redis.call('hexists', KEYS[1], ARGV[4]) == 1 then "
+ "if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
@ -466,7 +485,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId));
creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
res.add(syncId);
waitSync(res);
@ -504,7 +523,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
private boolean putIfAbsentValue(K key, Object value) {
Long creationTimeout = getCreationTimeout();
return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_IF_ABSENT,
return evalWrite(getName(), codec, EVAL_PUT_IF_ABSENT,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then "
+ "return 0; "
+ "else "
@ -524,7 +543,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName()),
creationTimeout, key, value));
creationTimeout, key, value);
}
private boolean putIfAbsentValueLocked(K key, Object value) {
@ -533,7 +552,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
Long creationTimeout = getCreationTimeout();
return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_IF_ABSENT,
return evalWrite(getName(), codec, EVAL_PUT_IF_ABSENT,
"if ARGV[1] == '0' then "
+ "return 0;"
+ "elseif ARGV[1] ~= '-1' then "
@ -549,7 +568,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "return 1;"
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName()),
creationTimeout, key, value));
creationTimeout, key, value);
}
@ -590,7 +609,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
args.add(System.currentTimeMillis());
args.addAll(keys);
Map<K, V> res = (Map<K, V>) get(commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args, 2, true), 8, ValueType.MAP_KEY, ValueType.MAP_VALUE),
Map<K, V> res = evalWrite(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args, 2, true), 8, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');"
+ "local accessTimeout = ARGV[1]; "
+ "local currentTime = tonumber(ARGV[2]); "
@ -625,7 +644,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "table.insert(result, value); "
+ "end; "
+ "return result;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()), args.toArray()));
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()), args.toArray());
Map<K, V> result = new HashMap<K, V>();
for (Map.Entry<K, V> entry : res.entrySet()) {
@ -655,7 +674,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
throw new NullPointerException();
}
return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_KEY,
return evalWrite(getName(), codec, EVAL_CONTAINS_KEY,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 0 then "
+ "return 0;"
+ "end;"
@ -671,7 +690,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return 1;",
Arrays.<Object>asList(getName(), getTimeoutSetName()),
System.currentTimeMillis(), key));
System.currentTimeMillis(), key);
}
@Override
@ -700,8 +719,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
for (K key : keys) {
try {
if (!containsKey(key) || replaceExistingValues) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
if (!containsKey(key)|| replaceExistingValues) {
V value;
@ -732,16 +750,14 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
});
}
private RLock getLock(K key) {
String lockName = getLockName(key);
RLock lock = redisson.getLock(lockName);
return lock;
}
private RLock getLockedLock(K key) {
String lockName = getLockName(key);
RLock lock = redisson.getLock(lockName);
try {
lock.lock();
} catch (Exception e) {
throw new CacheException(e);
}
return lock;
}
@ -758,8 +774,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
long startTime = currentNanoTime();
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
List<Object> result = getAndPutValueLocked(key, value);
if (result.isEmpty()) {
@ -818,17 +833,17 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
private long removeValues(Object... keys) {
return (Long) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_VALUES,
return evalWrite(getName(), codec, EVAL_REMOVE_VALUES,
"redis.call('zrem', KEYS[2], unpack(ARGV)); "
+ "return redis.call('hdel', KEYS[1], unpack(ARGV)); ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), keys));
Arrays.<Object>asList(getName(), getTimeoutSetName()), keys);
}
private List<Object> getAndPutValueLocked(K key, V value) {
double syncId = ThreadLocalRandom.current().nextDouble();
if (containsKey(key)) {
Long updateTimeout = getUpdateTimeout();
List<Object> result = (List<Object>) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
List<Object> result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
@ -856,7 +871,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName()),
0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId));
0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
result.add(syncId);
waitSync(result);
@ -864,7 +879,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
Long creationTimeout = getCreationTimeout();
List<Object> result = (List<Object>) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
List<Object> result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"if ARGV[1] == '0' then "
+ "return {nil};"
+ "elseif ARGV[1] ~= '-1' then "
@ -884,7 +899,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "return {1, syncs};"
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getCreatedSyncChannelName()),
creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId));
creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
result.add(syncId);
waitSync(result);
@ -898,7 +913,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
double syncId = ThreadLocalRandom.current().nextDouble();
List<Object> result = (List<Object>) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
List<Object> result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "if value ~= false then "
+ "if ARGV[2] == '0' then "
@ -947,7 +962,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName()),
creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId));
creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
if (!result.isEmpty()) {
result.add(syncId);
@ -968,8 +983,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
long startTime = currentNanoTime();
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
List<Object> result = getAndPutValueLocked(key, value);
if (result.isEmpty()) {
@ -1060,14 +1074,15 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
}
List<RLock> lockedLocks = new ArrayList<RLock>();
for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
K key = entry.getKey();
V value = entry.getValue();
long startTime = currentNanoTime();
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
lockedLocks.add(lock);
List<Object> result = getAndPutValue(key, value);
if (result.isEmpty()) {
@ -1133,8 +1148,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
throw new CacheWriterException(e);
}
} finally {
for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
getLock(entry.getKey()).unlock();
for (RLock lock : lockedLocks) {
lock.unlock();
}
}
}
@ -1170,8 +1185,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
long startTime = currentNanoTime();
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
boolean result = putIfAbsentValueLocked(key, value);
if (result) {
@ -1209,7 +1223,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
private boolean removeValue(K key) {
double syncId = ThreadLocalRandom.current().nextDouble();
List<Object> res = (List<Object>) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
List<Object> res = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"local value = redis.call('hexists', KEYS[1], ARGV[2]); "
+ "if value == 0 then "
+ "return {0}; "
@ -1234,7 +1248,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "local syncs = redis.call('publish', KEYS[4], syncMsg); "
+ "return {1, syncs};",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()),
System.currentTimeMillis(), encodeMapKey(key), syncId));
System.currentTimeMillis(), encodeMapKey(key), syncId);
res.add(syncId);
waitSync(res);
@ -1252,8 +1266,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
long startTime = System.currentTimeMillis();
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
V oldValue = getValue(key);
boolean result = removeValue(key);
@ -1291,7 +1304,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
private boolean removeValueLocked(K key, V value) {
Boolean result = (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_KEY_VALUE,
Boolean result = evalWrite(getName(), codec, EVAL_REMOVE_KEY_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return 0; "
@ -1316,12 +1329,12 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return nil;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()),
0, System.currentTimeMillis(), key, value));
0, System.currentTimeMillis(), key, value);
if (result == null) {
Long accessTimeout = getAccessTimeout();
return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_KEY_VALUE,
return evalWrite(getName(), codec, EVAL_REMOVE_KEY_VALUE,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
@ -1333,7 +1346,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()),
accessTimeout, System.currentTimeMillis(), key, value));
accessTimeout, System.currentTimeMillis(), key, value);
}
return result;
@ -1342,7 +1355,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
private boolean removeValue(K key, V value) {
Long accessTimeout = getAccessTimeout();
return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_KEY_VALUE,
return evalWrite(getName(), codec, EVAL_REMOVE_KEY_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return 0; "
@ -1376,7 +1389,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()),
accessTimeout, System.currentTimeMillis(), key, value));
accessTimeout, System.currentTimeMillis(), key, value);
}
@ -1393,8 +1406,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
long startTime = currentNanoTime();
boolean result;
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
result = removeValueLocked(key, value);
if (result) {
@ -1439,7 +1451,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
private V getAndRemoveValue(K key) {
double syncId = ThreadLocalRandom.current().nextDouble();
List<Object> result = (List<Object>) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_REMOVE_VALUE_LIST,
List<Object> result = evalWrite(getName(), codec, EVAL_GET_REMOVE_VALUE_LIST,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if value == false then "
+ "return {nil}; "
@ -1463,7 +1475,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "local syncs = redis.call('publish', KEYS[4], syncMsg); "
+ "return {value, syncs}; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()),
System.currentTimeMillis(), encodeMapKey(key), syncId));
System.currentTimeMillis(), encodeMapKey(key), syncId);
if (result.isEmpty()) {
return null;
@ -1485,8 +1497,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
long startTime = currentNanoTime();
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
Object value = getAndRemoveValue(key);
if (value != null) {
@ -1530,7 +1541,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
private long replaceValueLocked(K key, V oldValue, V newValue) {
Long res = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE,
Long res = evalWrite(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "if value == false then "
+ "return 0; "
@ -1551,12 +1562,12 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return -1;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
0, 0, System.currentTimeMillis(), key, oldValue, newValue));
0, 0, System.currentTimeMillis(), key, oldValue, newValue);
if (res == 1) {
Long updateTimeout = getUpdateTimeout();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
@ -1581,7 +1592,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId));
0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId);
List<Object> result = Arrays.<Object>asList(syncs, syncId);
waitSync(result);
@ -1594,7 +1605,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
Long accessTimeout = getAccessTimeout();
double syncId = ThreadLocalRandom.current().nextDouble();
List<Object> result = (List<Object>) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
List<Object> result = evalWrite(getName(), codec, RedisCommands.EVAL_LIST,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
@ -1610,7 +1621,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return {-1}; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()),
accessTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId));
accessTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId);
result.add(syncId);
waitSync(result);
@ -1623,7 +1634,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
Long updateTimeout = getUpdateTimeout();
return (Long) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE,
return evalWrite(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "if value == false then "
+ "return 0; "
@ -1669,7 +1680,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return -1; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
accessTimeout, updateTimeout, System.currentTimeMillis(), key, oldValue, newValue));
accessTimeout, updateTimeout, System.currentTimeMillis(), key, oldValue, newValue);
}
@ -1688,8 +1699,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
long startTime = currentNanoTime();
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
long result = replaceValueLocked(key, oldValue, newValue);
if (result == 1) {
@ -1746,7 +1756,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
if (containsKey(key)) {
double syncId = ThreadLocalRandom.current().nextDouble();
Long updateTimeout = getUpdateTimeout();
Long syncs = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
@ -1771,7 +1781,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId));
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
List<Object> result = Arrays.<Object>asList(syncs, syncId);
waitSync(result);
@ -1786,7 +1796,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
private boolean replaceValue(K key, V value) {
Long updateTimeout = getUpdateTimeout();
return (Boolean) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_VALUE,
return evalWrite(getName(), codec, EVAL_REPLACE_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return 0; "
@ -1819,14 +1829,14 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return 1;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
updateTimeout, System.currentTimeMillis(), key, value));
updateTimeout, System.currentTimeMillis(), key, value);
}
private V getAndReplaceValue(K key, V value) {
Long updateTimeout = getUpdateTimeout();
return (V) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_REPLACE,
return evalWrite(getName(), codec, EVAL_GET_REPLACE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return nil; "
@ -1859,12 +1869,12 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return value;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
updateTimeout, System.currentTimeMillis(), key, value));
updateTimeout, System.currentTimeMillis(), key, value);
}
private V getAndReplaceValueLocked(K key, V value) {
V oldValue = (V) get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_REPLACE,
V oldValue = evalWrite(getName(), codec, EVAL_GET_REPLACE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return nil; "
@ -1881,12 +1891,12 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; "
+ "return value;", Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
0, System.currentTimeMillis(), key, value));
0, System.currentTimeMillis(), key, value);
if (oldValue != null) {
Long updateTimeout = getUpdateTimeout();
double syncId = ThreadLocalRandom.current().nextDouble();
Long syncs = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
Long syncs = evalWrite(getName(), codec, RedisCommands.EVAL_LONG,
"if ARGV[1] == '0' then "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
@ -1911,7 +1921,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getUpdatedSyncChannelName()),
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId));
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
List<Object> result = Arrays.<Object>asList(syncs, syncId);
waitSync(result);
@ -1932,8 +1942,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
long startTime = currentNanoTime();
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
boolean result = replaceValueLocked(key, value);
if (result) {
@ -1986,8 +1995,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
long startTime = currentNanoTime();
if (config.isWriteThrough()) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
try {
V result = getAndReplaceValueLocked(key, value);
if (result != null) {
@ -2041,11 +2049,12 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
}
List<RLock> lockedLocks = new ArrayList<RLock>();
long startTime = currentNanoTime();
if (config.isWriteThrough()) {
for (K key : keys) {
RLock lock = getLock(key);
lock.lock();
RLock lock = getLockedLock(key);
lockedLocks.add(lock);
V result = getAndRemoveValue(key);
if (result != null) {
deletedKeys.put(key, result);
@ -2072,8 +2081,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
cacheManager.getStatBean(this).addRemovals(deletedKeys.size());
} finally {
for (K key : keys) {
getLock(key).unlock();
for (RLock lock : lockedLocks) {
lock.unlock();
}
}
} else {
@ -2086,7 +2095,11 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
try {
return get(f);
} catch (Exception e) {
throw new CacheException(e);
}
}
protected Iterator<K> keyIterator() {
@ -2123,7 +2136,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
} else {
long startTime = currentNanoTime();
long removedObjects = (Long) get(commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
long removedObjects = evalWrite(getName(), codec, RedisCommands.EVAL_LONG,
"local expiredEntriesCount = redis.call('zcount', KEYS[2], 0, ARGV[1]); "
+ "local result = 0; "
+ "if expiredEntriesCount > 0 then "
@ -2134,7 +2147,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
+ "redis.call('del', KEYS[1], KEYS[2]); "
+ "return result; ",
Arrays.<Object>asList(getName(), getTimeoutSetName()),
System.currentTimeMillis()));
System.currentTimeMillis());
cacheManager.getStatBean(this).addRemovals(removedObjects);
cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime);
}
@ -2143,7 +2156,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
@Override
public void clear() {
checkNotClosed();
get(commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName()));
write(getName(), RedisCommands.DEL_OBJECTS, getName(), getTimeoutSetName());
}
@Override
@ -2414,7 +2427,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
if (accessTimeout == 0) {
remove();
} else if (accessTimeout != -1) {
get(commandExecutor.writeAsync(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, entry.getKey().getObj()));
write(getName(), RedisCommands.ZADD_BOOL, getTimeoutSetName(), accessTimeout, entry.getKey().getObj());
}
return je;
}

Loading…
Cancel
Save