Fixed - CacheLoader.loadAll() method isn't called by JCache.getAll() method if readThrough=true. #4810

pull/4640/merge
Nikita Koksharov 2 years ago
parent 9b42391166
commit 3dbe5f90f3

@ -399,6 +399,21 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return getAccessTimeout(System.currentTimeMillis());
}
Map<K, V> loadValues(Iterable<? extends K> keys) {
Map<K, V> loaded;
try {
loaded = cacheLoader.loadAll(keys);
} catch (Exception ex) {
throw new CacheLoaderException(ex);
}
if (loaded != null) {
long startTime = currentNanoTime();
putAll(loaded);
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
}
return loaded;
}
V loadValue(K key) {
V value = null;
try {
@ -1035,35 +1050,33 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V>, CacheAs
return;
}
Map<K, V> map = r.entrySet().stream()
int notNullAmount = (int) r.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
cacheManager.getStatBean(this).addHits(map.size());
.count();
cacheManager.getStatBean(this).addHits(notNullAmount);
int nullValues = r.size() - map.size();
int nullValues = r.size() - notNullAmount;
if (config.isReadThrough() && nullValues > 0) {
cacheManager.getStatBean(this).addMisses(nullValues);
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
try {
r.entrySet().stream()
Set<K> nullKeys = r.entrySet().stream()
.filter(e -> e.getValue() == null)
.forEach(entry -> {
V value = loadValue(entry.getKey());
if (value != null) {
map.put(entry.getKey(), value);
}
});
.map(e -> e.getKey())
.collect(Collectors.toSet());
Map<K, V> loadedMap = loadValues(nullKeys);
r.putAll(loadedMap);
} catch (Exception exc) {
result.completeExceptionally(exc);
return;
}
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
result.complete(map);
result.complete(r);
});
} else {
cacheManager.getStatBean(this).addGetTime(currentNanoTime() - startTime);
result.complete(map);
result.complete(r);
}
});
return new CompletableFutureWrapper<>(result);

@ -19,10 +19,7 @@ import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import javax.cache.Caching;
import javax.cache.configuration.Configuration;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.configuration.*;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListenerException;
@ -30,6 +27,8 @@ import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -295,6 +294,58 @@ public class JCacheTest extends BaseTest {
runner.stop();
}
@Test
public void testGetAllCacheLoader() throws Exception {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.port(6311)
.run();
URL configUrl = getClass().getResource("redisson-jcache.yaml");
Config cfg = Config.fromYAML(configUrl);
MutableConfiguration<String, String> jcacheConfig = new MutableConfiguration<>();
jcacheConfig.setReadThrough(true);
jcacheConfig.setCacheLoaderFactory(new Factory<CacheLoader<String, String>>() {
@Override
public CacheLoader<String, String> create() {
return new CacheLoader<String, String>() {
@Override
public String load(String key) throws CacheLoaderException {
throw new CacheLoaderException("shouldn't be used");
}
@Override
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
Map<String, String> res = new HashMap<>();
for (String key : keys) {
res.put(key, key+"_loaded");
}
return res;
}
};
}
});
Configuration<String, String> config = RedissonConfiguration.fromConfig(cfg, jcacheConfig);
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager()
.createCache("test", config);
cache.put("1", "2");
cache.put("3", "4");
Map<String, String> entries = cache.getAll(new HashSet<>(Arrays.asList("1", "3", "7", "10")));
Map<String, String> expected = new HashMap<String, String>();
expected.put("1", "2");
expected.put("3", "4");
expected.put("7", "7_loaded");
expected.put("10", "10_loaded");
assertThat(entries).isEqualTo(expected);
cache.close();
runner.stop();
}
@Test
public void testJson() throws InterruptedException, IllegalArgumentException, URISyntaxException, IOException {
RedisProcess runner = new RedisRunner()

Loading…
Cancel
Save