From 82f374c90185d9b90cdfbb7e0c5b2acccb81ce94 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 28 Jun 2021 10:05:22 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/jcache/JCache.java | 505 +++++++++++------- 1 file changed, 302 insertions(+), 203 deletions(-) diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 58d395dd7..fc535508e 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -27,6 +27,8 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapValueDecoder; import org.redisson.codec.BaseEventCodec; +import org.redisson.command.CommandAsyncExecutor; +import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.iterator.RedissonBaseMapIterator; import org.redisson.jcache.JMutableEntry.Action; @@ -124,7 +126,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs String getTimeoutSetName() { return "jcache_timeout_set:{" + getRawName() + "}"; } - + + String getTimeoutSetName(String name) { + return prefixName("jcache_timeout_set", name); + } + String getSyncName(Object syncId) { return "jcache_sync:" + syncId + ":{" + getRawName() + "}"; } @@ -133,26 +139,50 @@ public class JCache extends RedissonObject implements Cache, CacheAs return "jcache_created_sync_channel:{" + getRawName() + "}"; } + String getCreatedSyncChannelName(String name) { + return prefixName("jcache_created_sync_channel", name); + } + String getUpdatedSyncChannelName() { return "jcache_updated_sync_channel:{" + getRawName() + "}"; } + String getUpdatedSyncChannelName(String name) { + return prefixName("jcache_updated_sync_channel", name); + } + String getRemovedSyncChannelName() { return "jcache_removed_sync_channel:{" + getRawName() + "}"; } - + + String getRemovedSyncChannelName(String name) { + return prefixName("jcache_removed_sync_channel", name); + } + String getCreatedChannelName() { return "jcache_created_channel:{" + getRawName() + "}"; } - + + String getCreatedChannelName(String name) { + return prefixName("jcache_created_channel", name); + } + String getUpdatedChannelName() { return "jcache_updated_channel:{" + getRawName() + "}"; } + String getUpdatedChannelName(String name) { + return prefixName("jcache_updated_channel", name); + } + String getExpiredChannelName() { return "jcache_expired_channel:{" + getRawName() + "}"; } + String getRemovedChannelName(String name) { + return prefixName("jcache_removed_channel", name); + } + String getRemovedChannelName() { return "jcache_removed_channel:{" + getRawName() + "}"; } @@ -161,6 +191,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs return "jcache_old_value_listeners:{" + getRawName() + "}"; } + String getOldValueListenerCounter(String name) { + return prefixName("jcache_old_value_listeners", name); + } + long currentNanoTime() { if (config.isStatisticsEnabled()) { return System.nanoTime(); @@ -168,7 +202,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs return 0; } - protected void checkKey(Object key) { + void checkKey(Object key) { if (key == null) { throw new NullPointerException(); } @@ -286,7 +320,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs Long accessTimeout = getAccessTimeout(); if (accessTimeout == -1) { - return commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE, + String name = getRawName(key); + return commandExecutor.evalReadAsync(name, codec, RedisCommands.EVAL_MAP_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return nil; " @@ -298,11 +333,12 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "end; " + "return value; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()), + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name)), accessTimeout, System.currentTimeMillis(), encodeMapKey(key)); } - - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE, + + String name = getRawName(key); + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return nil; " @@ -323,7 +359,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "end; " + "return value; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()), + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name)), accessTimeout, System.currentTimeMillis(), encodeMapKey(key)); } @@ -499,9 +535,66 @@ public class JCache extends RedissonObject implements Cache, CacheAs params.add(encodeMapKey(entry.getKey())); params.add(encodeMapValue(entry.getValue())); } - - RFuture> res = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST, - "local added = 0; " + + RFuture> res = putAllOperation(commandExecutor, null, getRawName(), params); + + RFuture result = handlePutAllResult(syncId, res); + return result; + } + + RFuture handlePutAllResult(double syncId, RFuture> res) { + RPromise result = new RedissonPromise<>(); + if (atomicExecution) { + res.onComplete((r, e) -> { + if (e != null) { + result.tryFailure(new CacheException(e)); + return; + } + + Long added = (Long) r.get(0); + Long syncs = (Long) r.get(1); + if (syncs > 0) { + RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); + semaphore.acquireAsync(syncs.intValue()).onComplete((obj1, ex) -> { + if (ex != null) { + result.tryFailure(new CacheException(ex)); + return; + } + semaphore.deleteAsync().onComplete((obj, exc) -> { + if (exc != null) { + result.tryFailure(new CacheException(exc)); + return; + } + result.trySuccess(added); + }); + }); + } else { + result.trySuccess(added); + } + }); + } else { + res.syncUninterruptibly(); + + List r = res.getNow(); + Long added = (Long) r.get(0); + Long syncs = (Long) r.get(1); + if (syncs > 0) { + RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); + try { + semaphore.acquire(syncs.intValue()); + semaphore.delete(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + result.trySuccess(added); + } + return result; + } + + RFuture> putAllOperation(CommandAsyncExecutor commandExecutor, MasterSlaveEntry entry, String name, List params) { + String script = "local added = 0; " + "local syncs = 0; " + "for i = 5, #ARGV, 2 do " + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]);" + @@ -573,69 +666,28 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "end; " + "end; " + "end; " - + "return {added, syncs};", - Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), - getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), - params.toArray()); - - RPromise result = new RedissonPromise<>(); - if (atomicExecution) { - res.onComplete((r, e) -> { - if (e != null) { - result.tryFailure(new CacheException(e)); - return; - } - - Long added = (Long) r.get(0); - Long syncs = (Long) r.get(1); - if (syncs > 0) { - RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); - semaphore.acquireAsync(syncs.intValue()).onComplete((obj1, ex) -> { - if (ex != null) { - result.tryFailure(new CacheException(ex)); - return; - } - semaphore.deleteAsync().onComplete((obj, exc) -> { - if (exc != null) { - result.tryFailure(new CacheException(exc)); - return; - } - result.trySuccess(added); - }); - }); - } else { - result.trySuccess(added); - } - }); - } else { - res.syncUninterruptibly(); - - List r = res.getNow(); - Long added = (Long) r.get(0); - Long syncs = (Long) r.get(1); - if (syncs > 0) { - RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); - try { - semaphore.acquire(syncs.intValue()); - semaphore.delete(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - result.trySuccess(added); + + "return {added, syncs};"; + + if (entry == null) { + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LIST, script, + Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name), getRemovedChannelName(name), getUpdatedChannelName(name), + getCreatedSyncChannelName(name), getRemovedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)), + params.toArray()); } - - return result; - + + return commandExecutor.evalWriteAsync(entry, codec, RedisCommands.EVAL_LIST, script, + Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name), getRemovedChannelName(name), getUpdatedChannelName(name), + getCreatedSyncChannelName(name), getRemovedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)), + params.toArray()); } - + RFuture putValue(K key, Object value) { double syncId = ThreadLocalRandom.current().nextDouble(); Long creationTimeout = getCreationTimeout(); Long updateTimeout = getUpdateTimeout(); - - RFuture> res = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST, + + String name = getRawName(key); + RFuture> res = commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LIST, "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[4]);" + "local exists = redis.call('hexists', KEYS[1], ARGV[4]) == 1;" + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[3]) then " + @@ -706,8 +758,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return {1, syncs};" + "end; " + "end; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), - getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), + Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name), getRemovedChannelName(name), getUpdatedChannelName(name), + getCreatedSyncChannelName(name), getRemovedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)), creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); RPromise result = waitSync(syncId, res); @@ -715,7 +767,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs return result; } - protected RPromise waitSync(double syncId, RFuture> res) { + RPromise waitSync(double syncId, RFuture> res) { RPromise result = new RedissonPromise<>(); if (atomicExecution) { res.onComplete((r, e) -> { @@ -802,8 +854,9 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (creationTimeout == 0) { return RedissonPromise.newSucceededFuture(false); } - - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + + String name = getRawName(key); + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_BOOLEAN, "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]);" + "local exists = redis.call('hexists', KEYS[1], ARGV[2]) == 1;" + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[4]) then " + @@ -825,7 +878,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return 1;" + "end; " + "end; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName()), + Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name)), creationTimeout, encodeMapKey(key), encodeMapValue(value), System.currentTimeMillis()); } @@ -855,7 +908,6 @@ public class JCache extends RedissonObject implements Cache, CacheAs creationTimeout, encodeMapKey(key), encodeMapValue(value)); } - private String getLockName(Object key) { ByteBuf keyState = encodeMapKey(key); try { @@ -902,24 +954,28 @@ public class JCache extends RedissonObject implements Cache, CacheAs long startTime = currentNanoTime(); Long accessTimeout = getAccessTimeout(); - List args = new ArrayList(keys.size() + 2); + List args = new ArrayList<>(keys.size() + 2); args.add(accessTimeout); args.add(System.currentTimeMillis()); - encode(args, keys); - - RFuture> res; + encodeMapKeys(args, keys); + + RFuture> res = getAllOperation(commandExecutor, getRawName(), null, new ArrayList<>(keys), accessTimeout, args); + + return handleGetAllResult(startTime, res); + } + + RFuture> getAllOperation(CommandAsyncExecutor commandExecutor, String name, MasterSlaveEntry entry, List keys, Long accessTimeout, List args) { + String script; if (accessTimeout == -1) { - res = commandExecutor.evalReadAsync(getRawName(), codec, new RedisCommand>("EVAL", - new MapValueDecoder(new MapGetAllDecoder(new ArrayList(keys), 0, true))), - "local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" + script = "local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" + "local accessTimeout = ARGV[1]; " + "local currentTime = tonumber(ARGV[2]); " + "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= currentTime; " + "local map = {};" + "for i=3, #ARGV, 5000 do " + "local m = redis.call('hmget', KEYS[1], unpack(ARGV, i, math.min(i+4999, #ARGV))); " - + "for k,v in ipairs(m) do " - + "table.insert(map, v) " + + "for k,v in ipairs(m) do " + + "table.insert(map, v) " + "end; " + "end; " + "local result = {};" @@ -937,20 +993,17 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "table.insert(result, value); " + "end; " - + "return result;", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()), args.toArray()); + + "return result;"; } else { - res = commandExecutor.evalWriteAsync(getRawName(), codec, new RedisCommand>("EVAL", - new MapValueDecoder(new MapGetAllDecoder(new ArrayList(keys), 0, true))), - "local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" + script = "local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" + "local accessTimeout = ARGV[1]; " + "local currentTime = tonumber(ARGV[2]); " + "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= currentTime; " + "local map = {};" + "for i=3, #ARGV, 5000 do " + "local m = redis.call('hmget', KEYS[1], unpack(ARGV, i, math.min(i+4999, #ARGV))); " - + "for k,v in ipairs(m) do " - + "table.insert(map, v) " + + "for k,v in ipairs(m) do " + + "table.insert(map, v) " + "end; " + "end; " @@ -965,31 +1018,41 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "value = false; " + "end; " + "end; " - + + "if accessTimeout == '0' then " + "redis.call('hdel', KEYS[1], key); " + "redis.call('zrem', KEYS[2], key); " + "local msg = struct.pack('Lc0Lc0', string.len(key), key, string.len(value), value); " + "redis.call('publish', KEYS[3], {key, value}); " - + "elseif accessTimeout ~= '-1' then " + + "elseif accessTimeout ~= '-1' then " + "redis.call('zadd', KEYS[2], accessTimeout, key); " + "end; " + "end; " + "table.insert(result, value); " + "end; " - + "return result;", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()), args.toArray()); + + "return result;"; + } + + if (entry == null) { + return commandExecutor.evalReadAsync(name, codec, new RedisCommand>("EVAL", + new MapValueDecoder(new MapGetAllDecoder(keys, 0, true))), + script, Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name)), args.toArray()); } + return commandExecutor.evalReadAsync(entry, codec, new RedisCommand>("EVAL", + new MapValueDecoder(new MapGetAllDecoder(keys, 0, true))), + script, Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name)), args.toArray()); + } + RPromise> handleGetAllResult(long startTime, RFuture> res) { RPromise> result = new RedissonPromise<>(); res.onComplete((r, ex) -> { Map map = r.entrySet().stream() .filter(e -> e.getValue() != null) .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue())); - + cacheManager.getStatBean(this).addHits(map.size()); - + int nullValues = r.size() - map.size(); if (config.isReadThrough() && nullValues > 0) { cacheManager.getStatBean(this).addMisses(nullValues); @@ -1030,7 +1093,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs checkNotClosed(); checkKey(key); - RFuture future = commandExecutor.evalReadAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + String name = getRawName(key); + RFuture future = commandExecutor.evalReadAsync(name, codec, RedisCommands.EVAL_BOOLEAN, "if redis.call('hexists', KEYS[1], ARGV[2]) == 0 then " + "return 0;" + "end;" @@ -1040,7 +1104,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return 0; " + "end; " + "return 1;", - Arrays.asList(getRawName(), getTimeoutSetName()), + Arrays.asList(name, getTimeoutSetName(name)), System.currentTimeMillis(), encodeMapKey(key)); return future; } @@ -1251,30 +1315,41 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture future = putAsync(key, value); future.syncUninterruptibly(); } - + RFuture removeValues(Object... keys) { - List params = new ArrayList(keys.length+1); + List params = new ArrayList<>(keys.length + 1); params.add(System.currentTimeMillis()); encodeMapKeys(params, Arrays.asList(keys)); - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG, - "local counter = 0;" - + "for i=2, #ARGV do " - + "local value = redis.call('hget', KEYS[1], ARGV[i]); " - + "if value ~= false then " - + "redis.call('hdel', KEYS[1], ARGV[i]); " - - + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]); " - + "if not (expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1])) then " - + "counter = counter + 1;" - + "end; " - - + "redis.call('zrem', KEYS[2], ARGV[i]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); " - + "redis.call('publish', KEYS[3], msg); " - + "end;" - + "end; " - + "return counter;", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()), + return removeValuesOperation(commandExecutor, getRawName(), null, params); + } + + RFuture removeValuesOperation(CommandAsyncExecutor commandExecutor, String name, MasterSlaveEntry entry, List params) { + String script = "local counter = 0;" + + "for i=2, #ARGV do " + + "local value = redis.call('hget', KEYS[1], ARGV[i]); " + + "if value ~= false then " + + "redis.call('hdel', KEYS[1], ARGV[i]); " + + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]); " + + "if not (expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1])) then " + + "counter = counter + 1;" + + "end; " + + + "redis.call('zrem', KEYS[2], ARGV[i]); " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); " + + "redis.call('publish', KEYS[3], msg); " + + "end;" + + "end; " + + "return counter;"; + + if (entry == null) { + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LONG, script, + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)), + params.toArray()); + } + + return commandExecutor.evalWriteAsync(entry, codec, RedisCommands.EVAL_LONG, script, + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)), params.toArray()); } @@ -1368,7 +1443,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs double syncId = ThreadLocalRandom.current().nextDouble(); RPromise> result = new RedissonPromise<>(); - RFuture> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST, + String name = getRawName(key); + RFuture> future = commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LIST, "local value = redis.call('hget', KEYS[1], ARGV[4]);" + "if value ~= false then " + "if ARGV[2] == '0' then " @@ -1429,8 +1505,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return {1, syncs};" + "end; " + "end; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(), - getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getCreatedChannelName(name), getUpdatedChannelName(name), + getRemovedSyncChannelName(name), getCreatedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)), creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); future.onComplete((r, e) -> { @@ -1580,7 +1656,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs RPromise result = new RedissonPromise<>(); Runnable r = () -> { - Map> addedEntries = new HashMap<>(); + Map> addedEntries = new HashMap<>(); for (Map.Entry entry : map.entrySet()) { addedEntries.put(entry.getKey(), new JCacheEntry(entry.getKey(), entry.getValue())); } @@ -1708,7 +1784,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs return result; } - protected void handleException(RPromise result, Exception e) { + void handleException(RPromise result, Exception e) { if (e instanceof CacheWriterException) { result.tryFailure(e); } @@ -1717,8 +1793,9 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture removeValue(K key) { double syncId = ThreadLocalRandom.current().nextDouble(); - - RFuture> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LIST, + + String name = getRawName(key); + RFuture> future = commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LIST, "local value = redis.call('hexists', KEYS[1], ARGV[2]); " + "if value == 0 then " + "return {0}; " @@ -1737,7 +1814,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[2]), ARGV[2], string.len(tostring(value)), tostring(value), ARGV[3]); " + "local syncs = redis.call('publish', KEYS[4], syncMsg); " + "return {1, syncs};", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()), + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)), System.currentTimeMillis(), encodeMapKey(key), syncId); RPromise result = waitSync(syncId, future); @@ -1870,8 +1947,9 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture removeValue(K key, V value) { Long accessTimeout = getAccessTimeout(); - - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + + String name = getRawName(key); + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return 0; " @@ -1899,7 +1977,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + "end; " + "return 0; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()), + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name)), accessTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } @@ -2012,7 +2090,6 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture> getAndRemoveValues(Collection keys) { double syncId = ThreadLocalRandom.current().nextDouble(); - RPromise> result = new RedissonPromise<>(); List params = new ArrayList<>(); params.add(System.currentTimeMillis()); @@ -2021,45 +2098,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs for (K key : keys) { params.add(encodeMapKey(key)); } - - RFuture> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST, - "local syncs = 0; " - + "local values = {}; " - + "local result = {}; " - + "local nulls = {}; " - - + "for i = 3, #ARGV, 1 do " - + "local value = redis.call('hget', KEYS[1], ARGV[i]); " - + "if value == false then " - + "table.insert(nulls, i-3); " - + "else " - + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]); " - + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1]) then " - + "table.insert(nulls, i-3); " - + "else " - + "redis.call('hdel', KEYS[1], ARGV[i]); " - + "redis.call('zrem', KEYS[2], ARGV[i]); " - + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); " - + "redis.call('publish', KEYS[3], msg); " - + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value), ARGV[2]); " - + "syncs = syncs + redis.call('publish', KEYS[4], syncMsg); " - + "table.insert(values, value); " - + "end; " - + "end; " - + "end; " - - + "table.insert(result, syncs); " - + "table.insert(result, #nulls); " - + "for i = 1, #nulls, 1 do " - + "table.insert(result, nulls[i]); " - + "end; " - + "for i = 1, #values, 1 do " - + "table.insert(result, values[i]); " - + "end; " - + "return result; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()), - params.toArray()); - + + RFuture> future = getAndRemoveValuesOperation(commandExecutor, null, getRawName(), params); + + RPromise> result = new RedissonPromise<>(); if (atomicExecution) { future.onComplete((r, exc1) -> { if (exc1 != null) { @@ -2072,7 +2114,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.trySuccess(Collections.emptyMap()); return; } - + long syncs = (long) r.get(0); if (syncs > 0) { RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); @@ -2086,7 +2128,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(exc); return; } - + getAndRemoveValuesResult(keys, result, r, nullsAmount); }); }); @@ -2096,15 +2138,15 @@ public class JCache extends RedissonObject implements Cache, CacheAs }); } else { future.syncUninterruptibly(); - + List r = future.getNow(); - + long nullsAmount = (long) r.get(1); if (nullsAmount == keys.size()) { result.trySuccess(Collections.emptyMap()); return result; } - + long syncs = (long) r.get(0); if (syncs > 0) { RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); @@ -2115,33 +2157,87 @@ public class JCache extends RedissonObject implements Cache, CacheAs Thread.currentThread().interrupt(); } } - + getAndRemoveValuesResult(keys, result, r, nullsAmount); } - + return result; } + RFuture> getAndRemoveValuesOperation(CommandAsyncExecutor commandExecutor, MasterSlaveEntry entry, String name, List params) { + String script = "local syncs = 0; " + + "local values = {}; " + + "local result = {}; " + + "local nulls = {}; " + + + "for i = 3, #ARGV, 1 do " + + "local value = redis.call('hget', KEYS[1], ARGV[i]); " + + "if value == false then " + + "table.insert(nulls, i-3); " + + "else " + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[i]); " + + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1]) then " + + "table.insert(nulls, i-3); " + + "else " + + "redis.call('hdel', KEYS[1], ARGV[i]); " + + "redis.call('zrem', KEYS[2], ARGV[i]); " + + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value)); " + + "redis.call('publish', KEYS[3], msg); " + + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[i]), ARGV[i], string.len(tostring(value)), tostring(value), ARGV[2]); " + + "syncs = syncs + redis.call('publish', KEYS[4], syncMsg); " + + "table.insert(values, value); " + + "end; " + + "end; " + + "end; " + + + "table.insert(result, syncs); " + + "table.insert(result, #nulls); " + + "for i = 1, #nulls, 1 do " + + "table.insert(result, nulls[i]); " + + "end; " + + "for i = 1, #values, 1 do " + + "table.insert(result, values[i]); " + + "end; " + + "return result; "; + + if (entry == null) { + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE_LIST, script, + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)), + params.toArray()); + } + + return commandExecutor.evalWriteAsync(entry, codec, RedisCommands.EVAL_MAP_VALUE_LIST, script, + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)), + params.toArray()); + } + private void getAndRemoveValuesResult(Collection keys, RPromise> result, List r, - long nullsAmount) { - HashSet nullIndexes = new HashSet<>((List) (Object) r.subList(2, (int) nullsAmount + 2)); + long nullsAmount) { Map res = new HashMap<>(); + fillMap(keys, r, res, nullsAmount, 0); + result.trySuccess(res); + } + + void fillMap(Collection keys, List r, Map res, long nullsAmount, int baseIndex) { + List list = (List) (Object) r.subList(baseIndex + 2, baseIndex + (int) nullsAmount + 2); + HashSet nullIndexes = new HashSet<>(list); long i = 0; for (K key : keys) { if (nullIndexes.contains(i)) { continue; } - V value = (V) r.get((int) (i+nullsAmount+2)); + V value = (V) r.get((int) (baseIndex + i + nullsAmount + 2)); res.put(key, value); i++; } - result.trySuccess(res); } - + RFuture getAndRemoveValue(K key) { double syncId = ThreadLocalRandom.current().nextDouble(); RPromise result = new RedissonPromise<>(); - RFuture> future = commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST, + + String name = getRawName(key); + RFuture> future = commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE_LIST, "local value = redis.call('hget', KEYS[1], ARGV[2]); " + "if value == false then " + "return {nil}; " @@ -2159,7 +2255,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[2]), ARGV[2], string.len(tostring(value)), tostring(value), ARGV[3]); " + "local syncs = redis.call('publish', KEYS[4], syncMsg); " + "return {value, syncs}; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()), + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)), System.currentTimeMillis(), encodeMapKey(key), syncId); if (atomicExecution) { @@ -2380,7 +2476,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs Long updateTimeout = getUpdateTimeout(); - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_LONG, + String name = getRawName(key); + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LONG, "local value = redis.call('hget', KEYS[1], ARGV[4]); " + "if value == false then " + "return 0; " @@ -2432,7 +2529,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return 0;" + "end; " + "return -1; ", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()), + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getUpdatedChannelName(name), getOldValueListenerCounter(name)), accessTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue)); } @@ -2578,7 +2675,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture replaceValue(K key, V value) { Long updateTimeout = getUpdateTimeout(); - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, + String name = getRawName(key); + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return 0; " @@ -2617,7 +2715,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "redis.call('publish', KEYS[4], msg); " + "end; " + "return 1;", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()), + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getUpdatedChannelName(name), getOldValueListenerCounter(name)), updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } @@ -2625,7 +2723,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture getAndReplaceValue(K key, V value) { Long updateTimeout = getUpdateTimeout(); - return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE, + String name = getRawName(key); + return commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_MAP_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return nil; " @@ -2664,7 +2763,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "redis.call('publish', KEYS[4], msg); " + "end; " + "return value;", - Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getOldValueListenerCounter()), + Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getUpdatedChannelName(name), getOldValueListenerCounter(name)), updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } @@ -2952,7 +3051,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs } } - protected Iterator keyIterator() { + Iterator keyIterator() { return new RedissonBaseMapIterator() { @Override protected K getValue(Map.Entry entry) { @@ -2960,18 +3059,18 @@ public class JCache extends RedissonObject implements Cache, CacheAs } @Override - protected void remove(java.util.Map.Entry value) { + protected void remove(Map.Entry value) { throw new UnsupportedOperationException(); } @Override - protected Object put(java.util.Map.Entry entry, Object value) { + protected Object put(Map.Entry entry, Object value) { throw new UnsupportedOperationException(); } @Override - protected ScanResult> iterator(RedisClient client, - long nextIterPos) { + protected ScanResult> iterator(RedisClient client, + long nextIterPos) { return JCache.this.scanIterator(JCache.this.getRawName(), client, nextIterPos); } }; @@ -3301,11 +3400,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs } @Override - public Iterator> iterator() { + public Iterator> iterator() { checkNotClosed(); - return new RedissonBaseMapIterator>() { + return new RedissonBaseMapIterator>() { @Override - protected Cache.Entry getValue(Map.Entry entry) { + protected Entry getValue(Map.Entry entry) { cacheManager.getStatBean(JCache.this).addHits(1); Long accessTimeout = getAccessTimeout(); JCacheEntry je = new JCacheEntry((K) entry.getKey(), (V) entry.getValue()); @@ -3323,15 +3422,15 @@ public class JCache extends RedissonObject implements Cache, CacheAs } @Override - protected Object put(java.util.Map.Entry entry, Object value) { + protected Object put(Map.Entry entry, Object value) { throw new UnsupportedOperationException(); } @Override - protected ScanResult> iterator(RedisClient client, - long nextIterPos) { + protected ScanResult> iterator(RedisClient client, + long nextIterPos) { return JCache.this.scanIterator(JCache.this.getRawName(), client, nextIterPos); }