refactoring

pull/3696/head
Nikita Koksharov 4 years ago
parent 8a31a83696
commit 82f374c901

@ -27,6 +27,8 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapValueDecoder;
import org.redisson.codec.BaseEventCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.iterator.RedissonBaseMapIterator;
import org.redisson.jcache.JMutableEntry.Action;
@ -124,7 +126,11 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
String getTimeoutSetName() {
return "jcache_timeout_set:{" + getRawName() + "}";
}
String getTimeoutSetName(String name) {
return prefixName("jcache_timeout_set", name);
}
String getSyncName(Object syncId) {
return "jcache_sync:" + syncId + ":{" + getRawName() + "}";
}
@ -133,26 +139,50 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return "jcache_created_sync_channel:{" + getRawName() + "}";
}
String getCreatedSyncChannelName(String name) {
return prefixName("jcache_created_sync_channel", name);
}
String getUpdatedSyncChannelName() {
return "jcache_updated_sync_channel:{" + getRawName() + "}";
}
String getUpdatedSyncChannelName(String name) {
return prefixName("jcache_updated_sync_channel", name);
}
String getRemovedSyncChannelName() {
return "jcache_removed_sync_channel:{" + getRawName() + "}";
}
String getRemovedSyncChannelName(String name) {
return prefixName("jcache_removed_sync_channel", name);
}
String getCreatedChannelName() {
return "jcache_created_channel:{" + getRawName() + "}";
}
String getCreatedChannelName(String name) {
return prefixName("jcache_created_channel", name);
}
String getUpdatedChannelName() {
return "jcache_updated_channel:{" + getRawName() + "}";
}
String getUpdatedChannelName(String name) {
return prefixName("jcache_updated_channel", name);
}
String getExpiredChannelName() {
return "jcache_expired_channel:{" + getRawName() + "}";
}
String getRemovedChannelName(String name) {
return prefixName("jcache_removed_channel", name);
}
String getRemovedChannelName() {
return "jcache_removed_channel:{" + getRawName() + "}";
}
@ -161,6 +191,10 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return "jcache_old_value_listeners:{" + getRawName() + "}";
}
String getOldValueListenerCounter(String name) {
return prefixName("jcache_old_value_listeners", name);
}
long currentNanoTime() {
if (config.isStatisticsEnabled()) {
return System.nanoTime();
@ -168,7 +202,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return 0;
}
protected void checkKey(Object key) {
void checkKey(Object key) {
if (key == null) {
throw new NullPointerException();
}
@ -286,7 +320,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
Long accessTimeout = getAccessTimeout();
if (accessTimeout == -1) {
return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE,
String name = getRawName(key);
return commandExecutor.evalReadAsync(name, codec, RedisCommands.EVAL_MAP_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return nil; "
@ -298,11 +333,12 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "end; "
+ "return value; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()),
Arrays.<Object>asList(name, getTimeoutSetName(name), getRemovedChannelName(name)),
accessTimeout, System.currentTimeMillis(), encodeMapKey(key));
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE,
String name = getRawName(key);
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return nil; "
@ -323,7 +359,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "end; "
+ "return value; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()),
Arrays.<Object>asList(name, getTimeoutSetName(name), getRemovedChannelName(name)),
accessTimeout, System.currentTimeMillis(), encodeMapKey(key));
}
@ -499,9 +535,66 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
params.add(encodeMapKey(entry.getKey()));
params.add(encodeMapValue(entry.getValue()));
}
RFuture<List<Object>> res = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
"local added = 0; "
RFuture<List<Object>> res = putAllOperation(commandExecutor, null, getRawName(), params);
RFuture<Long> result = handlePutAllResult(syncId, res);
return result;
}
RFuture<Long> handlePutAllResult(double syncId, RFuture<List<Object>> res) {
RPromise<Long> result = new RedissonPromise<>();
if (atomicExecution) {
res.onComplete((r, e) -> {
if (e != null) {
result.tryFailure(new CacheException(e));
return;
}
Long added = (Long) r.get(0);
Long syncs = (Long) r.get(1);
if (syncs > 0) {
RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId));
semaphore.acquireAsync(syncs.intValue()).onComplete((obj1, ex) -> {
if (ex != null) {
result.tryFailure(new CacheException(ex));
return;
}
semaphore.deleteAsync().onComplete((obj, exc) -> {
if (exc != null) {
result.tryFailure(new CacheException(exc));
return;
}
result.trySuccess(added);
});
});
} else {
result.trySuccess(added);
}
});
} else {
res.syncUninterruptibly();
List<Object> r = res.getNow();
Long added = (Long) r.get(0);
Long syncs = (Long) r.get(1);
if (syncs > 0) {
RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId));
try {
semaphore.acquire(syncs.intValue());
semaphore.delete();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
result.trySuccess(added);
}
return result;
}
RFuture<List<Object>> putAllOperation(CommandAsyncExecutor commandExecutor, MasterSlaveEntry entry, String name, List<Object> params) {
String script = "local added = 0; "
+ "local syncs = 0; "
+ "for i = 5, #ARGV, 2 do " +
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]);" +
@ -573,69 +666,28 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "end; "
+ "end; "
+ "end; "
+ "return {added, syncs};",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
params.toArray());
RPromise<Long> result = new RedissonPromise<>();
if (atomicExecution) {
res.onComplete((r, e) -> {
if (e != null) {
result.tryFailure(new CacheException(e));
return;
}
Long added = (Long) r.get(0);
Long syncs = (Long) r.get(1);
if (syncs > 0) {
RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId));
semaphore.acquireAsync(syncs.intValue()).onComplete((obj1, ex) -> {
if (ex != null) {
result.tryFailure(new CacheException(ex));
return;
}
semaphore.deleteAsync().onComplete((obj, exc) -> {
if (exc != null) {
result.tryFailure(new CacheException(exc));
return;
}
result.trySuccess(added);
});
});
} else {
result.trySuccess(added);
}
});
} else {
res.syncUninterruptibly();
List<Object> r = res.getNow();
Long added = (Long) r.get(0);
Long syncs = (Long) r.get(1);
if (syncs > 0) {
RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId));
try {
semaphore.acquire(syncs.intValue());
semaphore.delete();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
result.trySuccess(added);
+ "return {added, syncs};";
if (entry == null) {
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LIST, script,
Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name), getRemovedChannelName(name), getUpdatedChannelName(name),
getCreatedSyncChannelName(name), getRemovedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)),
params.toArray());
}
return result;
return commandExecutor.evalWriteAsync(entry, codec, RedisCommands.EVAL_LIST, script,
Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name), getRemovedChannelName(name), getUpdatedChannelName(name),
getCreatedSyncChannelName(name), getRemovedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)),
params.toArray());
}
RFuture<Boolean> putValue(K key, Object value) {
double syncId = ThreadLocalRandom.current().nextDouble();
Long creationTimeout = getCreationTimeout();
Long updateTimeout = getUpdateTimeout();
RFuture<List<Object>> res = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
String name = getRawName(key);
RFuture<List<Object>> res = commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LIST,
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[4]);" +
"local exists = redis.call('hexists', KEYS[1], ARGV[4]) == 1;" +
"if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[3]) then " +
@ -706,8 +758,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "return {1, syncs};"
+ "end; "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(),
getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name), getRemovedChannelName(name), getUpdatedChannelName(name),
getCreatedSyncChannelName(name), getRemovedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)),
creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
RPromise<Boolean> result = waitSync(syncId, res);
@ -715,7 +767,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return result;
}
protected RPromise<Boolean> waitSync(double syncId, RFuture<List<Object>> res) {
RPromise<Boolean> waitSync(double syncId, RFuture<List<Object>> res) {
RPromise<Boolean> result = new RedissonPromise<>();
if (atomicExecution) {
res.onComplete((r, e) -> {
@ -802,8 +854,9 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
if (creationTimeout == 0) {
return RedissonPromise.newSucceededFuture(false);
}
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
String name = getRawName(key);
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_BOOLEAN,
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]);" +
"local exists = redis.call('hexists', KEYS[1], ARGV[2]) == 1;" +
"if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[4]) then " +
@ -825,7 +878,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "return 1;"
+ "end; "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getCreatedChannelName()),
Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name)),
creationTimeout, encodeMapKey(key), encodeMapValue(value), System.currentTimeMillis());
}
@ -855,7 +908,6 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
creationTimeout, encodeMapKey(key), encodeMapValue(value));
}
private String getLockName(Object key) {
ByteBuf keyState = encodeMapKey(key);
try {
@ -902,24 +954,28 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
long startTime = currentNanoTime();
Long accessTimeout = getAccessTimeout();
List<Object> args = new ArrayList<Object>(keys.size() + 2);
List<Object> args = new ArrayList<>(keys.size() + 2);
args.add(accessTimeout);
args.add(System.currentTimeMillis());
encode(args, keys);
RFuture<Map<K, V>> res;
encodeMapKeys(args, keys);
RFuture<Map<K, V>> res = getAllOperation(commandExecutor, getRawName(), null, new ArrayList<>(keys), accessTimeout, args);
return handleGetAllResult(startTime, res);
}
RFuture<Map<K, V>> getAllOperation(CommandAsyncExecutor commandExecutor, String name, MasterSlaveEntry entry, List<Object> keys, Long accessTimeout, List<Object> args) {
String script;
if (accessTimeout == -1) {
res = commandExecutor.evalReadAsync(getRawName(), codec, new RedisCommand<Map<Object, Object>>("EVAL",
new MapValueDecoder(new MapGetAllDecoder(new ArrayList<Object>(keys), 0, true))),
"local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');"
script = "local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');"
+ "local accessTimeout = ARGV[1]; "
+ "local currentTime = tonumber(ARGV[2]); "
+ "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= currentTime; "
+ "local map = {};"
+ "for i=3, #ARGV, 5000 do "
+ "local m = redis.call('hmget', KEYS[1], unpack(ARGV, i, math.min(i+4999, #ARGV))); "
+ "for k,v in ipairs(m) do "
+ "table.insert(map, v) "
+ "for k,v in ipairs(m) do "
+ "table.insert(map, v) "
+ "end; "
+ "end; "
+ "local result = {};"
@ -937,20 +993,17 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "table.insert(result, value); "
+ "end; "
+ "return result;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()), args.toArray());
+ "return result;";
} else {
res = commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand<Map<Object, Object>>("EVAL",
new MapValueDecoder(new MapGetAllDecoder(new ArrayList<Object>(keys), 0, true))),
"local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');"
script = "local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');"
+ "local accessTimeout = ARGV[1]; "
+ "local currentTime = tonumber(ARGV[2]); "
+ "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= currentTime; "
+ "local map = {};"
+ "for i=3, #ARGV, 5000 do "
+ "local m = redis.call('hmget', KEYS[1], unpack(ARGV, i, math.min(i+4999, #ARGV))); "
+ "for k,v in ipairs(m) do "
+ "table.insert(map, v) "
+ "for k,v in ipairs(m) do "
+ "table.insert(map, v) "
+ "end; "
+ "end; "
@ -965,31 +1018,41 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "value = false; "
+ "end; "
+ "end; "
+ "if accessTimeout == '0' then "
+ "redis.call('hdel', KEYS[1], key); "
+ "redis.call('zrem', KEYS[2], key); "
+ "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value); "
+ "redis.call('publish', KEYS[3], {key, value}); "
+ "elseif accessTimeout ~= '-1' then "
+ "elseif accessTimeout ~= '-1' then "
+ "redis.call('zadd', KEYS[2], accessTimeout, key); "
+ "end; "
+ "end; "
+ "table.insert(result, value); "
+ "end; "
+ "return result;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()), args.toArray());
+ "return result;";
}
if (entry == null) {
return commandExecutor.evalReadAsync(name, codec, new RedisCommand<Map<Object, Object>>("EVAL",
new MapValueDecoder(new MapGetAllDecoder(keys, 0, true))),
script, Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name)), args.toArray());
}
return commandExecutor.evalReadAsync(entry, codec, new RedisCommand<Map<Object, Object>>("EVAL",
new MapValueDecoder(new MapGetAllDecoder(keys, 0, true))),
script, Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name)), args.toArray());
}
RPromise<Map<K, V>> handleGetAllResult(long startTime, RFuture<Map<K, V>> res) {
RPromise<Map<K, V>> result = new RedissonPromise<>();
res.onComplete((r, ex) -> {
Map<K, V> map = r.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
cacheManager.getStatBean(this).addHits(map.size());
int nullValues = r.size() - map.size();
if (config.isReadThrough() && nullValues > 0) {
cacheManager.getStatBean(this).addMisses(nullValues);
@ -1030,7 +1093,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
checkNotClosed();
checkKey(key);
RFuture<Boolean> future = commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
String name = getRawName(key);
RFuture<Boolean> future = commandExecutor.evalReadAsync(name, codec, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 0 then "
+ "return 0;"
+ "end;"
@ -1040,7 +1104,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "return 0; "
+ "end; "
+ "return 1;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName()),
Arrays.<Object>asList(name, getTimeoutSetName(name)),
System.currentTimeMillis(), encodeMapKey(key));
return future;
}
@ -1251,30 +1315,41 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
RFuture<Void> future = putAsync(key, value);
future.syncUninterruptibly();
}
RFuture<Long> removeValues(Object... keys) {
List<Object> params = new ArrayList<Object>(keys.length+1);
List<Object> params = new ArrayList<>(keys.length + 1);
params.add(System.currentTimeMillis());
encodeMapKeys(params, Arrays.asList(keys));
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG,
"local counter = 0;"
+ "for i=2, #ARGV do "
+ "local value = redis.call('hget', KEYS[1], ARGV[i]); "
+ "if value ~= false then "
+ "redis.call('hdel', KEYS[1], ARGV[i]); "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]); "
+ "if not (expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1])) then "
+ "counter = counter + 1;"
+ "end; "
+ "redis.call('zrem', KEYS[2], ARGV[i]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "end;"
+ "end; "
+ "return counter;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()),
return removeValuesOperation(commandExecutor, getRawName(), null, params);
}
RFuture<Long> removeValuesOperation(CommandAsyncExecutor commandExecutor, String name, MasterSlaveEntry entry, List<Object> params) {
String script = "local counter = 0;"
+ "for i=2, #ARGV do "
+ "local value = redis.call('hget', KEYS[1], ARGV[i]); "
+ "if value ~= false then "
+ "redis.call('hdel', KEYS[1], ARGV[i]); "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]); "
+ "if not (expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1])) then "
+ "counter = counter + 1;"
+ "end; "
+ "redis.call('zrem', KEYS[2], ARGV[i]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "end;"
+ "end; "
+ "return counter;";
if (entry == null) {
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LONG, script,
Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)),
params.toArray());
}
return commandExecutor.evalWriteAsync(entry, codec, RedisCommands.EVAL_LONG, script,
Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)),
params.toArray());
}
@ -1368,7 +1443,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
double syncId = ThreadLocalRandom.current().nextDouble();
RPromise<List<Object>> result = new RedissonPromise<>();
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
String name = getRawName(key);
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LIST,
"local value = redis.call('hget', KEYS[1], ARGV[4]);"
+ "if value ~= false then "
+ "if ARGV[2] == '0' then "
@ -1429,8 +1505,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "return {1, syncs};"
+ "end; "
+ "end; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(),
getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()),
Arrays.<Object>asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getCreatedChannelName(name), getUpdatedChannelName(name),
getRemovedSyncChannelName(name), getCreatedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)),
creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId);
future.onComplete((r, e) -> {
@ -1580,7 +1656,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
RPromise<Void> result = new RedissonPromise<>();
Runnable r = () -> {
Map<K, Cache.Entry<? extends K, ? extends V>> addedEntries = new HashMap<>();
Map<K, Entry<? extends K, ? extends V>> addedEntries = new HashMap<>();
for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
addedEntries.put(entry.getKey(), new JCacheEntry<K, V>(entry.getKey(), entry.getValue()));
}
@ -1708,7 +1784,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return result;
}
protected void handleException(RPromise<?> result, Exception e) {
void handleException(RPromise<?> result, Exception e) {
if (e instanceof CacheWriterException) {
result.tryFailure(e);
}
@ -1717,8 +1793,9 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
RFuture<Boolean> removeValue(K key) {
double syncId = ThreadLocalRandom.current().nextDouble();
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST,
String name = getRawName(key);
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LIST,
"local value = redis.call('hexists', KEYS[1], ARGV[2]); "
+ "if value == 0 then "
+ "return {0}; "
@ -1737,7 +1814,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[2]), ARGV[2], string.len(tostring(value)), tostring(value), ARGV[3]); "
+ "local syncs = redis.call('publish', KEYS[4], syncMsg); "
+ "return {1, syncs};",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()),
Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)),
System.currentTimeMillis(), encodeMapKey(key), syncId);
RPromise<Boolean> result = waitSync(syncId, future);
@ -1870,8 +1947,9 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
RFuture<Boolean> removeValue(K key, V value) {
Long accessTimeout = getAccessTimeout();
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
String name = getRawName(key);
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return 0; "
@ -1899,7 +1977,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); "
+ "end; "
+ "return 0; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()),
Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name)),
accessTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@ -2012,7 +2090,6 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
RFuture<Map<K, V>> getAndRemoveValues(Collection<K> keys) {
double syncId = ThreadLocalRandom.current().nextDouble();
RPromise<Map<K, V>> result = new RedissonPromise<>();
List<Object> params = new ArrayList<>();
params.add(System.currentTimeMillis());
@ -2021,45 +2098,10 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
for (K key : keys) {
params.add(encodeMapKey(key));
}
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST,
"local syncs = 0; "
+ "local values = {}; "
+ "local result = {}; "
+ "local nulls = {}; "
+ "for i = 3, #ARGV, 1 do "
+ "local value = redis.call('hget', KEYS[1], ARGV[i]); "
+ "if value == false then "
+ "table.insert(nulls, i-3); "
+ "else "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]); "
+ "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1]) then "
+ "table.insert(nulls, i-3); "
+ "else "
+ "redis.call('hdel', KEYS[1], ARGV[i]); "
+ "redis.call('zrem', KEYS[2], ARGV[i]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value), ARGV[2]); "
+ "syncs = syncs + redis.call('publish', KEYS[4], syncMsg); "
+ "table.insert(values, value); "
+ "end; "
+ "end; "
+ "end; "
+ "table.insert(result, syncs); "
+ "table.insert(result, #nulls); "
+ "for i = 1, #nulls, 1 do "
+ "table.insert(result, nulls[i]); "
+ "end; "
+ "for i = 1, #values, 1 do "
+ "table.insert(result, values[i]); "
+ "end; "
+ "return result; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()),
params.toArray());
RFuture<List<Object>> future = getAndRemoveValuesOperation(commandExecutor, null, getRawName(), params);
RPromise<Map<K, V>> result = new RedissonPromise<>();
if (atomicExecution) {
future.onComplete((r, exc1) -> {
if (exc1 != null) {
@ -2072,7 +2114,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
result.trySuccess(Collections.emptyMap());
return;
}
long syncs = (long) r.get(0);
if (syncs > 0) {
RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId));
@ -2086,7 +2128,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
result.tryFailure(exc);
return;
}
getAndRemoveValuesResult(keys, result, r, nullsAmount);
});
});
@ -2096,15 +2138,15 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
});
} else {
future.syncUninterruptibly();
List<Object> r = future.getNow();
long nullsAmount = (long) r.get(1);
if (nullsAmount == keys.size()) {
result.trySuccess(Collections.emptyMap());
return result;
}
long syncs = (long) r.get(0);
if (syncs > 0) {
RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId));
@ -2115,33 +2157,87 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
Thread.currentThread().interrupt();
}
}
getAndRemoveValuesResult(keys, result, r, nullsAmount);
}
return result;
}
RFuture<List<Object>> getAndRemoveValuesOperation(CommandAsyncExecutor commandExecutor, MasterSlaveEntry entry, String name, List<Object> params) {
String script = "local syncs = 0; "
+ "local values = {}; "
+ "local result = {}; "
+ "local nulls = {}; "
+ "for i = 3, #ARGV, 1 do "
+ "local value = redis.call('hget', KEYS[1], ARGV[i]); "
+ "if value == false then "
+ "table.insert(nulls, i-3); "
+ "else "
+ "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]); "
+ "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1]) then "
+ "table.insert(nulls, i-3); "
+ "else "
+ "redis.call('hdel', KEYS[1], ARGV[i]); "
+ "redis.call('zrem', KEYS[2], ARGV[i]); "
+ "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); "
+ "redis.call('publish', KEYS[3], msg); "
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value), ARGV[2]); "
+ "syncs = syncs + redis.call('publish', KEYS[4], syncMsg); "
+ "table.insert(values, value); "
+ "end; "
+ "end; "
+ "end; "
+ "table.insert(result, syncs); "
+ "table.insert(result, #nulls); "
+ "for i = 1, #nulls, 1 do "
+ "table.insert(result, nulls[i]); "
+ "end; "
+ "for i = 1, #values, 1 do "
+ "table.insert(result, values[i]); "
+ "end; "
+ "return result; ";
if (entry == null) {
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE_LIST, script,
Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)),
params.toArray());
}
return commandExecutor.evalWriteAsync(entry, codec, RedisCommands.EVAL_MAP_VALUE_LIST, script,
Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)),
params.toArray());
}
private void getAndRemoveValuesResult(Collection<K> keys, RPromise<Map<K, V>> result, List<Object> r,
long nullsAmount) {
HashSet<Long> nullIndexes = new HashSet<>((List<Long>) (Object) r.subList(2, (int) nullsAmount + 2));
long nullsAmount) {
Map<K, V> res = new HashMap<>();
fillMap(keys, r, res, nullsAmount, 0);
result.trySuccess(res);
}
void fillMap(Collection<K> keys, List<Object> r, Map<K, V> res, long nullsAmount, int baseIndex) {
List<Long> list = (List<Long>) (Object) r.subList(baseIndex + 2, baseIndex + (int) nullsAmount + 2);
HashSet<Long> nullIndexes = new HashSet<>(list);
long i = 0;
for (K key : keys) {
if (nullIndexes.contains(i)) {
continue;
}
V value = (V) r.get((int) (i+nullsAmount+2));
V value = (V) r.get((int) (baseIndex + i + nullsAmount + 2));
res.put(key, value);
i++;
}
result.trySuccess(res);
}
RFuture<V> getAndRemoveValue(K key) {
double syncId = ThreadLocalRandom.current().nextDouble();
RPromise<V> result = new RedissonPromise<>();
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST,
String name = getRawName(key);
RFuture<List<Object>> future = commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE_LIST,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if value == false then "
+ "return {nil}; "
@ -2159,7 +2255,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[2]), ARGV[2], string.len(tostring(value)), tostring(value), ARGV[3]); "
+ "local syncs = redis.call('publish', KEYS[4], syncMsg); "
+ "return {value, syncs}; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()),
Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)),
System.currentTimeMillis(), encodeMapKey(key), syncId);
if (atomicExecution) {
@ -2380,7 +2476,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
Long updateTimeout = getUpdateTimeout();
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG,
String name = getRawName(key);
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LONG,
"local value = redis.call('hget', KEYS[1], ARGV[4]); "
+ "if value == false then "
+ "return 0; "
@ -2432,7 +2529,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "return 0;"
+ "end; "
+ "return -1; ",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()),
Arrays.<Object>asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getUpdatedChannelName(name), getOldValueListenerCounter(name)),
accessTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue));
}
@ -2578,7 +2675,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
RFuture<Boolean> replaceValue(K key, V value) {
Long updateTimeout = getUpdateTimeout();
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN,
String name = getRawName(key);
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return 0; "
@ -2617,7 +2715,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "return 1;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()),
Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getUpdatedChannelName(name), getOldValueListenerCounter(name)),
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@ -2625,7 +2723,8 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
RFuture<V> getAndReplaceValue(K key, V value) {
Long updateTimeout = getUpdateTimeout();
return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE,
String name = getRawName(key);
return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE,
"local value = redis.call('hget', KEYS[1], ARGV[3]); "
+ "if value == false then "
+ "return nil; "
@ -2664,7 +2763,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
+ "redis.call('publish', KEYS[4], msg); "
+ "end; "
+ "return value;",
Arrays.<Object>asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()),
Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getUpdatedChannelName(name), getOldValueListenerCounter(name)),
updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value));
}
@ -2952,7 +3051,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
}
protected Iterator<K> keyIterator() {
Iterator<K> keyIterator() {
return new RedissonBaseMapIterator<K>() {
@Override
protected K getValue(Map.Entry<Object, Object> entry) {
@ -2960,18 +3059,18 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
@Override
protected void remove(java.util.Map.Entry<Object, Object> value) {
protected void remove(Map.Entry<Object, Object> value) {
throw new UnsupportedOperationException();
}
@Override
protected Object put(java.util.Map.Entry<Object, Object> entry, Object value) {
protected Object put(Map.Entry<Object, Object> entry, Object value) {
throw new UnsupportedOperationException();
}
@Override
protected ScanResult<java.util.Map.Entry<Object, Object>> iterator(RedisClient client,
long nextIterPos) {
protected ScanResult<Map.Entry<Object, Object>> iterator(RedisClient client,
long nextIterPos) {
return JCache.this.scanIterator(JCache.this.getRawName(), client, nextIterPos);
}
};
@ -3301,11 +3400,11 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
@Override
public Iterator<javax.cache.Cache.Entry<K, V>> iterator() {
public Iterator<Entry<K, V>> iterator() {
checkNotClosed();
return new RedissonBaseMapIterator<javax.cache.Cache.Entry<K, V>>() {
return new RedissonBaseMapIterator<Entry<K, V>>() {
@Override
protected Cache.Entry<K, V> getValue(Map.Entry<Object, Object> entry) {
protected Entry<K, V> getValue(Map.Entry<Object, Object> entry) {
cacheManager.getStatBean(JCache.this).addHits(1);
Long accessTimeout = getAccessTimeout();
JCacheEntry<K, V> je = new JCacheEntry<K, V>((K) entry.getKey(), (V) entry.getValue());
@ -3323,15 +3422,15 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
}
@Override
protected Object put(java.util.Map.Entry<Object, Object> entry, Object value) {
protected Object put(Map.Entry<Object, Object> entry, Object value) {
throw new UnsupportedOperationException();
}
@Override
protected ScanResult<java.util.Map.Entry<Object, Object>> iterator(RedisClient client,
long nextIterPos) {
protected ScanResult<Map.Entry<Object, Object>> iterator(RedisClient client,
long nextIterPos) {
return JCache.this.scanIterator(JCache.this.getRawName(), client, nextIterPos);
}

Loading…
Cancel
Save