diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index 8c4819a73..d564931f1 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -673,7 +673,8 @@ public class RedissonLocalCachedMap extends RedissonMap implements R } if (!missedKeys.isEmpty()) { - CompletionStage> f = loadAllMapAsync(missedKeys.spliterator(), false, 1); + CompletionStage> f = loadAllMapAsync(missedKeys.spliterator(), + false, 1, Thread.currentThread().getId()); CompletionStage> ff = f.thenApply(map -> { result.putAll(map); return result; diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 4655838a3..9f7f066be 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -33,6 +33,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.NumberConvertor; import org.redisson.client.protocol.decoder.*; import org.redisson.command.CommandAsyncExecutor; +import org.redisson.connection.ServiceManager; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.iterator.RedissonMapIterator; import org.redisson.iterator.RedissonMapKeyIterator; @@ -537,6 +538,10 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public RFuture> getAllAsync(Set keys) { + return getAllAsync(keys, Thread.currentThread().getId()); + } + + public RFuture> getAllAsync(Set keys, long threadId) { if (keys.isEmpty()) { return new CompletableFutureWrapper<>(Collections.emptyMap()); } @@ -551,7 +556,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { Set newKeys = new HashSet(keys); newKeys.removeAll(res.keySet()); - CompletionStage> ff = loadAllMapAsync(newKeys.spliterator(), false, 1); + CompletionStage> ff = loadAllMapAsync(newKeys.spliterator(), false, 1, threadId); return ff.thenApply(map -> { res.putAll(map); return res; @@ -1189,7 +1194,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { return new CompletableFutureWrapper<>(result); } - protected CompletionStage> loadAllMapAsync(Spliterator spliterator, boolean replaceExistingValues, int parallelism) { + protected CompletionStage> loadAllMapAsync(Spliterator spliterator, boolean replaceExistingValues, int parallelism, long threadId) { ForkJoinPool customThreadPool = new ForkJoinPool(parallelism); ConcurrentMap map = new ConcurrentHashMap<>(); CompletableFuture> result = new CompletableFuture<>(); @@ -1198,7 +1203,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { Stream s = StreamSupport.stream(spliterator, true); List> r = s.filter(k -> k != null) .map(k -> { - return loadValue(k, replaceExistingValues) + return loadValue(k, replaceExistingValues, threadId) .thenAccept(v -> { if (v != null) { map.put(k, v); @@ -1617,6 +1622,13 @@ public class RedissonMap extends RedissonExpirable implements RMap { protected CompletableFuture loadValue(K key, boolean replaceValue, long threadId) { RLock lock = getLock(key); + if (threadId == Long.MIN_VALUE) { + lock = ServiceManager.DUMMY_LOCK; + } + return loadValue(lock, key, replaceValue, threadId); + } + + private CompletableFuture loadValue(RLock lock, K key, boolean replaceValue, long threadId) { return lock.lockAsync(threadId).thenCompose(res -> { if (replaceValue) { return loadValue(key, lock, threadId); diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index cfeb1dc2e..9e460aeef 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -1724,12 +1724,13 @@ public class RedissonMapCache extends RedissonMap implements RMapCac return future; } + long threadId = Thread.currentThread().getId(); CompletionStage> f = future.thenCompose(res -> { if (!res.keySet().containsAll(keys)) { Set newKeys = new HashSet(keys); newKeys.removeAll(res.keySet()); - CompletionStage> ff = loadAllMapAsync(newKeys.spliterator(), false, 1); + CompletionStage> ff = loadAllMapAsync(newKeys.spliterator(), false, 1, threadId); return ff.thenApply(map -> { res.putAll(map); return res; diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index ce3508d48..c83875169 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -48,6 +48,7 @@ import org.redisson.RedissonShutdownException; import org.redisson.Version; import org.redisson.api.NatMapper; import org.redisson.api.RFuture; +import org.redisson.api.RLock; import org.redisson.cache.LRUCacheMap; import org.redisson.client.RedisNodeNotFoundException; import org.redisson.client.codec.Codec; @@ -65,6 +66,9 @@ import org.redisson.remote.ResponseEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -652,4 +656,14 @@ public final class ServiceManager { return mapResolver; } + public static final RLock DUMMY_LOCK = (RLock) Proxy.newProxyInstance(ServiceManager.class.getClassLoader(), new Class[] {RLock.class}, new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (method.getName().endsWith("lockAsync")) { + return new CompletableFutureWrapper<>((Void) null); + } + return null; + } + }); + } diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 8746cf758..c5f9acfe0 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -29,6 +29,7 @@ import org.redisson.client.protocol.decoder.MapValueDecoder; import org.redisson.codec.BaseEventCodec; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.MasterSlaveEntry; +import org.redisson.connection.ServiceManager; import org.redisson.connection.decoder.MapGetAllDecoder; import org.redisson.iterator.RedissonBaseMapIterator; import org.redisson.jcache.JMutableEntry.Action; @@ -49,9 +50,6 @@ import javax.cache.integration.*; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.EntryProcessorResult; -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; import java.net.URI; import java.util.*; import java.util.concurrent.*; @@ -83,12 +81,6 @@ public class JCache extends RedissonObject implements Cache, CacheAs /* * No locking required in atomic execution mode. */ - private static final RLock DUMMY_LOCK = (RLock) Proxy.newProxyInstance(JCache.class.getClassLoader(), new Class[] {RLock.class}, new InvocationHandler() { - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - return null; - } - }); public JCache(JCacheManager cacheManager, Redisson redisson, String name, JCacheConfiguration config, boolean hasOwnRedisson) { super(redisson.getConfig().getCodec(), redisson.getCommandExecutor(), name); @@ -1186,7 +1178,10 @@ public class JCache extends RedissonObject implements Cache, CacheAs RLock getLockedLock(K key) { if (atomicExecution) { - return DUMMY_LOCK; + /* + * No locking is required in atomic execution mode. + */ + return ServiceManager.DUMMY_LOCK; } String lockName = getLockName(key); diff --git a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java index d0c7ae45d..92964b3eb 100644 --- a/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/BaseTransactionalMap.java @@ -484,7 +484,7 @@ public class BaseTransactionalMap extends BaseTransactionalObject { } // TODO optimize - return map.getAllAsync(new HashSet<>(keyList)).thenApply(res -> { + return ((RedissonMap) map).getAllAsync(new HashSet<>(keyList), Long.MIN_VALUE).thenApply(res -> { for (K key : res.keySet()) { HashValue keyHash = toKeyHash(key); operations.add(new MapFastRemoveOperation(map, key, transactionId, threadId)); diff --git a/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalLocalCachedMapTest.java index ad48297d2..1d608b014 100644 --- a/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/transaction/RedissonTransactionalLocalCachedMapTest.java @@ -19,37 +19,31 @@ import static org.assertj.core.api.Assertions.assertThat; public class RedissonTransactionalLocalCachedMapTest extends RedisDockerTest { + @Test + public void testRemoval() { + Map externalStore = new HashMap<>(); + externalStore.put("k1", "v1"); - // reproducer for https://github.com/redisson/redisson/issues/5198 - //@Test - public void test1() { - final LocalCachedMapOptions opts = LocalCachedMapOptions.defaults(); - final Map externalStore = new HashMap<>(); - externalStore.put("hello", "world"); - opts.loader(new MapLoader() { - @Override - public String load(String key) { - return externalStore.get(key); - } - - @Override - public Iterable loadAllKeys() { - return externalStore.keySet(); - } - }); + org.redisson.api.options.LocalCachedMapOptions opts = org.redisson.api.options.LocalCachedMapOptions + .name("test").loader(new MapLoader<>() { + @Override + public String load(String key) { + return externalStore.get(key); + } - RLocalCachedMap lcMap = redisson.getLocalCachedMap("lcMap", opts); + @Override + public Iterable loadAllKeys() { + return externalStore.keySet(); + } + }); - // Uncomment the below line and hang will be avoided -// lcMap.get("hello"); + RLocalCachedMap lcMap = redisson.getLocalCachedMap(opts); RTransaction tx = redisson.createTransaction(TransactionOptions.defaults()); - RLocalCachedMap txMap = tx.getLocalCachedMap(lcMap); + RLocalCachedMap txMap = tx.getLocalCachedMap(lcMap); - // Below line will hang for tx timeout period - txMap.fastRemove("hello"); + txMap.fastRemove("k1"); - // Commit will fail because tx has timed out tx.commit(); }