Merge branch 'master' of github.com:amillalen/redisson into remove_includes_for_full_configuration

pull/4498/head
Alvaro Millalen 2 years ago
commit 47398b1e1d

@ -44,6 +44,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@ -631,7 +632,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
protected boolean hasNoLoader() {
return options == null || options.getLoader() == null;
return options == null || (options.getLoader() == null && options.getLoaderAsync() == null);
}
public RFuture<Map<K, V>> getAllOperationAsync(Set<K> keys) {
@ -1173,19 +1174,70 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Void> loadAllAsync(boolean replaceExistingValues, int parallelism) {
if (options.getLoader() == null) {
if (hasNoLoader()) {
throw new NullPointerException("MapLoader isn't defined");
}
return loadAllAsync(options.getLoader().loadAllKeys().spliterator(), replaceExistingValues, parallelism);
if (options.getLoaderAsync() != null) {
return loadAllAsync(options.getLoaderAsync().loadAllKeys(), replaceExistingValues, parallelism);
}
return loadAllAsync(() -> options.getLoader().loadAllKeys().spliterator(), replaceExistingValues, parallelism);
}
RFuture<Void> loadAllAsync(AsyncIterator<K> iterator, boolean replaceExistingValues, int parallelism) {
CompletionStage<List<K>> f = loadAllAsync(iterator, new ArrayList<>(), new AtomicInteger(parallelism));
CompletionStage<Void> ff = f.thenCompose(elements -> {
List<CompletableFuture<V>> futures = new ArrayList<>(elements.size());
for (K k : elements) {
if (replaceExistingValues) {
CompletableFuture<V> vFuture = loadValue(k, true);
futures.add(vFuture);
} else {
CompletableFuture<V> vFuture = new CompletableFuture<>();
containsKeyAsync(k, vFuture);
futures.add(vFuture);
}
}
CompletableFuture<Void> finalFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
if (elements.size() < parallelism) {
return finalFuture;
}
return finalFuture
.thenCompose(v -> loadAllAsync(iterator, replaceExistingValues, parallelism));
});
return new CompletableFutureWrapper<>(ff);
}
CompletionStage<List<K>> loadAllAsync(AsyncIterator<K> iterator, List<K> elements, AtomicInteger workers) {
return iterator.hasNext()
.thenCompose(v -> {
int s = workers.decrementAndGet();
if (v) {
return iterator.next().thenCompose(k -> {
if (k != null) {
elements.add(k);
}
if (s > 0) {
return loadAllAsync(iterator, elements, workers);
}
return CompletableFuture.completedFuture(elements);
});
}
return CompletableFuture.completedFuture(elements);
});
}
private RFuture<Void> loadAllAsync(Spliterator<K> spliterator, boolean replaceExistingValues, int parallelism) {
private RFuture<Void> loadAllAsync(Supplier<Spliterator<K>> supplier, boolean replaceExistingValues, int parallelism) {
ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);
CompletableFuture<Void> result = new CompletableFuture<>();
customThreadPool.submit(() -> {
try {
Stream<K> s = StreamSupport.stream(spliterator, true);
Stream<K> s = StreamSupport.stream(supplier.get(), true);
List<CompletableFuture<?>> r = s.filter(k -> k != null)
.map(k -> {
if (replaceExistingValues) {
@ -1253,7 +1305,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Void> loadAllAsync(Set<? extends K> keys, boolean replaceExistingValues, int parallelism) {
return loadAllAsync((Spliterator<K>) keys.spliterator(), replaceExistingValues, parallelism);
return loadAllAsync(() -> (Spliterator<K>) keys.spliterator(), replaceExistingValues, parallelism);
}
@Override
@ -1642,7 +1694,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
private CompletableFuture<V> loadValue(K key, RLock lock, long threadId) {
// if (options.getLoader() != null) {
if (options.getLoader() != null) {
CompletableFuture<V> result = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
@ -1697,33 +1749,33 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
});
return result;
// }
// CompletionStage<V> valueFuture = options.getLoaderAsync().load(key);
// return valueFuture.handle((r, ex) -> {
// if (r == null) {
// return lock.unlockAsync(threadId);
// }
// if (ex != null) {
// log.error("Unable to load value by key " + key + " for map " + getRawName(), ex);
// return lock.unlockAsync(threadId);
// }
//
// return valueFuture;
// }).thenCompose(f -> f)
// .thenCompose(value -> {
// if (value != null) {
// return (CompletionStage<V>) putOperationAsync(key, (V) value).handle((r, ex) -> {
// RFuture<Void> f = lock.unlockAsync(threadId);
// if (ex != null) {
// log.error("Unable to store value by key " + key + " for map " + getRawName(), ex);
// return f;
// }
// return f.thenApply(res -> value);
// }).thenCompose(f -> f);
// }
// return CompletableFuture.completedFuture((V) value);
// }).toCompletableFuture();
}
CompletionStage<V> valueFuture = options.getLoaderAsync().load(key);
return valueFuture.handle((r, ex) -> {
if (r == null) {
return lock.unlockAsync(threadId);
}
if (ex != null) {
log.error("Unable to load value by key " + key + " for map " + getRawName(), ex);
return lock.unlockAsync(threadId);
}
return valueFuture;
}).thenCompose(f -> f)
.thenCompose(value -> {
if (value != null) {
return (CompletionStage<V>) putOperationAsync(key, (V) value).handle((r, ex) -> {
RFuture<Void> f = lock.unlockAsync(threadId);
if (ex != null) {
log.error("Unable to store value by key " + key + " for map " + getRawName(), ex);
return f;
}
return f.thenApply(res -> value);
}).thenCompose(f -> f);
}
return CompletableFuture.completedFuture((V) value);
}).toCompletableFuture();
}
final class EntrySet extends AbstractSet<Map.Entry<K, V>> {

@ -0,0 +1,47 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.concurrent.CompletionStage;
/**
* Asynchronous iterator
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public interface AsyncIterator<V> {
/**
* Returns <code>true</code> if more elements are available.
* <p>
* NOTE: each invocation returns a new instance of CompletionStage
*
* @return <code>true</code> if more elements are available, otherwise <code>false</code>
*/
CompletionStage<Boolean> hasNext();
/**
* Returns next element or NoSuchElementException if no more elements available.
* <p>
* NOTE: each invocation returns a new instance of CompletionStage
*
* @return next element or NoSuchElementException
*/
CompletionStage<V> next();
}

@ -18,6 +18,7 @@ package org.redisson.api;
import java.util.concurrent.TimeUnit;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapLoaderAsync;
import org.redisson.api.map.MapWriter;
import org.redisson.api.map.MapWriterAsync;
@ -402,4 +403,8 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
return (LocalCachedMapOptions<K, V>) super.loader(loader);
}
@Override
public LocalCachedMapOptions<K, V> loaderAsync(MapLoaderAsync<K, V> loaderAsync) {
return (LocalCachedMapOptions<K, V>) super.loaderAsync(loaderAsync);
}
}

@ -16,6 +16,7 @@
package org.redisson.api;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapLoaderAsync;
import org.redisson.api.map.MapWriter;
import org.redisson.api.map.MapWriterAsync;
@ -50,6 +51,8 @@ public class MapOptions<K, V> {
private MapWriter<K, V> writer;
private MapWriterAsync<K, V> writerAsync;
private MapLoaderAsync<K, V> loaderAsync;
private WriteMode writeMode = WriteMode.WRITE_THROUGH;
private int writeBehindBatchSize = 50;
private int writeBehindDelay = 1000;
@ -171,4 +174,17 @@ public class MapOptions<K, V> {
return loader;
}
/**
* Sets {@link MapLoaderAsync} object.
*
* @param loaderAsync object
* @return MapOptions instance
*/
public MapOptions<K, V> loaderAsync(MapLoaderAsync<K, V> loaderAsync) {
this.loaderAsync = loaderAsync;
return this;
}
public MapLoaderAsync<K, V> getLoaderAsync() {
return loaderAsync;
}
}

@ -0,0 +1,48 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.map;
import org.redisson.api.AsyncIterator;
import org.redisson.api.RMap;
import java.util.concurrent.CompletionStage;
/**
* Map loader used for read-through operations or during {@link RMap#loadAll} execution.
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface MapLoaderAsync<K, V> {
/**
* Loads map value by key.
*
* @param key - map key
* @return value or <code>null</code> if value doesn't exists
*/
CompletionStage<V> load(K key);
/**
* Loads all keys.
*
* @return Iterable object. It's helpful if all keys don't fit in memory.
*/
AsyncIterator<K> loadAllKeys();
}

@ -5,6 +5,7 @@ import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.redisson.api.*;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapLoaderAsync;
import org.redisson.api.map.MapWriter;
import org.redisson.api.map.MapWriterAsync;
import org.redisson.client.codec.Codec;
@ -1156,6 +1157,8 @@ public abstract class BaseMapTest extends BaseTest {
protected abstract <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map);
protected abstract <K, V> RMap<K, V> getLoaderAsyncTestMap(String name, Map<K, V> map);
@Test
public void testMapLoaderGetMulipleNulls() {
Map<String, String> cache = new HashMap<String, String>();
@ -1204,6 +1207,37 @@ public abstract class BaseMapTest extends BaseTest {
destroy(map);
}
@Test
public void testLoaderGetAsync() {
Map<String, String> cache = new HashMap<String, String>();
cache.put("1", "11");
cache.put("2", "22");
cache.put("3", "33");
RMap<String, String> map = getLoaderAsyncTestMap("test", cache);
assertThat(map.size()).isEqualTo(0);
assertThat(map.get("1")).isEqualTo("11");
assertThat(map.size()).isEqualTo(1);
assertThat(map.get("0")).isNull();
map.put("0", "00");
assertThat(map.get("0")).isEqualTo("00");
assertThat(map.size()).isEqualTo(2);
assertThat(map.containsKey("2")).isTrue();
assertThat(map.size()).isEqualTo(3);
Map<String, String> s = map.getAll(new HashSet<>(Arrays.asList("1", "2", "9", "3")));
Map<String, String> expectedMap = new HashMap<>();
expectedMap.put("1", "11");
expectedMap.put("2", "22");
expectedMap.put("3", "33");
assertThat(s).isEqualTo(expectedMap);
assertThat(map.size()).isEqualTo(4);
destroy(map);
}
@Test
public void testWriteBehindAsyncFastRemove() throws InterruptedException {
Map<String, String> store = new HashMap<>();
@ -1442,6 +1476,25 @@ public abstract class BaseMapTest extends BaseTest {
destroy(map);
}
@Test
public void testLoadAllAsync() {
Map<String, String> cache = new HashMap<String, String>();
for (int i = 0; i < 100; i++) {
cache.put("" + i, "" + (i*10 + i));
}
RMap<String, String> map = getLoaderAsyncTestMap("test", cache);
assertThat(map.size()).isEqualTo(0);
map.loadAll(false, 2);
assertThat(map.size()).isEqualTo(100);
for (int i = 0; i < 100; i++) {
assertThat(map.containsKey("" + i)).isTrue();
}
destroy(map);
}
protected <K, V> MapWriterAsync<K, V> createMapWriterAsync(Map<K, V> map) {
return new MapWriterAsync<K, V>() {
@ -1481,7 +1534,34 @@ public abstract class BaseMapTest extends BaseTest {
};
}
protected <K, V> MapLoaderAsync<K, V> createMapLoaderAsync(Map<K, V> map) {
MapLoaderAsync<K, V> loaderAsync = new MapLoaderAsync<K, V>() {
@Override
public CompletionStage<V> load(Object key) {
return CompletableFuture.completedFuture(map.get(key));
}
@Override
public AsyncIterator<K> loadAllKeys() {
Iterator<K> iter = map.keySet().iterator();
return new AsyncIterator<K>() {
@Override
public CompletionStage<Boolean> hasNext() {
return CompletableFuture.completedFuture(iter.hasNext());
}
@Override
public CompletionStage<K> next() {
return CompletableFuture.completedFuture(iter.next());
}
};
}
};
return loaderAsync;
}
protected <K, V> MapLoader<K, V> createMapLoader(Map<K, V> map) {
return new MapLoader<K, V>() {
@Override

@ -193,7 +193,13 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().loader(createMapLoader(map));
return redisson.getLocalCachedMap(name, options);
}
@Override
protected <K, V> RMap<K, V> getLoaderAsyncTestMap(String name, Map<K, V> map) {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().loaderAsync(createMapLoaderAsync(map));
return redisson.getLocalCachedMap(name, options);
}
@Test
public void testBigPutAll() throws InterruptedException {
RLocalCachedMap<Object, Object> m = redisson.getLocalCachedMap("testValuesWithNearCache2",

@ -135,7 +135,13 @@ public class RedissonMapCacheTest extends BaseMapTest {
MapOptions<K, V> options = MapOptions.<K, V>defaults().loader(createMapLoader(map));
return redisson.getMapCache("test", options);
}
@Override
protected <K, V> RMap<K, V> getLoaderAsyncTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults().loaderAsync(createMapLoaderAsync(map));
return redisson.getMapCache("test", options);
}
@Test
public void testSizeInMemory() {
Assumptions.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("4.0.0") > 0);

@ -26,12 +26,18 @@ public class RedissonMapTest extends BaseMapTest {
return redisson.getMap(name, codec);
}
@Override
@Override
protected <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults().loader(createMapLoader(map));
return redisson.getMap("test", options);
}
@Override
protected <K, V> RMap<K, V> getLoaderAsyncTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults().loaderAsync(createMapLoaderAsync(map));
return redisson.getMap("test", options);
}
@Override
protected <K, V> RMap<K, V> getWriterTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(createMapWriter(map));

Loading…
Cancel
Save