diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 9628f1a08..38436dcd4 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -44,6 +44,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -631,7 +632,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { } protected boolean hasNoLoader() { - return options == null || options.getLoader() == null; + return options == null || (options.getLoader() == null && options.getLoaderAsync() == null); } public RFuture> getAllOperationAsync(Set keys) { @@ -1173,19 +1174,70 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public RFuture loadAllAsync(boolean replaceExistingValues, int parallelism) { - if (options.getLoader() == null) { + if (hasNoLoader()) { throw new NullPointerException("MapLoader isn't defined"); } - return loadAllAsync(options.getLoader().loadAllKeys().spliterator(), replaceExistingValues, parallelism); + if (options.getLoaderAsync() != null) { + return loadAllAsync(options.getLoaderAsync().loadAllKeys(), replaceExistingValues, parallelism); + } + + return loadAllAsync(() -> options.getLoader().loadAllKeys().spliterator(), replaceExistingValues, parallelism); + } + + RFuture loadAllAsync(AsyncIterator iterator, boolean replaceExistingValues, int parallelism) { + CompletionStage> f = loadAllAsync(iterator, new ArrayList<>(), new AtomicInteger(parallelism)); + CompletionStage ff = f.thenCompose(elements -> { + List> futures = new ArrayList<>(elements.size()); + for (K k : elements) { + if (replaceExistingValues) { + CompletableFuture vFuture = loadValue(k, true); + futures.add(vFuture); + } else { + CompletableFuture vFuture = new CompletableFuture<>(); + containsKeyAsync(k, vFuture); + futures.add(vFuture); + } + } + + CompletableFuture finalFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + + if (elements.size() < parallelism) { + return finalFuture; + } + + return finalFuture + .thenCompose(v -> loadAllAsync(iterator, replaceExistingValues, parallelism)); + }); + return new CompletableFutureWrapper<>(ff); + } + + CompletionStage> loadAllAsync(AsyncIterator iterator, List elements, AtomicInteger workers) { + return iterator.hasNext() + .thenCompose(v -> { + int s = workers.decrementAndGet(); + if (v) { + return iterator.next().thenCompose(k -> { + if (k != null) { + elements.add(k); + } + if (s > 0) { + return loadAllAsync(iterator, elements, workers); + } + return CompletableFuture.completedFuture(elements); + }); + } + return CompletableFuture.completedFuture(elements); + }); + } - private RFuture loadAllAsync(Spliterator spliterator, boolean replaceExistingValues, int parallelism) { + private RFuture loadAllAsync(Supplier> supplier, boolean replaceExistingValues, int parallelism) { ForkJoinPool customThreadPool = new ForkJoinPool(parallelism); CompletableFuture result = new CompletableFuture<>(); customThreadPool.submit(() -> { try { - Stream s = StreamSupport.stream(spliterator, true); + Stream s = StreamSupport.stream(supplier.get(), true); List> r = s.filter(k -> k != null) .map(k -> { if (replaceExistingValues) { @@ -1253,7 +1305,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { @Override public RFuture loadAllAsync(Set keys, boolean replaceExistingValues, int parallelism) { - return loadAllAsync((Spliterator) keys.spliterator(), replaceExistingValues, parallelism); + return loadAllAsync(() -> (Spliterator) keys.spliterator(), replaceExistingValues, parallelism); } @Override @@ -1642,7 +1694,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { } private CompletableFuture loadValue(K key, RLock lock, long threadId) { -// if (options.getLoader() != null) { + if (options.getLoader() != null) { CompletableFuture result = new CompletableFuture<>(); commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { @Override @@ -1697,33 +1749,33 @@ public class RedissonMap extends RedissonExpirable implements RMap { } }); return result; -// } - -// CompletionStage valueFuture = options.getLoaderAsync().load(key); -// return valueFuture.handle((r, ex) -> { -// if (r == null) { -// return lock.unlockAsync(threadId); -// } -// if (ex != null) { -// log.error("Unable to load value by key " + key + " for map " + getRawName(), ex); -// return lock.unlockAsync(threadId); -// } -// -// return valueFuture; -// }).thenCompose(f -> f) -// .thenCompose(value -> { -// if (value != null) { -// return (CompletionStage) putOperationAsync(key, (V) value).handle((r, ex) -> { -// RFuture f = lock.unlockAsync(threadId); -// if (ex != null) { -// log.error("Unable to store value by key " + key + " for map " + getRawName(), ex); -// return f; -// } -// return f.thenApply(res -> value); -// }).thenCompose(f -> f); -// } -// return CompletableFuture.completedFuture((V) value); -// }).toCompletableFuture(); + } + + CompletionStage valueFuture = options.getLoaderAsync().load(key); + return valueFuture.handle((r, ex) -> { + if (r == null) { + return lock.unlockAsync(threadId); + } + if (ex != null) { + log.error("Unable to load value by key " + key + " for map " + getRawName(), ex); + return lock.unlockAsync(threadId); + } + + return valueFuture; + }).thenCompose(f -> f) + .thenCompose(value -> { + if (value != null) { + return (CompletionStage) putOperationAsync(key, (V) value).handle((r, ex) -> { + RFuture f = lock.unlockAsync(threadId); + if (ex != null) { + log.error("Unable to store value by key " + key + " for map " + getRawName(), ex); + return f; + } + return f.thenApply(res -> value); + }).thenCompose(f -> f); + } + return CompletableFuture.completedFuture((V) value); + }).toCompletableFuture(); } final class EntrySet extends AbstractSet> { diff --git a/redisson/src/main/java/org/redisson/api/AsyncIterator.java b/redisson/src/main/java/org/redisson/api/AsyncIterator.java new file mode 100644 index 000000000..8e4bab672 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/AsyncIterator.java @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import java.util.concurrent.CompletionStage; + +/** + * Asynchronous iterator + * + * @author Nikita Koksharov + * + * @param value type + */ +public interface AsyncIterator { + + /** + * Returns true if more elements are available. + *

+ * NOTE: each invocation returns a new instance of CompletionStage + * + * @return true if more elements are available, otherwise false + */ + CompletionStage hasNext(); + + /** + * Returns next element or NoSuchElementException if no more elements available. + *

+ * NOTE: each invocation returns a new instance of CompletionStage + * + * @return next element or NoSuchElementException + */ + CompletionStage next(); + +} diff --git a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java index 590e784f2..c2076c2c2 100644 --- a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java +++ b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.concurrent.TimeUnit; import org.redisson.api.map.MapLoader; +import org.redisson.api.map.MapLoaderAsync; import org.redisson.api.map.MapWriter; import org.redisson.api.map.MapWriterAsync; @@ -402,4 +403,8 @@ public class LocalCachedMapOptions extends MapOptions { return (LocalCachedMapOptions) super.loader(loader); } + @Override + public LocalCachedMapOptions loaderAsync(MapLoaderAsync loaderAsync) { + return (LocalCachedMapOptions) super.loaderAsync(loaderAsync); + } } diff --git a/redisson/src/main/java/org/redisson/api/MapOptions.java b/redisson/src/main/java/org/redisson/api/MapOptions.java index e8f0a9c16..916b41a57 100644 --- a/redisson/src/main/java/org/redisson/api/MapOptions.java +++ b/redisson/src/main/java/org/redisson/api/MapOptions.java @@ -16,6 +16,7 @@ package org.redisson.api; import org.redisson.api.map.MapLoader; +import org.redisson.api.map.MapLoaderAsync; import org.redisson.api.map.MapWriter; import org.redisson.api.map.MapWriterAsync; @@ -50,6 +51,8 @@ public class MapOptions { private MapWriter writer; private MapWriterAsync writerAsync; + private MapLoaderAsync loaderAsync; + private WriteMode writeMode = WriteMode.WRITE_THROUGH; private int writeBehindBatchSize = 50; private int writeBehindDelay = 1000; @@ -171,4 +174,17 @@ public class MapOptions { return loader; } + /** + * Sets {@link MapLoaderAsync} object. + * + * @param loaderAsync object + * @return MapOptions instance + */ + public MapOptions loaderAsync(MapLoaderAsync loaderAsync) { + this.loaderAsync = loaderAsync; + return this; + } + public MapLoaderAsync getLoaderAsync() { + return loaderAsync; + } } diff --git a/redisson/src/main/java/org/redisson/api/map/MapLoaderAsync.java b/redisson/src/main/java/org/redisson/api/map/MapLoaderAsync.java new file mode 100644 index 000000000..5d5a145c1 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/map/MapLoaderAsync.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.map; + +import org.redisson.api.AsyncIterator; +import org.redisson.api.RMap; + +import java.util.concurrent.CompletionStage; + +/** + * Map loader used for read-through operations or during {@link RMap#loadAll} execution. + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public interface MapLoaderAsync { + + /** + * Loads map value by key. + * + * @param key - map key + * @return value or null if value doesn't exists + */ + CompletionStage load(K key); + + /** + * Loads all keys. + * + * @return Iterable object. It's helpful if all keys don't fit in memory. + */ + AsyncIterator loadAllKeys(); + +} diff --git a/redisson/src/test/java/org/redisson/BaseMapTest.java b/redisson/src/test/java/org/redisson/BaseMapTest.java index 00aa5a852..dbcc58a12 100644 --- a/redisson/src/test/java/org/redisson/BaseMapTest.java +++ b/redisson/src/test/java/org/redisson/BaseMapTest.java @@ -5,6 +5,7 @@ import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.redisson.api.*; import org.redisson.api.map.MapLoader; +import org.redisson.api.map.MapLoaderAsync; import org.redisson.api.map.MapWriter; import org.redisson.api.map.MapWriterAsync; import org.redisson.client.codec.Codec; @@ -1156,6 +1157,8 @@ public abstract class BaseMapTest extends BaseTest { protected abstract RMap getLoaderTestMap(String name, Map map); + protected abstract RMap getLoaderAsyncTestMap(String name, Map map); + @Test public void testMapLoaderGetMulipleNulls() { Map cache = new HashMap(); @@ -1204,6 +1207,37 @@ public abstract class BaseMapTest extends BaseTest { destroy(map); } + @Test + public void testLoaderGetAsync() { + Map cache = new HashMap(); + cache.put("1", "11"); + cache.put("2", "22"); + cache.put("3", "33"); + + RMap map = getLoaderAsyncTestMap("test", cache); + + assertThat(map.size()).isEqualTo(0); + assertThat(map.get("1")).isEqualTo("11"); + assertThat(map.size()).isEqualTo(1); + assertThat(map.get("0")).isNull(); + map.put("0", "00"); + assertThat(map.get("0")).isEqualTo("00"); + assertThat(map.size()).isEqualTo(2); + + assertThat(map.containsKey("2")).isTrue(); + assertThat(map.size()).isEqualTo(3); + + Map s = map.getAll(new HashSet<>(Arrays.asList("1", "2", "9", "3"))); + Map expectedMap = new HashMap<>(); + expectedMap.put("1", "11"); + expectedMap.put("2", "22"); + expectedMap.put("3", "33"); + assertThat(s).isEqualTo(expectedMap); + assertThat(map.size()).isEqualTo(4); + destroy(map); + + } + @Test public void testWriteBehindAsyncFastRemove() throws InterruptedException { Map store = new HashMap<>(); @@ -1442,6 +1476,25 @@ public abstract class BaseMapTest extends BaseTest { destroy(map); } + @Test + public void testLoadAllAsync() { + Map cache = new HashMap(); + for (int i = 0; i < 100; i++) { + cache.put("" + i, "" + (i*10 + i)); + } + + RMap map = getLoaderAsyncTestMap("test", cache); + + assertThat(map.size()).isEqualTo(0); + map.loadAll(false, 2); + assertThat(map.size()).isEqualTo(100); + + for (int i = 0; i < 100; i++) { + assertThat(map.containsKey("" + i)).isTrue(); + } + destroy(map); + } + protected MapWriterAsync createMapWriterAsync(Map map) { return new MapWriterAsync() { @@ -1481,7 +1534,34 @@ public abstract class BaseMapTest extends BaseTest { }; } - + + protected MapLoaderAsync createMapLoaderAsync(Map map) { + MapLoaderAsync loaderAsync = new MapLoaderAsync() { + @Override + public CompletionStage load(Object key) { + return CompletableFuture.completedFuture(map.get(key)); + } + + @Override + public AsyncIterator loadAllKeys() { + Iterator iter = map.keySet().iterator(); + return new AsyncIterator() { + + @Override + public CompletionStage hasNext() { + return CompletableFuture.completedFuture(iter.hasNext()); + } + + @Override + public CompletionStage next() { + return CompletableFuture.completedFuture(iter.next()); + } + }; + } + }; + return loaderAsync; + } + protected MapLoader createMapLoader(Map map) { return new MapLoader() { @Override diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index 19dc34c19..2be48631c 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -193,7 +193,13 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { LocalCachedMapOptions options = LocalCachedMapOptions.defaults().loader(createMapLoader(map)); return redisson.getLocalCachedMap(name, options); } - + + @Override + protected RMap getLoaderAsyncTestMap(String name, Map map) { + LocalCachedMapOptions options = LocalCachedMapOptions.defaults().loaderAsync(createMapLoaderAsync(map)); + return redisson.getLocalCachedMap(name, options); + } + @Test public void testBigPutAll() throws InterruptedException { RLocalCachedMap m = redisson.getLocalCachedMap("testValuesWithNearCache2", diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index 924cf4f10..11ec0e663 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -135,7 +135,13 @@ public class RedissonMapCacheTest extends BaseMapTest { MapOptions options = MapOptions.defaults().loader(createMapLoader(map)); return redisson.getMapCache("test", options); } - + + @Override + protected RMap getLoaderAsyncTestMap(String name, Map map) { + MapOptions options = MapOptions.defaults().loaderAsync(createMapLoaderAsync(map)); + return redisson.getMapCache("test", options); + } + @Test public void testSizeInMemory() { Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("4.0.0") > 0); diff --git a/redisson/src/test/java/org/redisson/RedissonMapTest.java b/redisson/src/test/java/org/redisson/RedissonMapTest.java index 5ddc6ec0c..3618617f9 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapTest.java @@ -26,12 +26,18 @@ public class RedissonMapTest extends BaseMapTest { return redisson.getMap(name, codec); } - @Override + @Override protected RMap getLoaderTestMap(String name, Map map) { MapOptions options = MapOptions.defaults().loader(createMapLoader(map)); return redisson.getMap("test", options); } - + + @Override + protected RMap getLoaderAsyncTestMap(String name, Map map) { + MapOptions options = MapOptions.defaults().loaderAsync(createMapLoaderAsync(map)); + return redisson.getMap("test", options); + } + @Override protected RMap getWriterTestMap(String name, Map map) { MapOptions options = MapOptions.defaults().writer(createMapWriter(map));