Feature - Add ability to define MapWriterAsync in MapOptions #4472

pull/4497/head
Nikita Koksharov 3 years ago
parent b320f191c6
commit 6caac9e71f

@ -85,7 +85,11 @@ public class MapWriteBehindTask {
private void flushTasks(Map<Object, Object> addedMap, List<Object> deletedKeys) {
try {
if (!deletedKeys.isEmpty()) {
options.getWriter().delete(deletedKeys);
if (options.getWriter() != null) {
options.getWriter().delete(deletedKeys);
} else {
options.getWriterAsync().delete(deletedKeys).toCompletableFuture().join();
}
deletedKeys.clear();
}
} catch (Exception exception) {
@ -93,7 +97,11 @@ public class MapWriteBehindTask {
}
try {
if (!addedMap.isEmpty()) {
options.getWriter().write(addedMap);
if (options.getWriter() != null) {
options.getWriter().write(addedMap);
} else {
options.getWriterAsync().write(addedMap).toCompletableFuture().join();
}
addedMap.clear();
}
} catch (Exception exception) {
@ -107,7 +115,11 @@ public class MapWriteBehindTask {
try {
deletedKeys.add(key);
if (deletedKeys.size() == options.getWriteBehindBatchSize()) {
options.getWriter().delete(deletedKeys);
if (options.getWriter() != null) {
options.getWriter().delete(deletedKeys);
} else {
options.getWriterAsync().delete(deletedKeys).toCompletableFuture().join();
}
deletedKeys.clear();
}
@ -120,7 +132,11 @@ public class MapWriteBehindTask {
try {
addedMap.put(entry.getKey(), entry.getValue());
if (addedMap.size() == options.getWriteBehindBatchSize()) {
options.getWriter().write(addedMap);
if (options.getWriter() != null) {
options.getWriter().write(addedMap);
} else {
options.getWriterAsync().write(addedMap).toCompletableFuture().join();
}
addedMap.clear();
}
} catch (Exception exception) {

@ -70,7 +70,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
this.options = options;
if (options != null
&& options.getWriteMode() == WriteMode.WRITE_BEHIND
&& options.getWriter() != null) {
&& (options.getWriter() != null || options.getWriterAsync() != null)) {
this.writeBehindService = writeBehindService;
writeBehindTask = writeBehindService.start(name, options);
} else {
@ -94,7 +94,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
this.options = options;
if (options != null
&& options.getWriteMode() == WriteMode.WRITE_BEHIND
&& options.getWriter() != null) {
&& (options.getWriter() != null || options.getWriterAsync() != null)) {
this.writeBehindService = writeBehindService;
writeBehindTask = writeBehindService.start(name, options);
} else {
@ -726,21 +726,31 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
CompletionStage<M> f = future.thenCompose(res -> {
if (condition.apply(res)) {
CompletableFuture<M> promise = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
try {
if (task instanceof MapWriterTask.Add) {
options.getWriter().write(task.getMap());
} else {
options.getWriter().delete(task.getKeys());
if (options.getWriter() != null) {
CompletableFuture<M> promise = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
try {
if (task instanceof MapWriterTask.Add) {
options.getWriter().write(task.getMap());
} else {
options.getWriter().delete(task.getKeys());
}
} catch (Exception ex) {
promise.completeExceptionally(ex);
return;
}
} catch (Exception ex) {
promise.completeExceptionally(ex);
return;
}
promise.complete(res);
});
return promise;
promise.complete(res);
});
return promise;
}
if (task instanceof MapWriterTask.Add) {
return options.getWriterAsync().write(task.getMap())
.thenApply(r -> res);
} else {
return options.getWriterAsync().delete(task.getKeys())
.thenApply(r -> res);
}
}
return CompletableFuture.completedFuture(res);
});
@ -919,7 +929,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
protected boolean hasNoWriter() {
return options == null || options.getWriter() == null;
return options == null || (options.getWriter() == null && options.getWriterAsync() == null);
}
protected RFuture<V> putIfAbsentOperationAsync(K key, V value) {
@ -1190,10 +1200,6 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
throw new IllegalArgumentException("parallelism can't be lower than 1");
}
for (K key : keys) {
checkKey(key);
}
List<CompletableFuture<?>> futures = new ArrayList<>();
try {
Iterator<? extends K> iter = keys.iterator();
@ -1203,6 +1209,9 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
K key = iter.next();
if (key == null) {
continue;
}
if (replaceExistingValues) {
CompletableFuture<Void> f = loadValue(iter, key, loadedEntires);
futures.add(f);
@ -1378,17 +1387,22 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return CompletableFuture.completedFuture((long) deletedKeys.size());
} else {
CompletableFuture<Long> future = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
try {
options.getWriter().delete(deletedKeys);
} catch (Exception ex) {
future.completeExceptionally(ex);
return;
}
future.complete((long) deletedKeys.size());
});
return future;
if (options.getWriter() != null) {
CompletableFuture<Long> future = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
try {
options.getWriter().delete(deletedKeys);
} catch (Exception ex) {
future.completeExceptionally(ex);
return;
}
future.complete((long) deletedKeys.size());
});
return future;
}
return options.getWriterAsync().delete(deletedKeys)
.thenApply(r -> (long) deletedKeys.size());
}
});
return new CompletableFutureWrapper<>(f);
@ -1660,60 +1674,88 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
private CompletableFuture<V> loadValue(K key, RLock lock, long threadId) {
CompletableFuture<V> result = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
V value;
try {
value = options.getLoader().load(key);
if (value == null) {
// if (options.getLoader() != null) {
CompletableFuture<V> result = new CompletableFuture<>();
commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() {
@Override
public void run() {
V value;
try {
value = options.getLoader().load(key);
if (value == null) {
lock.unlockAsync(threadId)
.whenComplete((r, e) -> {
if (e != null) {
result.completeExceptionally(e);
return;
}
result.complete(value);
});
return;
}
} catch (Exception e) {
log.error("Unable to load value by key " + key + " for map " + getRawName(), e);
lock.unlockAsync(threadId)
.whenComplete((r, e) -> {
if (e != null) {
result.completeExceptionally(e);
.whenComplete((r, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
return;
}
result.complete(value);
result.complete(null);
});
return;
}
} catch (Exception e) {
log.error("Unable to load value by key " + key + " for map " + getRawName(), e);
lock.unlockAsync(threadId)
.whenComplete((r, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
return;
}
result.complete(null);
});
return;
}
putOperationAsync(key, value)
.whenComplete((res, e) -> {
if (e != null) {
lock.unlockAsync(threadId);
result.completeExceptionally(e);
return;
}
lock.unlockAsync(threadId)
.whenComplete((r, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
putOperationAsync(key, value)
.whenComplete((res, e) -> {
if (e != null) {
lock.unlockAsync(threadId);
result.completeExceptionally(e);
return;
}
result.complete(value);
lock.unlockAsync(threadId)
.whenComplete((r, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
return;
}
result.complete(value);
});
});
});
}
});
return result;
}
});
return result;
// }
// CompletionStage<V> valueFuture = options.getLoaderAsync().load(key);
// return valueFuture.handle((r, ex) -> {
// if (r == null) {
// return lock.unlockAsync(threadId);
// }
// if (ex != null) {
// log.error("Unable to load value by key " + key + " for map " + getRawName(), ex);
// return lock.unlockAsync(threadId);
// }
//
// return valueFuture;
// }).thenCompose(f -> f)
// .thenCompose(value -> {
// if (value != null) {
// return (CompletionStage<V>) putOperationAsync(key, (V) value).handle((r, ex) -> {
// RFuture<Void> f = lock.unlockAsync(threadId);
// if (ex != null) {
// log.error("Unable to store value by key " + key + " for map " + getRawName(), ex);
// return f;
// }
// return f.thenApply(res -> value);
// }).thenCompose(f -> f);
// }
// return CompletableFuture.completedFuture((V) value);
// }).toCompletableFuture();
}
final class EntrySet extends AbstractSet<Map.Entry<K, V>> {

@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.api.map.MapWriterAsync;
/**
* Configuration for LocalCachedMap object.
@ -386,6 +387,11 @@ public class LocalCachedMapOptions<K, V> extends MapOptions<K, V> {
return (LocalCachedMapOptions<K, V>) super.writer(writer);
}
@Override
public LocalCachedMapOptions<K, V> writerAsync(MapWriterAsync<K, V> writer) {
return (LocalCachedMapOptions<K, V>) super.writerAsync(writer);
}
@Override
public LocalCachedMapOptions<K, V> writeMode(org.redisson.api.MapOptions.WriteMode writeMode) {
return (LocalCachedMapOptions<K, V>) super.writeMode(writeMode);

@ -17,6 +17,7 @@ package org.redisson.api;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.api.map.MapWriterAsync;
/**
* Configuration for Map object.
@ -47,6 +48,8 @@ public class MapOptions<K, V> {
private MapLoader<K, V> loader;
private MapWriter<K, V> writer;
private MapWriterAsync<K, V> writerAsync;
private WriteMode writeMode = WriteMode.WRITE_THROUGH;
private int writeBehindBatchSize = 50;
private int writeBehindDelay = 1000;
@ -77,7 +80,7 @@ public class MapOptions<K, V> {
}
/**
* Sets {@link MapWriter} object.
* Defines {@link MapWriter} object which is invoked during write operation.
*
* @param writer object
* @return MapOptions instance
@ -90,6 +93,20 @@ public class MapOptions<K, V> {
return writer;
}
/**
* Defines {@link MapWriterAsync} object which is invoked during write operation.
*
* @param writer object
* @return MapOptions instance
*/
public MapOptions<K, V> writerAsync(MapWriterAsync<K, V> writer) {
this.writerAsync = writer;
return this;
}
public MapWriterAsync<K, V> getWriterAsync() {
return writerAsync;
}
/**
* Sets write behind tasks batch size.
* All updates accumulated into a batch of specified size and written with {@link MapWriter}.

@ -0,0 +1,36 @@
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.map;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletionStage;
/**
* Asynchronous Map writer used for write-through operations.
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface MapWriterAsync<K, V> {
CompletionStage<Void> write(Map<K, V> map);
CompletionStage<Void> delete(Collection<K> keys);
}

@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test;
import org.redisson.api.*;
import org.redisson.api.map.MapLoader;
import org.redisson.api.map.MapWriter;
import org.redisson.api.map.MapWriterAsync;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.IntegerCodec;
@ -18,6 +19,8 @@ import java.io.Serializable;
import java.time.Duration;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
@ -1151,6 +1154,8 @@ public abstract class BaseMapTest extends BaseTest {
protected abstract <K, V> RMap<K, V> getWriteBehindTestMap(String name, Map<K, V> map);
protected abstract <K, V> RMap<K, V> getWriteBehindAsyncTestMap(String name, Map<K, V> map);
protected abstract <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map);
@Test
@ -1201,6 +1206,25 @@ public abstract class BaseMapTest extends BaseTest {
destroy(map);
}
@Test
public void testWriteBehindAsyncFastRemove() throws InterruptedException {
Map<String, String> store = new HashMap<>();
RMap<String, String> map = getWriteBehindAsyncTestMap("test", store);
map.put("1", "11");
map.put("2", "22");
map.put("3", "33");
Thread.sleep(1400);
map.fastRemove("1", "2", "4");
Map<String, String> expected = new HashMap<>();
expected.put("3", "33");
Thread.sleep(1400);
assertThat(store).isEqualTo(expected);
destroy(map);
}
@Test
public void testWriterFastRemove() {
@ -1420,6 +1444,26 @@ public abstract class BaseMapTest extends BaseTest {
destroy(map);
}
protected <K, V> MapWriterAsync<K, V> createMapWriterAsync(Map<K, V> map) {
return new MapWriterAsync<K, V>() {
@Override
public CompletionStage<Void> write(Map<K, V> values) {
map.putAll(values);
return CompletableFuture.completedFuture(null);
}
@Override
public CompletionStage<Void> delete(Collection<K> keys) {
for (K key : keys) {
map.remove(key);
}
return CompletableFuture.completedFuture(null);
}
};
}
protected <K, V> MapWriter<K, V> createMapWriter(Map<K, V> map) {
return new MapWriter<K, V>() {

@ -180,6 +180,14 @@ public class RedissonLocalCachedMapTest extends BaseMapTest {
return redisson.getLocalCachedMap("test", options);
}
@Override
protected <K, V> RMap<K, V> getWriteBehindAsyncTestMap(String name, Map<K, V> map) {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults()
.writerAsync(createMapWriterAsync(map))
.writeMode(WriteMode.WRITE_BEHIND);
return redisson.getLocalCachedMap("test", options);
}
@Override
protected <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map) {
LocalCachedMapOptions<K, V> options = LocalCachedMapOptions.<K, V>defaults().loader(createMapLoader(map));

@ -122,6 +122,14 @@ public class RedissonMapCacheTest extends BaseMapTest {
return redisson.getMapCache("test", options);
}
@Override
protected <K, V> RMap<K, V> getWriteBehindAsyncTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.writerAsync(createMapWriterAsync(map))
.writeMode(WriteMode.WRITE_BEHIND);
return redisson.getMapCache("test", options);
}
@Override
protected <K, V> RMap<K, V> getLoaderTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults().loader(createMapLoader(map));

@ -46,6 +46,14 @@ public class RedissonMapTest extends BaseMapTest {
return redisson.getMap("test", options);
}
@Override
protected <K, V> RMap<K, V> getWriteBehindAsyncTestMap(String name, Map<K, V> map) {
MapOptions<K, V> options = MapOptions.<K, V>defaults()
.writerAsync(createMapWriterAsync(map))
.writeMode(WriteMode.WRITE_BEHIND);
return redisson.getMap("test", options);
}
@Test
public void testEntrySet() {
Map<Integer, String> map = redisson.getMap("simple12");

Loading…
Cancel
Save