Fixed - RLocalCacheMap remove method hangs if called inside transaction #5198

pull/6384/head
Nikita Koksharov 3 weeks ago
parent 5fc0963b76
commit 5909ad0f4c

@ -673,7 +673,8 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
if (!missedKeys.isEmpty()) {
CompletionStage<Map<K, V>> f = loadAllMapAsync(missedKeys.spliterator(), false, 1);
CompletionStage<Map<K, V>> f = loadAllMapAsync(missedKeys.spliterator(),
false, 1, Thread.currentThread().getId());
CompletionStage<Map<K, V>> ff = f.thenApply(map -> {
result.putAll(map);
return result;

@ -33,6 +33,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.*;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.ServiceManager;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.iterator.RedissonMapIterator;
import org.redisson.iterator.RedissonMapKeyIterator;
@ -537,6 +538,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Map<K, V>> getAllAsync(Set<K> keys) {
return getAllAsync(keys, Thread.currentThread().getId());
}
public RFuture<Map<K, V>> getAllAsync(Set<K> keys, long threadId) {
if (keys.isEmpty()) {
return new CompletableFutureWrapper<>(Collections.emptyMap());
}
@ -551,7 +556,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
Set<K> newKeys = new HashSet<K>(keys);
newKeys.removeAll(res.keySet());
CompletionStage<Map<K, V>> ff = loadAllMapAsync(newKeys.spliterator(), false, 1);
CompletionStage<Map<K, V>> ff = loadAllMapAsync(newKeys.spliterator(), false, 1, threadId);
return ff.thenApply(map -> {
res.putAll(map);
return res;
@ -1189,7 +1194,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return new CompletableFutureWrapper<>(result);
}
protected CompletionStage<Map<K, V>> loadAllMapAsync(Spliterator<K> spliterator, boolean replaceExistingValues, int parallelism) {
protected CompletionStage<Map<K, V>> loadAllMapAsync(Spliterator<K> spliterator, boolean replaceExistingValues, int parallelism, long threadId) {
ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);
ConcurrentMap<K, V> map = new ConcurrentHashMap<>();
CompletableFuture<Map<K, V>> result = new CompletableFuture<>();
@ -1198,7 +1203,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
Stream<K> s = StreamSupport.stream(spliterator, true);
List<CompletableFuture<?>> r = s.filter(k -> k != null)
.map(k -> {
return loadValue(k, replaceExistingValues)
return loadValue(k, replaceExistingValues, threadId)
.thenAccept(v -> {
if (v != null) {
map.put(k, v);
@ -1617,6 +1622,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
protected CompletableFuture<V> loadValue(K key, boolean replaceValue, long threadId) {
RLock lock = getLock(key);
if (threadId == Long.MIN_VALUE) {
lock = ServiceManager.DUMMY_LOCK;
}
return loadValue(lock, key, replaceValue, threadId);
}
private CompletableFuture<V> loadValue(RLock lock, K key, boolean replaceValue, long threadId) {
return lock.lockAsync(threadId).thenCompose(res -> {
if (replaceValue) {
return loadValue(key, lock, threadId);

@ -1724,12 +1724,13 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
return future;
}
long threadId = Thread.currentThread().getId();
CompletionStage<Map<K, V>> f = future.thenCompose(res -> {
if (!res.keySet().containsAll(keys)) {
Set<K> newKeys = new HashSet<K>(keys);
newKeys.removeAll(res.keySet());
CompletionStage<Map<K, V>> ff = loadAllMapAsync(newKeys.spliterator(), false, 1);
CompletionStage<Map<K, V>> ff = loadAllMapAsync(newKeys.spliterator(), false, 1, threadId);
return ff.thenApply(map -> {
res.putAll(map);
return res;

@ -48,6 +48,7 @@ import org.redisson.RedissonShutdownException;
import org.redisson.Version;
import org.redisson.api.NatMapper;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.cache.LRUCacheMap;
import org.redisson.client.RedisNodeNotFoundException;
import org.redisson.client.codec.Codec;
@ -65,6 +66,9 @@ import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@ -652,4 +656,14 @@ public final class ServiceManager {
return mapResolver;
}
public static final RLock DUMMY_LOCK = (RLock) Proxy.newProxyInstance(ServiceManager.class.getClassLoader(), new Class[] {RLock.class}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().endsWith("lockAsync")) {
return new CompletableFutureWrapper<>((Void) null);
}
return null;
}
});
}

@ -29,6 +29,7 @@ import org.redisson.client.protocol.decoder.MapValueDecoder;
import org.redisson.codec.BaseEventCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.ServiceManager;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.iterator.RedissonBaseMapIterator;
import org.redisson.jcache.JMutableEntry.Action;
@ -49,9 +50,6 @@ import javax.cache.integration.*;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.*;
import java.util.concurrent.*;
@ -83,12 +81,6 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
/*
* No locking required in atomic execution mode.
*/
private static final RLock DUMMY_LOCK = (RLock) Proxy.newProxyInstance(JCache.class.getClassLoader(), new Class[] {RLock.class}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return null;
}
});
public JCache(JCacheManager cacheManager, Redisson redisson, String name, JCacheConfiguration<K, V> config, boolean hasOwnRedisson) {
super(redisson.getConfig().getCodec(), redisson.getCommandExecutor(), name);
@ -1186,7 +1178,10 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
RLock getLockedLock(K key) {
if (atomicExecution) {
return DUMMY_LOCK;
/*
* No locking is required in atomic execution mode.
*/
return ServiceManager.DUMMY_LOCK;
}
String lockName = getLockName(key);

@ -484,7 +484,7 @@ public class BaseTransactionalMap<K, V> extends BaseTransactionalObject {
}
// TODO optimize
return map.getAllAsync(new HashSet<>(keyList)).thenApply(res -> {
return ((RedissonMap<K, V>) map).getAllAsync(new HashSet<>(keyList), Long.MIN_VALUE).thenApply(res -> {
for (K key : res.keySet()) {
HashValue keyHash = toKeyHash(key);
operations.add(new MapFastRemoveOperation(map, key, transactionId, threadId));

@ -19,37 +19,31 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonTransactionalLocalCachedMapTest extends RedisDockerTest {
@Test
public void testRemoval() {
Map<String, String> externalStore = new HashMap<>();
externalStore.put("k1", "v1");
// reproducer for https://github.com/redisson/redisson/issues/5198
//@Test
public void test1() {
final LocalCachedMapOptions opts = LocalCachedMapOptions.defaults();
final Map<String, String> externalStore = new HashMap<>();
externalStore.put("hello", "world");
opts.loader(new MapLoader<String, String>() {
@Override
public String load(String key) {
return externalStore.get(key);
}
@Override
public Iterable loadAllKeys() {
return externalStore.keySet();
}
});
org.redisson.api.options.LocalCachedMapOptions<String, String> opts = org.redisson.api.options.LocalCachedMapOptions
.<String, String>name("test").loader(new MapLoader<>() {
@Override
public String load(String key) {
return externalStore.get(key);
}
RLocalCachedMap lcMap = redisson.getLocalCachedMap("lcMap", opts);
@Override
public Iterable<String> loadAllKeys() {
return externalStore.keySet();
}
});
// Uncomment the below line and hang will be avoided
// lcMap.get("hello");
RLocalCachedMap<String, String> lcMap = redisson.getLocalCachedMap(opts);
RTransaction tx = redisson.createTransaction(TransactionOptions.defaults());
RLocalCachedMap txMap = tx.getLocalCachedMap(lcMap);
RLocalCachedMap<String, String> txMap = tx.getLocalCachedMap(lcMap);
// Below line will hang for tx timeout period
txMap.fastRemove("hello");
txMap.fastRemove("k1");
// Commit will fail because tx has timed out
tx.commit();
}

Loading…
Cancel
Save