Locking added

pull/748/head
Nikita 8 years ago
parent 5f19403ded
commit dfcf7d2f0b

@ -170,17 +170,62 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
throw new NullPointerException();
}
long startTime = currentNanoTime();
V value = getValue(key);
if (value == null) {
cacheManager.getStatBean(this).addMisses(1);
if (config.isReadThrough()) {
value = load(key);
RLock lock = getLockedLock(key);
try {
V value = getValueLocked(key);
if (value == null) {
cacheManager.getStatBean(this).addMisses(1);
if (config.isReadThrough()) {
value = loadValue(key);
}
} else {
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
cacheManager.getStatBean(this).addHits(1);
}
} else {
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
cacheManager.getStatBean(this).addHits(1);
return value;
} finally {
lock.unlock();
}
}
V getValueLocked(K key) {
V value = (V)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return nil; "
+ "end; "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore); "
+ "end; "
+ "if expireDate <= tonumber(ARGV[2]) then "
+ "return nil; "
+ "end; "
+ "return value; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()),
0, System.currentTimeMillis(), key));
if (value != null) {
Long accessTimeout = getAccessTimeout();
get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()),
accessTimeout, System.currentTimeMillis(), key));
}
return value;
}
@ -232,22 +277,13 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
return accessTimeout;
}
public V load(K key) {
V load(K key) {
RLock lock = getLock(key);
lock.lock(30, TimeUnit.MINUTES);
try {
V value = getValue(key);
V value = getValueLocked(key);
if (value == null) {
try {
value = cacheLoader.load(key);
} catch (Exception ex) {
throw new CacheLoaderException(ex);
}
if (value != null) {
long startTime = currentNanoTime();
putValue(key, value);
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
}
value = loadValue(key);
}
return value;
} finally {
@ -255,6 +291,69 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
}
private V loadValue(K key) {
V value = null;
try {
value = cacheLoader.load(key);
} catch (Exception ex) {
throw new CacheLoaderException(ex);
}
if (value != null) {
long startTime = currentNanoTime();
putValueLocked(key, value);
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
}
return value;
}
private boolean putValueLocked(K key, Object value) {
if (containsKey(key)) {
Long updateTimeout = getUpdateTimeout();
return (Boolean)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return 0;"
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('publish', KEYS[5], msg); "
+ "return 1;"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('publish', KEYS[5], msg); "
+ "return 1;"
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName()),
0, updateTimeout, System.currentTimeMillis(), key, value));
}
Long creationTimeout = getCreationTimeout();
return (Boolean)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"if ARGV[1] == '0' then "
+ "return 0;"
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('publish', KEYS[3], msg); "
+ "return 1;"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('publish', KEYS[3], msg); "
+ "return 1;"
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName()),
creationTimeout, 0, System.currentTimeMillis(), key, value));
}
private boolean putValue(K key, Object value) {
Long creationTimeout = getCreationTimeout();
Long updateTimeout = getUpdateTimeout();
@ -352,6 +451,31 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName()),
creationTimeout, key, value));
}
private boolean putIfAbsentValueLocked(K key, Object value) {
if (containsKey(key)) {
return false;
}
Long creationTimeout = getCreationTimeout();
return (Boolean)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_IF_ABSENT,
"if ARGV[1] == '0' then "
+ "return 0;"
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('publish', KEYS[3], msg); "
+ "return 1;"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('publish', KEYS[3], msg); "
+ "return 1;"
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getCreatedChannelName()),
creationTimeout, key, value));
}
private String getLockName(Object key) {
@ -370,8 +494,19 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
throw new NullPointerException();
}
}
long startTime = currentNanoTime();
boolean exists = false;
for (K key : keys) {
if (containsKey(key)) {
exists = true;
}
}
if (!exists && !config.isReadThrough()) {
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
return Collections.emptyMap();
}
Long accessTimeout = getAccessTimeout();
@ -501,7 +636,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
throw new CacheLoaderException(ex);
}
if (value != null) {
putValue(key, value);
putValueLocked(key, value);
}
}
} finally {
@ -527,6 +662,14 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
RLock lock = redisson.getLock(lockName);
return lock;
}
private RLock getLockedLock(K key) {
String lockName = getLockName(key);
RLock lock = redisson.getLock(lockName);
lock.lock(30, TimeUnit.MINUTES);
return lock;
}
@Override
public void put(K key, V value) {
@ -543,7 +686,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
RLock lock = getLock(key);
lock.lock(30, TimeUnit.MINUTES);
try {
List<Object> result = getAndPutValue(key, value);
List<Object> result = getAndPutValueLocked(key, value);
if (result.isEmpty()) {
cacheManager.getStatBean(this).addPuts(1);
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
@ -586,9 +729,14 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
lock.unlock();
}
} else {
boolean result = putValue(key, value);
if (result) {
cacheManager.getStatBean(this).addPuts(1);
RLock lock = getLockedLock(key);
try {
boolean result = putValueLocked(key, value);
if (result) {
cacheManager.getStatBean(this).addPuts(1);
}
} finally {
lock.unlock();
}
}
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
@ -601,6 +749,53 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
Arrays.<Object>asList(getName(), getTimeoutSetName()), keys));
}
private List<Object> getAndPutValueLocked(K key, V value) {
if (containsKey(key)) {
Long updateTimeout = getUpdateTimeout();
return (List<Object>)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_PUT,
"local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "return {0, value};"
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('publish', KEYS[5], msg); "
+ "return {1, value};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('publish', KEYS[5], msg); "
+ "return {1, value};"
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName()),
0, updateTimeout, System.currentTimeMillis(), key, value));
}
Long creationTimeout = getCreationTimeout();
return (List<Object>)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_PUT,
"if ARGV[1] == '0' then "
+ "return {nil};"
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return {1};"
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "return {1};"
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName()),
creationTimeout, 0, System.currentTimeMillis(), key, value));
}
private List<Object> getAndPutValue(K key, V value) {
Long creationTimeout = getCreationTimeout();
@ -662,7 +857,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
RLock lock = getLock(key);
lock.lock(30, TimeUnit.MINUTES);
try {
List<Object> result = getAndPutValue(key, value);
List<Object> result = getAndPutValueLocked(key, value);
if (result.isEmpty()) {
cacheManager.getStatBean(this).addPuts(1);
cacheManager.getStatBean(this).addMisses(1);
@ -720,19 +915,24 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
lock.unlock();
}
} else {
List<Object> result = getAndPutValue(key, value);
if (result.size() < 2) {
RLock lock = getLockedLock(key);
try {
List<Object> result = getAndPutValueLocked(key, value);
if (result.size() < 2) {
cacheManager.getStatBean(this).addPuts(1);
cacheManager.getStatBean(this).addMisses(1);
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
return null;
}
cacheManager.getStatBean(this).addPuts(1);
cacheManager.getStatBean(this).addMisses(1);
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
return null;
return (V) result.get(1);
} finally {
lock.unlock();
}
cacheManager.getStatBean(this).addPuts(1);
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
return (V) result.get(1);
}
}
@ -847,7 +1047,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
RLock lock = getLock(key);
lock.lock(30, TimeUnit.MINUTES);
try {
boolean result = putIfAbsentValue(key, value);
boolean result = putIfAbsentValueLocked(key, value);
if (result) {
cacheManager.getStatBean(this).addPuts(1);
try {
@ -866,12 +1066,17 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
lock.unlock();
}
} else {
boolean result = putIfAbsentValue(key, value);
if (result) {
cacheManager.getStatBean(this).addPuts(1);
RLock lock = getLockedLock(key);
try {
boolean result = putIfAbsentValueLocked(key, value);
if (result) {
cacheManager.getStatBean(this).addPuts(1);
}
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
return result;
} finally {
lock.unlock();
}
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
return result;
}
}
@ -949,6 +1154,56 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
private boolean removeValueLocked(K key, V value) {
Boolean result = (Boolean)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_KEY_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return 0; "
+ "end; "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore); "
+ "end; "
+ "if expireDate <= tonumber(ARGV[2]) then "
+ "return 0; "
+ "end; "
+ "if ARGV[4] == value then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "return 1; "
+ "end; "
+ "return nil;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()),
0, System.currentTimeMillis(), key, value));
if (result == null) {
Long accessTimeout = getAccessTimeout();
return (Boolean)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE_KEY_VALUE,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "end; "
+ "return 0; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName()),
accessTimeout, System.currentTimeMillis(), key, value));
}
return result;
}
private boolean removeValue(K key, V value) {
Long accessTimeout = getAccessTimeout();
@ -1006,7 +1261,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
RLock lock = getLock(key);
lock.lock(30, TimeUnit.MINUTES);
try {
result = removeValue(key, value);
result = removeValueLocked(key, value);
if (result) {
try {
cacheWriter.delete(key);
@ -1030,15 +1285,20 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
lock.unlock();
}
} else {
result = removeValue(key, value);
if (result) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addRemovals(1);
} else {
cacheManager.getStatBean(this).addMisses(1);
RLock lock = getLockedLock(key);
try {
result = removeValueLocked(key, value);
if (result) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addRemovals(1);
} else {
cacheManager.getStatBean(this).addMisses(1);
}
cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime);
return result;
} finally {
lock.unlock();
}
cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime);
return result;
}
}
@ -1122,6 +1382,76 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
}
private long replaceValueLocked(K key, V oldValue, V newValue) {
Long res = (Long)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "if value == false then "
+ "return 0; "
+ "end; "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[4]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore); "
+ "end; "
+ "if expireDate <= tonumber(ARGV[3]) then "
+ "return 0; "
+ "end; "
+ "if ARGV[5] == value then "
+ "return 1;"
+ "end; "
+ "return -1;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
0, 0, System.currentTimeMillis(), key, oldValue, newValue));
if (res == 1) {
Long updateTimeout = getUpdateTimeout();
get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE,
"if ARGV[2] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "elseif ARGV[2] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
0, updateTimeout, System.currentTimeMillis(), key, oldValue, newValue));
return res;
} else if (res == 0) {
return res;
}
Long accessTimeout = getAccessTimeout();
return (Long)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_OLD_NEW_VALUE,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[4]); "
+ "redis.call('zrem', KEYS[2], ARGV[4]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(value), value); "
+ "redis.call('publish', KEYS[3], msg); "
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "return 0;"
+ "end; "
+ "return -1; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
accessTimeout, 0, System.currentTimeMillis(), key, oldValue, newValue));
}
private long replaceValue(K key, V oldValue, V newValue) {
Long accessTimeout = getAccessTimeout();
@ -1195,7 +1525,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
RLock lock = getLock(key);
lock.lock(30, TimeUnit.MINUTES);
try {
long result = replaceValue(key, oldValue, newValue);
long result = replaceValueLocked(key, oldValue, newValue);
if (result == 1) {
try {
cacheWriter.write(new JCacheEntry<K, V>(key, newValue));
@ -1225,21 +1555,57 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
lock.unlock();
}
} else {
long result = replaceValue(key, oldValue, newValue);
if (result == 1) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addPuts(1);
} else if (result == 0){
cacheManager.getStatBean(this).addMisses(1);
} else {
cacheManager.getStatBean(this).addHits(1);
RLock lock = getLockedLock(key);
try {
long result = replaceValueLocked(key, oldValue, newValue);
if (result == 1) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addPuts(1);
} else if (result == 0){
cacheManager.getStatBean(this).addMisses(1);
} else {
cacheManager.getStatBean(this).addHits(1);
}
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
return result == 1;
} finally {
lock.unlock();
}
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
return result == 1;
}
}
private boolean replaceValueLocked(K key, V value) {
if (containsKey(key)) {
Long updateTimeout = getUpdateTimeout();
return (Boolean)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_REPLACE_VALUE,
"if ARGV[1] == '0' then "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "return 1;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
updateTimeout, System.currentTimeMillis(), key, value));
}
return false;
}
private boolean replaceValue(K key, V value) {
Long updateTimeout = getUpdateTimeout();
@ -1319,6 +1685,51 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
updateTimeout, System.currentTimeMillis(), key, value));
}
private V getAndReplaceValueLocked(K key, V value) {
V oldValue = (V)get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_REPLACE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return nil; "
+ "end; "
+ "local expireDate = 92233720368547758; "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore); "
+ "end; "
+ "if expireDate <= tonumber(ARGV[2]) then "
+ "return nil; "
+ "end; "
+ "return value;", Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
0, System.currentTimeMillis(), key, value));
if (oldValue != null) {
Long updateTimeout = getUpdateTimeout();
get(commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_REPLACE,
"if ARGV[1] == '0' then "
+ "local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "redis.call('hdel', KEYS[1], ARGV[3]); "
+ "redis.call('zrem', KEYS[2], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "elseif ARGV[1] ~= '-1' then "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "else "
+ "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4]); "
+ "redis.call('publish', KEYS[4], msg); "
+ "end; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()),
updateTimeout, System.currentTimeMillis(), key, value));
}
return oldValue;
}
@Override
@ -1336,7 +1747,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
RLock lock = getLock(key);
lock.lock(30, TimeUnit.MINUTES);
try {
boolean result = replaceValue(key, value);
boolean result = replaceValueLocked(key, value);
if (result) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addPuts(1);
@ -1358,15 +1769,20 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
lock.unlock();
}
} else {
boolean result = replaceValue(key, value);
if (result) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addPuts(1);
} else {
cacheManager.getStatBean(this).addMisses(1);
RLock lock = getLockedLock(key);
try {
boolean result = replaceValueLocked(key, value);
if (result) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addPuts(1);
} else {
cacheManager.getStatBean(this).addMisses(1);
}
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
return result;
} finally {
lock.unlock();
}
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
return result;
}
}
@ -1385,7 +1801,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
RLock lock = getLock(key);
lock.lock(30, TimeUnit.MINUTES);
try {
V result = getAndReplaceValue(key, value);
V result = getAndReplaceValueLocked(key, value);
if (result != null) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addPuts(1);
@ -1408,16 +1824,21 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
lock.unlock();
}
} else {
V result = getAndReplaceValue(key, value);
if (result != null) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addPuts(1);
} else {
cacheManager.getStatBean(this).addMisses(1);
RLock lock = getLockedLock(key);
try {
V result = getAndReplaceValueLocked(key, value);
if (result != null) {
cacheManager.getStatBean(this).addHits(1);
cacheManager.getStatBean(this).addPuts(1);
} else {
cacheManager.getStatBean(this).addMisses(1);
}
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
return result;
} finally {
lock.unlock();
}
cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime);
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
return result;
}
}
@ -1558,20 +1979,20 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
}
long startTime = currentNanoTime();
V value = getValue(key);
if (value != null) {
if (containsKey(key)) {
cacheManager.getStatBean(this).addHits(1);
} else {
cacheManager.getStatBean(this).addMisses(1);
}
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
JMutableEntry<K, V> entry = new JMutableEntry<K, V>(this, value, key, config.isReadThrough());
JMutableEntry<K, V> entry = new JMutableEntry<K, V>(this, key, null, config.isReadThrough());
try {
T result = entryProcessor.process(entry, arguments);
if (entry.getAction() == Action.CREATED
|| entry.getAction() == Action.UPDATED) {
put(key, entry.getValue());
put(key, entry.value);
}
if (entry.getAction() == Action.DELETED) {
remove(key);

@ -34,12 +34,13 @@ public class JMutableEntry<K, V> implements MutableEntry<K, V> {
Action action = Action.SKIPPED;
V value;
boolean isValueRead;
public JMutableEntry(JCache<K, V> jCache, V value, K key, boolean isReadThrough) {
public JMutableEntry(JCache<K, V> jCache, K key, V value, boolean isReadThrough) {
super();
this.jCache = jCache;
this.value = value;
this.key = key;
this.value = value;
this.isReadThrough = isReadThrough;
}
@ -53,6 +54,12 @@ public class JMutableEntry<K, V> implements MutableEntry<K, V> {
if (action != Action.SKIPPED) {
return value;
}
if (!isValueRead) {
value = jCache.getValueLocked(key);
isValueRead = true;
}
if (value != null) {
action = Action.READ;
} else if (isReadThrough) {
@ -64,7 +71,7 @@ public class JMutableEntry<K, V> implements MutableEntry<K, V> {
}
return value;
}
@Override
public <T> T unwrap(Class<T> clazz) {
return (T)this;
@ -92,7 +99,7 @@ public class JMutableEntry<K, V> implements MutableEntry<K, V> {
}
if (action != Action.CREATED) {
if (exists()) {
if (jCache.containsKey(key)) {
action = Action.UPDATED;
} else {
action = Action.CREATED;

Loading…
Cancel
Save