Fixed - Spring Cache @Cacheable(sync) doesn't work with reactive types or completableFuture. #6256

pull/6258/head
Nikita Koksharov 5 months ago
parent e3534349b1
commit 595eae2017

@ -176,13 +176,14 @@ public class RedissonCache implements Cache {
} }
public <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) { public <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) {
long threadId = Thread.currentThread().getId();
return retrieve(key).thenCompose(v -> { return retrieve(key).thenCompose(v -> {
if (v != null) { if (v != null) {
return CompletableFuture.completedFuture((T) v); return CompletableFuture.completedFuture((T) v);
} }
RLock lock = map.getLock(key); RLock lock = map.getLock(key);
return lock.lockAsync().thenCompose(rr -> { return lock.lockAsync(threadId).thenCompose(rr -> {
return map.getAsync(key) return map.getAsync(key)
.thenCompose(r -> { .thenCompose(r -> {
if (r != null) { if (r != null) {
@ -204,7 +205,9 @@ public class RedissonCache implements Cache {
}); });
}); });
}) })
.whenComplete((r1, e) -> lock.unlockAsync()); .whenComplete((r1, e) -> {
lock.unlockAsync(threadId);
});
}); });
}); });
} }

@ -18,13 +18,18 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import reactor.core.publisher.Mono;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -59,6 +64,13 @@ public class RedissonSpringCacheTest extends RedisDockerTest {
@Service @Service
public static class SampleBean { public static class SampleBean {
@Cacheable(cacheNames = "monoCache", sync = true)
public Mono<String> readMono() {
return Mono.delay(Duration.ofSeconds(1))
.map(i -> "Hello world")
.log();
}
@CachePut(cacheNames = "testMap", key = "#p0") @CachePut(cacheNames = "testMap", key = "#p0")
public SampleObject store(String key, SampleObject object) { public SampleObject store(String key, SampleObject object) {
return object; return object;
@ -170,6 +182,28 @@ public class RedissonSpringCacheTest extends RedisDockerTest {
assertThat(s.getValue()).isEqualTo("value1"); assertThat(s.getValue()).isEqualTo("value1");
} }
@ParameterizedTest
@MethodSource("data")
public void testMono(Class<?> contextClass) throws InterruptedException {
AnnotationConfigApplicationContext context = contexts.get(contextClass);
SampleBean bean = context.getBean(SampleBean.class);
CacheManager cm = context.getBean(CacheManager.class);
System.out.println(cm);
ExecutorService e = Executors.newFixedThreadPool(2);
for (int t = 0; t < 2; t++) {
e.submit(() -> {
for (int i = 0; i < 5; i++) {
Mono<String> m = bean.readMono();
m.block();
}
});
}
e.shutdown();
assertThat(e.awaitTermination(3, TimeUnit.SECONDS)).isTrue();
}
@ParameterizedTest @ParameterizedTest
@MethodSource("data") @MethodSource("data")
public void testGet(Class<?> contextClass) { public void testGet(Class<?> contextClass) {

Loading…
Cancel
Save