diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index fc535508e..305ad5188 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -61,7 +61,7 @@ import java.util.stream.Collectors; /** * JCache implementation - * + * * @author Nikita Koksharov * * @param key @@ -70,10 +70,10 @@ import java.util.stream.Collectors; public class JCache extends RedissonObject implements Cache, CacheAsync { final boolean atomicExecution = System.getProperty("org.jsr107.tck.management.agentId") == null; - + final JCacheManager cacheManager; final JCacheConfiguration config; - private final ConcurrentMap, Map> listeners = + private final ConcurrentMap, Map> listeners = new ConcurrentHashMap, Map>(); final Redisson redisson; @@ -81,7 +81,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs private CacheWriter cacheWriter; private boolean closed; private boolean hasOwnRedisson; - + /* * No locking required in atomic execution mode. */ @@ -91,13 +91,13 @@ public class JCache extends RedissonObject implements Cache, CacheAs return null; } }); - + public JCache(JCacheManager cacheManager, Redisson redisson, String name, JCacheConfiguration config, boolean hasOwnRedisson) { super(redisson.getConfig().getCodec(), redisson.getCommandExecutor(), name); - + this.hasOwnRedisson = hasOwnRedisson; this.redisson = redisson; - + Factory> cacheLoaderFactory = config.getCacheLoaderFactory(); if (cacheLoaderFactory != null) { cacheLoader = cacheLoaderFactory.create(); @@ -106,23 +106,23 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (config.getCacheWriterFactory() != null) { cacheWriter = (CacheWriter) cacheWriterFactory.create(); } - + this.cacheManager = cacheManager; this.config = config; - + redisson.getEvictionScheduler().scheduleJCache(getRawName(), getTimeoutSetName(), getExpiredChannelName()); - + for (CacheEntryListenerConfiguration listenerConfig : config.getCacheEntryListenerConfigurations()) { registerCacheEntryListener(listenerConfig, false); } } - + void checkNotClosed() { if (closed) { throw new IllegalStateException(); } } - + String getTimeoutSetName() { return "jcache_timeout_set:{" + getRawName() + "}"; } @@ -138,7 +138,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs String getCreatedSyncChannelName() { return "jcache_created_sync_channel:{" + getRawName() + "}"; } - + String getCreatedSyncChannelName(String name) { return prefixName("jcache_created_sync_channel", name); } @@ -178,7 +178,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs String getExpiredChannelName() { return "jcache_expired_channel:{" + getRawName() + "}"; } - + String getRemovedChannelName(String name) { return prefixName("jcache_removed_channel", name); } @@ -207,7 +207,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs throw new NullPointerException(); } } - + @Override public V get(K key) { RLock lock = getLockedLock(key); @@ -262,9 +262,9 @@ public class JCache extends RedissonObject implements Cache, CacheAs }); return result; } - + V getValueLocked(K key) { - + V value = evalWrite(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " @@ -278,7 +278,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return value; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()), 0, System.currentTimeMillis(), encodeMapKey(key)); - + if (value != null) { Long accessTimeout = getAccessTimeout(); if (accessTimeout == -1) { @@ -298,17 +298,17 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value), ARGV[4]); " + "local syncs = redis.call('publish', KEYS[4], syncMsg); " + "return syncs;" - + "elseif ARGV[1] ~= '-1' then " + + "elseif ARGV[1] ~= '-1' then " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + "return 0;" + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), - getRemovedSyncChannelName()), + getRemovedSyncChannelName()), accessTimeout, System.currentTimeMillis(), encodeMapKey(key), syncId); - + result.add(syncs); result.add(syncId); - + waitSync(result); return value; } @@ -318,7 +318,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture getValue(K key) { Long accessTimeout = getAccessTimeout(); - + if (accessTimeout == -1) { String name = getRawName(key); return commandExecutor.evalReadAsync(name, codec, RedisCommands.EVAL_MAP_VALUE, @@ -326,12 +326,12 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "if value == false then " + "return nil; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[2]) then " + "return nil; " + "end; " - + + "return value; ", Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name)), accessTimeout, System.currentTimeMillis(), encodeMapKey(key)); @@ -343,18 +343,18 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "if value == false then " + "return nil; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[2]) then " + "return nil; " + "end; " - + + "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[3], msg); " - + "elseif ARGV[1] ~= '-1' then " + + "elseif ARGV[1] ~= '-1' then " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + "end; " @@ -380,7 +380,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs Long getAccessTimeout() { return getAccessTimeout(System.currentTimeMillis()); } - + V loadValue(K key) { V value = null; try { @@ -399,7 +399,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs } return value; } - + private R write(String key, RedisCommand command, Object... params) { RFuture future = commandExecutor.writeAsync(key, command, params); try { @@ -408,7 +408,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs throw new CacheException(e); } } - + R evalWrite(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { RFuture future = commandExecutor.evalWriteAsync(key, codec, evalCommandType, script, keys, params); try { @@ -417,7 +417,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs throw new CacheException(e); } } - + R evalRead(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { RFuture future = commandExecutor.evalReadAsync(key, codec, evalCommandType, script, keys, params); try { @@ -426,10 +426,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs throw new CacheException(e); } } - + private boolean putValueLocked(K key, Object value) { double syncId = ThreadLocalRandom.current().nextDouble(); - + if (containsKey(key)) { Long updateTimeout = getUpdateTimeout(); List res = evalWrite(getRawName(), codec, RedisCommands.EVAL_LIST, @@ -480,13 +480,13 @@ public class JCache extends RedissonObject implements Cache, CacheAs Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); - + res.add(syncId); waitSync(res); - + return (Long) res.get(0) == 1; } - + Long creationTimeout = getCreationTimeout(); if (creationTimeout == 0) { return false; @@ -509,12 +509,12 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return {1, syncs};" + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getRemovedChannelName(), getUpdatedChannelName(), - getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), + getCreatedSyncChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName()), creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); - + res.add(syncId); waitSync(res); - + return (Long) res.get(0) == 1; } @@ -530,7 +530,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs params.add(System.currentTimeMillis()); double syncId = ThreadLocalRandom.current().nextDouble(); params.add(syncId); - + for (Map.Entry entry : map.entrySet()) { params.add(encodeMapKey(entry.getKey())); params.add(encodeMapValue(entry.getValue())); @@ -761,9 +761,9 @@ public class JCache extends RedissonObject implements Cache, CacheAs Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name), getRemovedChannelName(name), getUpdatedChannelName(name), getCreatedSyncChannelName(name), getRemovedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)), creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); - + RPromise result = waitSync(syncId, res); - + return result; } @@ -775,14 +775,14 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(e)); return; } - + r.add(syncId); - + if (r.size() < 2) { result.trySuccess((Long) r.get(0) >= 1); return; } - + Long syncs = (Long) r.get(r.size() - 2); if (syncs != null && syncs > 0) { RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); @@ -805,7 +805,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs }); } else { res.syncUninterruptibly(); - + List r = res.getNow(); r.add(syncId); waitSync(r); @@ -818,7 +818,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (config.getExpiryPolicy().getExpiryForUpdate() == null) { return -1L; } - + Long updateTimeout = config.getExpiryPolicy().getExpiryForUpdate().getAdjustedTime(baseTime); if (config.getExpiryPolicy().getExpiryForUpdate().isZero()) { updateTimeout = 0L; @@ -827,7 +827,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs } return updateTimeout; } - + Long getUpdateTimeout() { return getUpdateTimeout(System.currentTimeMillis()); } @@ -844,11 +844,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs } return creationTimeout; } - + Long getCreationTimeout() { return getCreationTimeout(System.currentTimeMillis()); } - + RFuture putIfAbsentValue(K key, Object value) { Long creationTimeout = getCreationTimeout(); if (creationTimeout == 0) { @@ -866,7 +866,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return 0; " + "else " + "if ARGV[1] ~= '-1' then " - + "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + + "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); " + "redis.call('publish', KEYS[3], msg); " @@ -874,26 +874,26 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "else " + "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); " - + "redis.call('publish', KEYS[3], msg); " + + "redis.call('publish', KEYS[3], msg); " + "return 1;" + "end; " + "end; ", Arrays.asList(name, getTimeoutSetName(name), getCreatedChannelName(name)), creationTimeout, encodeMapKey(key), encodeMapValue(value), System.currentTimeMillis()); } - + private boolean putIfAbsentValueLocked(K key, Object value) { if (containsKey(key)) { return false; } - + Long creationTimeout = getCreationTimeout(); if (creationTimeout == 0) { return false; } return evalWrite(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, "if ARGV[1] ~= '-1' then " - + "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + + "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); " + "redis.call('publish', KEYS[3], msg); " @@ -901,7 +901,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "else " + "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[2]), ARGV[2], string.len(ARGV[3]), ARGV[3]); " - + "redis.call('publish', KEYS[3], msg); " + + "redis.call('publish', KEYS[3], msg); " + "return 1;" + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName()), @@ -941,7 +941,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs return Collections.emptyMap(); } } - + RFuture> result = getAllAsync(keys); result.syncUninterruptibly(); @@ -953,7 +953,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs checkNotClosed(); long startTime = currentNanoTime(); Long accessTimeout = getAccessTimeout(); - + List args = new ArrayList<>(keys.size() + 2); args.add(accessTimeout); args.add(System.currentTimeMillis()); @@ -1047,6 +1047,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs RPromise> handleGetAllResult(long startTime, RFuture> res) { RPromise> result = new RedissonPromise<>(); res.onComplete((r, ex) -> { + if (ex != null) { + result.tryFailure(ex); + return; + } + Map map = r.entrySet().stream() .filter(e -> e.getValue() != null) .collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue())); @@ -1116,7 +1121,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (keys == null) { throw new NullPointerException(); } - + for (K key : keys) { checkKey(key); } @@ -1168,12 +1173,12 @@ public class JCache extends RedissonObject implements Cache, CacheAs } }); } - + RLock getLockedLock(K key) { if (atomicExecution) { return DUMMY_LOCK; } - + String lockName = getLockName(key); RLock lock = redisson.getLock(lockName); try { @@ -1191,7 +1196,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (value == null) { throw new NullPointerException(); } - + RPromise result = new RedissonPromise<>(); long startTime = currentNanoTime(); if (config.isWriteThrough()) { @@ -1223,7 +1228,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.trySuccess(null); return; } - + writeCache(key, value, result, startTime, res, added); }); } finally { @@ -1244,7 +1249,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(e)); return; } - + if (r) { cacheManager.getStatBean(this).addPuts(1); } @@ -1273,7 +1278,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheWriterException(exc)); return; } - + handleException(result, e); }); } @@ -1292,12 +1297,12 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheWriterException(exc)); return; } - + handleException(result, e); }); return; } - + handleException(result, e); } }; @@ -1367,7 +1372,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value), ARGV[6]); " + "local syncs = redis.call('publish', KEYS[6], syncMsg); " + "return {0, value, syncs};" - + "elseif ARGV[2] ~= '-1' then " + + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " @@ -1382,7 +1387,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "redis.call('publish', KEYS[5], msg); " + "local syncs = redis.call('publish', KEYS[8], syncMsg); " + "return {1, value, syncs};" - + "else " + + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "local oldValueRequired = tonumber(redis.call('get', KEYS[9])); " + "local msg, syncMsg; " @@ -1400,12 +1405,12 @@ public class JCache extends RedissonObject implements Cache, CacheAs Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getCreatedChannelName(), getUpdatedChannelName(), getRemovedSyncChannelName(), getCreatedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); - + result.add(syncId); waitSync(result); return result; } - + Long creationTimeout = getCreationTimeout(); if (creationTimeout == 0) { return Collections.emptyList(); @@ -1419,7 +1424,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[4], syncMsg); " + "return {1, syncs};" - + "else " + + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + "redis.call('publish', KEYS[3], msg); " @@ -1429,19 +1434,19 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getCreatedChannelName(), getCreatedSyncChannelName()), creationTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); - + result.add(syncId); waitSync(result); return result; } - + RFuture> getAndPutValue(K key, V value) { Long creationTimeout = getCreationTimeout(); - + Long updateTimeout = getUpdateTimeout(); - + double syncId = ThreadLocalRandom.current().nextDouble(); - + RPromise> result = new RedissonPromise<>(); String name = getRawName(key); RFuture> future = commandExecutor.evalWriteAsync(name, codec, RedisCommands.EVAL_LIST, @@ -1487,16 +1492,16 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "end; " + "else " + "if ARGV[1] == '0' then " - + "return {nil};" + + "return {nil};" + "elseif ARGV[1] ~= '-1' then " - + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + "redis.call('publish', KEYS[4], msg); " + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5], ARGV[6]); " + "local syncs = redis.call('publish', KEYS[7], syncMsg); " + "return {1, syncs};" - + "else " + + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[5]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[5]), ARGV[5]); " + "redis.call('publish', KEYS[4], msg); " @@ -1508,19 +1513,19 @@ public class JCache extends RedissonObject implements Cache, CacheAs Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getCreatedChannelName(name), getUpdatedChannelName(name), getRemovedSyncChannelName(name), getCreatedSyncChannelName(name), getUpdatedSyncChannelName(name), getOldValueListenerCounter(name)), creationTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); - + future.onComplete((r, e) -> { if (e != null) { result.tryFailure(e); return; } - + if (!r.isEmpty()) { r.add(syncId); } result.trySuccess(r); }); - + return result; } @@ -1530,7 +1535,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs future.syncUninterruptibly(); return future.getNow(); } - + @Override public RFuture getAndPutAsync(K key, V value) { checkNotClosed(); @@ -1538,7 +1543,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (value == null) { throw new NullPointerException(); } - + RPromise result = new RedissonPromise<>(); long startTime = currentNanoTime(); if (config.isWriteThrough()) { @@ -1556,7 +1561,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(ex)); return; } - + if (res.isEmpty()) { cacheManager.getStatBean(this).addPuts(1); cacheManager.getStatBean(this).addMisses(1); @@ -1574,7 +1579,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.trySuccess((V) res.get(1)); return; } - + RPromise writeRes = new RedissonPromise<>(); writeCache(key, value, writeRes, startTime, res, added); writeRes.onComplete((r, e) -> { @@ -1582,7 +1587,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(e); return; } - + V val = getAndPutResult(startTime, res); result.trySuccess(val); }); @@ -1605,7 +1610,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(e)); return; } - + V val = getAndPutResult(startTime, r); result.trySuccess(val); }); @@ -1649,18 +1654,18 @@ public class JCache extends RedissonObject implements Cache, CacheAs throw new NullPointerException(); } } - - + + long startTime = currentNanoTime(); RFuture future = putAllValues(map); RPromise result = new RedissonPromise<>(); - + Runnable r = () -> { Map> addedEntries = new HashMap<>(); for (Map.Entry entry : map.entrySet()) { addedEntries.put(entry.getKey(), new JCacheEntry(entry.getKey(), entry.getValue())); } - + try { cacheWriter.writeAll(addedEntries.values()); } catch (Exception e) { @@ -1669,12 +1674,12 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(ex)); return; } - + handleException(result, e); }); return; } - + result.trySuccess(null); }; @@ -1683,13 +1688,13 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(ex)); return; } - + cacheManager.getStatBean(this).addPuts(res); for (int i = 0; i < res; i++) { cacheManager.getStatBean(this).addPutTime((currentNanoTime() - startTime) / res); } }); - + if (atomicExecution) { future.onComplete((res, ex) -> { if (config.isWriteThrough()) { @@ -1707,12 +1712,12 @@ public class JCache extends RedissonObject implements Cache, CacheAs } return result; } - + void waitSync(List result) { if (result.size() < 2) { return; } - + Long syncs = (Long) result.get(result.size() - 2); Double syncId = (Double) result.get(result.size() - 1); if (syncs != null && syncs > 0) { @@ -1740,7 +1745,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (value == null) { throw new NullPointerException(); } - + long startTime = currentNanoTime(); RLock lock = getLockedLock(key); RPromise result = new RedissonPromise<>(); @@ -1757,7 +1762,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(ex)); return; } - + if (r) { cacheManager.getStatBean(this).addPuts(1); if (config.isWriteThrough()) { @@ -1790,7 +1795,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs } result.tryFailure(new CacheWriterException(e)); } - + RFuture removeValue(K key) { double syncId = ThreadLocalRandom.current().nextDouble(); @@ -1800,7 +1805,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "if value == 0 then " + "return {0}; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1]) then " + "return {0}; " @@ -1816,7 +1821,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return {1, syncs};", Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)), System.currentTimeMillis(), encodeMapKey(key), syncId); - + RPromise result = waitSync(syncId, future); return result; } @@ -1827,7 +1832,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs future.syncUninterruptibly(); return future.getNow(); } - + @Override public RFuture removeAsync(K key) { checkNotClosed(); @@ -1845,7 +1850,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(ex)); return; } - + commandExecutor.getConnectionManager().getExecutor().submit(() -> { try { cacheWriter.delete(key); @@ -1896,22 +1901,22 @@ public class JCache extends RedissonObject implements Cache, CacheAs }); return result; } - + } private boolean removeValueLocked(K key, V value) { - + Boolean result = evalWrite(getRawName(), codec, RedisCommands.EVAL_BOOLEAN, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return 0; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[2]) then " + "return 0; " + "end; " - + + "if ARGV[4] == value then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " @@ -1932,19 +1937,19 @@ public class JCache extends RedissonObject implements Cache, CacheAs "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " - + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + + "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[3], msg); " - + "elseif ARGV[1] ~= '-1' then " + + "elseif ARGV[1] ~= '-1' then " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName()), - accessTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); + accessTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } return result; } - + RFuture removeValue(K key, V value) { Long accessTimeout = getAccessTimeout(); @@ -1954,7 +1959,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "if value == false then " + "return 0; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[2]) then " + "return 0; " @@ -1967,13 +1972,13 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "redis.call('publish', KEYS[3], msg); " + "return 1; " + "end; " - + + "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('zrem', KEYS[2], ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[3], msg); " - + "elseif ARGV[1] ~= '-1' then " + + "elseif ARGV[1] ~= '-1' then " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + "end; " + "return 0; ", @@ -1988,7 +1993,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs future.syncUninterruptibly(); return future.getNow(); } - + @Override public RFuture removeAsync(K key, V value) { checkNotClosed(); @@ -2009,18 +2014,18 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(ex)); return; } - + if (r) { commandExecutor.getConnectionManager().getExecutor().submit(() -> { try { cacheWriter.delete(key); } catch (Exception e) { putValue(key, value); - + handleException(result, e); return; } - + cacheManager.getStatBean(this).addHits(1); cacheManager.getStatBean(this).addRemovals(1); cacheManager.getStatBean(this).addRemoveTime(currentNanoTime() - startTime); @@ -2071,7 +2076,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(ex)); return; } - + if (r) { cacheManager.getStatBean(this).addHits(1); cacheManager.getStatBean(this).addRemovals(1); @@ -2090,16 +2095,8 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture> getAndRemoveValues(Collection keys) { double syncId = ThreadLocalRandom.current().nextDouble(); - - List params = new ArrayList<>(); - params.add(System.currentTimeMillis()); - params.add(syncId); - - for (K key : keys) { - params.add(encodeMapKey(key)); - } - RFuture> future = getAndRemoveValuesOperation(commandExecutor, null, getRawName(), params); + RFuture> future = getAndRemoveValuesOperation(commandExecutor, null, getRawName(), (Collection) keys, syncId); RPromise> result = new RedissonPromise<>(); if (atomicExecution) { @@ -2164,7 +2161,15 @@ public class JCache extends RedissonObject implements Cache, CacheAs return result; } - RFuture> getAndRemoveValuesOperation(CommandAsyncExecutor commandExecutor, MasterSlaveEntry entry, String name, List params) { + RFuture> getAndRemoveValuesOperation(CommandAsyncExecutor commandExecutor, MasterSlaveEntry entry, String name, Collection keys, double syncId) { + List params = new ArrayList<>(); + params.add(System.currentTimeMillis()); + params.add(syncId); + + for (Object key : keys) { + params.add(encodeMapKey(key)); + } + String script = "local syncs = 0; " + "local values = {}; " + "local result = {}; " @@ -2242,7 +2247,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "if value == false then " + "return {nil}; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[1]) then " + "return {nil}; " @@ -2257,7 +2262,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return {value, syncs}; ", Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getRemovedSyncChannelName(name)), System.currentTimeMillis(), encodeMapKey(key), syncId); - + if (atomicExecution) { future.onComplete((r, exc1) -> { if (exc1 != null) { @@ -2269,7 +2274,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.trySuccess(null); return; } - + Long syncs = (Long) r.get(1); if (syncs != null && syncs > 0) { RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); @@ -2292,14 +2297,14 @@ public class JCache extends RedissonObject implements Cache, CacheAs }); } else { future.syncUninterruptibly(); - + List r = future.getNow(); - + if (r.size() < 2) { result.trySuccess(null); return result; } - + Long syncs = (Long) r.get(1); if (syncs != null && syncs > 0) { RSemaphore semaphore = redisson.getSemaphore(getSyncName(syncId)); @@ -2310,10 +2315,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs Thread.currentThread().interrupt(); } } - + result.trySuccess((V) r.get(0)); } - + return result; } @@ -2323,7 +2328,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs future.syncUninterruptibly(); return future.getNow(); } - + @Override public RFuture getAndRemoveAsync(K key) { checkNotClosed(); @@ -2339,14 +2344,14 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(e)); return; } - + if (value != null) { cacheManager.getStatBean(this).addHits(1); cacheManager.getStatBean(this).addRemovals(1); } else { cacheManager.getStatBean(this).addMisses(1); } - + if (config.isWriteThrough()) { commandExecutor.getConnectionManager().getExecutor().submit(() -> { try { @@ -2355,7 +2360,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (value != null) { putValue(key, value); } - + handleException(result, ex); return; } @@ -2381,7 +2386,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "if value == false then " + "return 0; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[4]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[3]) then " + "return 0; " @@ -2393,7 +2398,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return -1;", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), 0, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue)); - + if (res == 1) { Long updateTimeout = getUpdateTimeout(); double syncId = ThreadLocalRandom.current().nextDouble(); @@ -2433,15 +2438,15 @@ public class JCache extends RedissonObject implements Cache, CacheAs Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), 0, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId); - + List result = Arrays.asList(syncs, syncId); waitSync(result); - + return res; } else if (res == 0) { return res; } - + Long accessTimeout = getAccessTimeout(); if (accessTimeout == -1) { return -1; @@ -2457,23 +2462,23 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "redis.call('publish', KEYS[3], msg); " + "local syncMsg = struct.pack('Lc0Lc0d', string.len(ARGV[4]), ARGV[4], string.len(value), value, ARGV[7]); " + "local syncs = redis.call('publish', KEYS[4], syncMsg); " - + "return {-1, syncs}; " - + "elseif ARGV[1] ~= '-1' then " + + "return {-1, syncs}; " + + "elseif ARGV[1] ~= '-1' then " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + "return {0};" + "end; ", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getRemovedSyncChannelName()), accessTimeout, 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue), syncId); - + result.add(syncId); waitSync(result); return (Long) result.get(0); } - + RFuture replaceValue(K key, V oldValue, V newValue) { Long accessTimeout = getAccessTimeout(); - + Long updateTimeout = getUpdateTimeout(); String name = getRawName(key); @@ -2482,7 +2487,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "if value == false then " + "return 0; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[4]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[3]) then " + "return 0; " @@ -2494,7 +2499,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "redis.call('zrem', KEYS[2], ARGV[4]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[3], msg); " - + "elseif ARGV[2] ~= '-1' then " + + "elseif ARGV[2] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[4]); " + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " @@ -2505,7 +2510,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(ARGV[6]), ARGV[6], string.len(tostring(value)), tostring(value)); " + "end; " + "redis.call('publish', KEYS[4], msg); " - + "else " + + "else " + "redis.call('hset', KEYS[1], ARGV[4], ARGV[6]); " + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " + "local msg; " @@ -2518,29 +2523,29 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "end; " + "return 1;" + "end; " - + + "if ARGV[1] == '0' then " + "redis.call('hdel', KEYS[1], ARGV[4]); " + "redis.call('zrem', KEYS[2], ARGV[4]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[4]), ARGV[4], string.len(value), value); " - + "redis.call('publish', KEYS[3], msg); " - + "elseif ARGV[1] ~= '-1' then " + + "redis.call('publish', KEYS[3], msg); " + + "elseif ARGV[1] ~= '-1' then " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[4]); " + "return 0;" + "end; " + "return -1; ", Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getUpdatedChannelName(name), getOldValueListenerCounter(name)), accessTimeout, updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(oldValue), encodeMapValue(newValue)); - + } - + @Override public boolean replace(K key, V oldValue, V newValue) { RFuture future = replaceAsync(key, oldValue, newValue); future.syncUninterruptibly(); return future.getNow(); } - + @Override public RFuture replaceAsync(K key, V oldValue, V newValue) { checkNotClosed(); @@ -2568,7 +2573,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(ex)); return; } - + if (res == 1) { if (config.isWriteThrough()) { commandExecutor.getConnectionManager().getExecutor().submit(() -> { @@ -2576,11 +2581,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs cacheWriter.write(new JCacheEntry(key, newValue)); } catch (Exception e) { removeValues(key); - + handleException(result, e); return; } - + cacheManager.getStatBean(this).addHits(1); cacheManager.getStatBean(this).addPuts(1); cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime); @@ -2610,7 +2615,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs } return result; } - + private boolean replaceValueLocked(K key, V value) { if (containsKey(key)) { @@ -2661,17 +2666,17 @@ public class JCache extends RedissonObject implements Cache, CacheAs Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); - + List result = Arrays.asList(syncs, syncId); waitSync(result); return true; } - + return false; } - + RFuture replaceValue(K key, V value) { Long updateTimeout = getUpdateTimeout(); @@ -2681,7 +2686,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "if value == false then " + "return 0; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[2]) then " + "return 0; " @@ -2703,7 +2708,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "end; " + "redis.call('publish', KEYS[4], msg); " - + "else " + + "else " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " + "local msg; " @@ -2717,9 +2722,9 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "return 1;", Arrays.asList(name, getTimeoutSetName(name), getRemovedChannelName(name), getUpdatedChannelName(name), getOldValueListenerCounter(name)), updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); - + } - + RFuture getAndReplaceValue(K key, V value) { Long updateTimeout = getUpdateTimeout(); @@ -2729,7 +2734,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "if value == false then " + "return nil; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[2]) then " + "return nil; " @@ -2740,7 +2745,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "redis.call('zrem', KEYS[2], ARGV[3]); " + "local msg = struct.pack('Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(tostring(value)), tostring(value)); " + "redis.call('publish', KEYS[3], msg); " - + "elseif ARGV[1] ~= '-1' then " + + "elseif ARGV[1] ~= '-1' then " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + "redis.call('zadd', KEYS[2], ARGV[1], ARGV[3]); " + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " @@ -2751,7 +2756,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs + "msg = struct.pack('Lc0Lc0Lc0', string.len(ARGV[3]), ARGV[3], string.len(ARGV[4]), ARGV[4], string.len(tostring(value)), tostring(value)); " + "end; " + "redis.call('publish', KEYS[4], msg); " - + "else " + + "else " + "redis.call('hset', KEYS[1], ARGV[3], ARGV[4]); " + "local oldValueRequired = tonumber(redis.call('get', KEYS[5])); " + "local msg; " @@ -2767,19 +2772,19 @@ public class JCache extends RedissonObject implements Cache, CacheAs updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); } - + private V getAndReplaceValueLocked(K key, V value) { V oldValue = evalWrite(getRawName(), codec, RedisCommands.EVAL_MAP_VALUE, "local value = redis.call('hget', KEYS[1], ARGV[3]); " + "if value == false then " + "return nil; " + "end; " - + + "local expireDateScore = redis.call('zscore', KEYS[2], ARGV[3]); " + "if expireDateScore ~= false and tonumber(expireDateScore) <= tonumber(ARGV[2]) then " + "return nil; " + "end; " - + + "return value;", Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName()), 0, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value)); @@ -2831,7 +2836,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs Arrays.asList(getRawName(), getTimeoutSetName(), getRemovedChannelName(), getUpdatedChannelName(), getRemovedSyncChannelName(), getUpdatedSyncChannelName(), getOldValueListenerCounter()), updateTimeout, System.currentTimeMillis(), encodeMapKey(key), encodeMapValue(value), syncId); - + List result = Arrays.asList(syncs, syncId); waitSync(result); } @@ -2862,7 +2867,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs future.syncUninterruptibly(); return future.getNow(); } - + @Override public RFuture replaceAsync(K key, V value) { checkNotClosed(); @@ -2882,13 +2887,13 @@ public class JCache extends RedissonObject implements Cache, CacheAs boolean res = replaceValueLocked(key, value); future = RedissonPromise.newSucceededFuture(res); } - + future.onComplete((r, ex) -> { if (ex != null) { result.tryFailure(new CacheException(ex)); return; } - + if (r) { if (config.isWriteThrough()) { commandExecutor.getConnectionManager().getExecutor().submit(() -> { @@ -2896,10 +2901,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs cacheWriter.write(new JCacheEntry(key, value)); } catch (Exception e) { removeValues(key); - + handleException(result, e); } - + cacheManager.getStatBean(this).addHits(1); cacheManager.getStatBean(this).addPuts(1); cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime); @@ -2907,7 +2912,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs }); return; } - + cacheManager.getStatBean(this).addHits(1); cacheManager.getStatBean(this).addPuts(1); } else { @@ -2919,7 +2924,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs } finally { lock.unlock(); } - + return result; } @@ -2929,7 +2934,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs future.syncUninterruptibly(); return future.getNow(); } - + @Override public RFuture getAndReplaceAsync(K key, V value) { checkNotClosed(); @@ -2954,7 +2959,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(new CacheException(ex)); return; } - + if (r != null) { if (config.isWriteThrough()) { commandExecutor.getConnectionManager().getExecutor().submit(() -> { @@ -2964,11 +2969,11 @@ public class JCache extends RedissonObject implements Cache, CacheAs cacheWriter.write(new JCacheEntry(key, value)); } catch (Exception e) { removeValues(key); - + handleException(result, e); return; } - + cacheManager.getStatBean(this).addPutTime(currentNanoTime() - startTime); cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime); result.trySuccess(r); @@ -2995,15 +3000,15 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture future = removeAllAsync(keys); future.syncUninterruptibly(); } - + @Override public RFuture removeAllAsync(Set keys) { checkNotClosed(); - + for (K key : keys) { checkKey(key); } - + long startTime = currentNanoTime(); RPromise result = new RedissonPromise<>(); if (config.isWriteThrough()) { @@ -3013,7 +3018,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs result.tryFailure(ex); return; } - + try { cacheWriter.deleteAll(r.keySet()); } catch (Exception e) { @@ -3040,9 +3045,9 @@ public class JCache extends RedissonObject implements Cache, CacheAs } return result; } - + MapScanResult scanIterator(String name, RedisClient client, long startPos) { - RFuture> f + RFuture> f = commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "COUNT", 50); try { return get(f); @@ -3075,7 +3080,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs } }; } - + @Override public void removeAll() { checkNotClosed(); @@ -3099,7 +3104,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs RFuture future = clearAsync(); future.syncUninterruptibly(); } - + @Override public RFuture clearAsync() { checkNotClosed(); @@ -3198,7 +3203,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs if (isClosed()) { return; } - + synchronized (cacheManager) { if (!isClosed()) { if (hasOwnRedisson) { @@ -3208,7 +3213,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs for (CacheEntryListenerConfiguration config : listeners.keySet()) { deregisterCacheEntryListener(config); } - + closed = true; } } @@ -3257,7 +3262,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs Factory> factory = cacheEntryListenerConfiguration.getCacheEntryListenerFactory(); final CacheEntryListener listener = factory.create(); - + Factory> filterFactory = cacheEntryListenerConfiguration.getCacheEntryEventFilterFactory(); final CacheEntryEventFilter filter; if (filterFactory != null) { @@ -3265,22 +3270,22 @@ public class JCache extends RedissonObject implements Cache, CacheAs } else { filter = null; } - + Map values = new ConcurrentHashMap(); - + Map oldValues = listeners.putIfAbsent(cacheEntryListenerConfiguration, values); if (oldValues != null) { values = oldValues; } - + final boolean sync = cacheEntryListenerConfiguration.isSynchronous(); - + if (CacheEntryRemovedListener.class.isAssignableFrom(listener.getClass())) { String channelName = getRemovedChannelName(); if (sync) { channelName = getRemovedSyncChannelName(); } - + RTopic topic = redisson.getTopic(channelName, new JCacheEventCodec(codec, osType, sync)); int listenerId = topic.addListener(List.class, new MessageListener>() { @Override @@ -3365,7 +3370,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs }); values.put(listenerId, channelName); } - + if (addToConfig) { config.addCacheEntryListenerConfiguration(cacheEntryListenerConfiguration); } @@ -3378,7 +3383,7 @@ public class JCache extends RedissonObject implements Cache, CacheAs semaphore.release(); } } - + @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration cacheEntryListenerConfiguration) { Map listenerIds = listeners.remove(cacheEntryListenerConfiguration);