From 6caac9e71f1332dfc4eae3b805389a0120388df6 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 8 Aug 2022 11:43:50 +0300 Subject: [PATCH] Feature - Add ability to define MapWriterAsync in MapOptions #4472 --- .../java/org/redisson/MapWriteBehindTask.java | 24 ++- .../main/java/org/redisson/RedissonMap.java | 190 +++++++++++------- .../redisson/api/LocalCachedMapOptions.java | 8 +- .../java/org/redisson/api/MapOptions.java | 21 +- .../org/redisson/api/map/MapWriterAsync.java | 36 ++++ .../test/java/org/redisson/BaseMapTest.java | 48 ++++- .../redisson/RedissonLocalCachedMapTest.java | 10 +- .../org/redisson/RedissonMapCacheTest.java | 10 +- .../java/org/redisson/RedissonMapTest.java | 8 + 9 files changed, 270 insertions(+), 85 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/api/map/MapWriterAsync.java diff --git a/redisson/src/main/java/org/redisson/MapWriteBehindTask.java b/redisson/src/main/java/org/redisson/MapWriteBehindTask.java index 19dc09cb5..6650c5e1c 100644 --- a/redisson/src/main/java/org/redisson/MapWriteBehindTask.java +++ b/redisson/src/main/java/org/redisson/MapWriteBehindTask.java @@ -85,7 +85,11 @@ public class MapWriteBehindTask { private void flushTasks(Map addedMap, List deletedKeys) { try { if (!deletedKeys.isEmpty()) { - options.getWriter().delete(deletedKeys); + if (options.getWriter() != null) { + options.getWriter().delete(deletedKeys); + } else { + options.getWriterAsync().delete(deletedKeys).toCompletableFuture().join(); + } deletedKeys.clear(); } } catch (Exception exception) { @@ -93,7 +97,11 @@ public class MapWriteBehindTask { } try { if (!addedMap.isEmpty()) { - options.getWriter().write(addedMap); + if (options.getWriter() != null) { + options.getWriter().write(addedMap); + } else { + options.getWriterAsync().write(addedMap).toCompletableFuture().join(); + } addedMap.clear(); } } catch (Exception exception) { @@ -107,7 +115,11 @@ public class MapWriteBehindTask { try { deletedKeys.add(key); if (deletedKeys.size() == options.getWriteBehindBatchSize()) { - options.getWriter().delete(deletedKeys); + if (options.getWriter() != null) { + options.getWriter().delete(deletedKeys); + } else { + options.getWriterAsync().delete(deletedKeys).toCompletableFuture().join(); + } deletedKeys.clear(); } @@ -120,7 +132,11 @@ public class MapWriteBehindTask { try { addedMap.put(entry.getKey(), entry.getValue()); if (addedMap.size() == options.getWriteBehindBatchSize()) { - options.getWriter().write(addedMap); + if (options.getWriter() != null) { + options.getWriter().write(addedMap); + } else { + options.getWriterAsync().write(addedMap).toCompletableFuture().join(); + } addedMap.clear(); } } catch (Exception exception) { diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 3d5b1e13a..33b5fa730 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -70,7 +70,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { this.options = options; if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND - && options.getWriter() != null) { + && (options.getWriter() != null || options.getWriterAsync() != null)) { this.writeBehindService = writeBehindService; writeBehindTask = writeBehindService.start(name, options); } else { @@ -94,7 +94,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { this.options = options; if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND - && options.getWriter() != null) { + && (options.getWriter() != null || options.getWriterAsync() != null)) { this.writeBehindService = writeBehindService; writeBehindTask = writeBehindService.start(name, options); } else { @@ -726,21 +726,31 @@ public class RedissonMap extends RedissonExpirable implements RMap { CompletionStage f = future.thenCompose(res -> { if (condition.apply(res)) { - CompletableFuture promise = new CompletableFuture<>(); - commandExecutor.getConnectionManager().getExecutor().execute(() -> { - try { - if (task instanceof MapWriterTask.Add) { - options.getWriter().write(task.getMap()); - } else { - options.getWriter().delete(task.getKeys()); + if (options.getWriter() != null) { + CompletableFuture promise = new CompletableFuture<>(); + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + try { + if (task instanceof MapWriterTask.Add) { + options.getWriter().write(task.getMap()); + } else { + options.getWriter().delete(task.getKeys()); + } + } catch (Exception ex) { + promise.completeExceptionally(ex); + return; } - } catch (Exception ex) { - promise.completeExceptionally(ex); - return; - } - promise.complete(res); - }); - return promise; + promise.complete(res); + }); + return promise; + } + + if (task instanceof MapWriterTask.Add) { + return options.getWriterAsync().write(task.getMap()) + .thenApply(r -> res); + } else { + return options.getWriterAsync().delete(task.getKeys()) + .thenApply(r -> res); + } } return CompletableFuture.completedFuture(res); }); @@ -919,7 +929,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { } protected boolean hasNoWriter() { - return options == null || options.getWriter() == null; + return options == null || (options.getWriter() == null && options.getWriterAsync() == null); } protected RFuture putIfAbsentOperationAsync(K key, V value) { @@ -1190,10 +1200,6 @@ public class RedissonMap extends RedissonExpirable implements RMap { throw new IllegalArgumentException("parallelism can't be lower than 1"); } - for (K key : keys) { - checkKey(key); - } - List> futures = new ArrayList<>(); try { Iterator iter = keys.iterator(); @@ -1203,6 +1209,9 @@ public class RedissonMap extends RedissonExpirable implements RMap { } K key = iter.next(); + if (key == null) { + continue; + } if (replaceExistingValues) { CompletableFuture f = loadValue(iter, key, loadedEntires); futures.add(f); @@ -1378,17 +1387,22 @@ public class RedissonMap extends RedissonExpirable implements RMap { return CompletableFuture.completedFuture((long) deletedKeys.size()); } else { - CompletableFuture future = new CompletableFuture<>(); - commandExecutor.getConnectionManager().getExecutor().execute(() -> { - try { - options.getWriter().delete(deletedKeys); - } catch (Exception ex) { - future.completeExceptionally(ex); - return; - } - future.complete((long) deletedKeys.size()); - }); - return future; + if (options.getWriter() != null) { + CompletableFuture future = new CompletableFuture<>(); + commandExecutor.getConnectionManager().getExecutor().execute(() -> { + try { + options.getWriter().delete(deletedKeys); + } catch (Exception ex) { + future.completeExceptionally(ex); + return; + } + future.complete((long) deletedKeys.size()); + }); + return future; + } + + return options.getWriterAsync().delete(deletedKeys) + .thenApply(r -> (long) deletedKeys.size()); } }); return new CompletableFutureWrapper<>(f); @@ -1660,60 +1674,88 @@ public class RedissonMap extends RedissonExpirable implements RMap { } private CompletableFuture loadValue(K key, RLock lock, long threadId) { - CompletableFuture result = new CompletableFuture<>(); - commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { - @Override - public void run() { - V value; - try { - value = options.getLoader().load(key); - if (value == null) { +// if (options.getLoader() != null) { + CompletableFuture result = new CompletableFuture<>(); + commandExecutor.getConnectionManager().getExecutor().execute(new Runnable() { + @Override + public void run() { + V value; + try { + value = options.getLoader().load(key); + if (value == null) { + lock.unlockAsync(threadId) + .whenComplete((r, e) -> { + if (e != null) { + result.completeExceptionally(e); + return; + } + + result.complete(value); + }); + return; + } + } catch (Exception e) { + log.error("Unable to load value by key " + key + " for map " + getRawName(), e); lock.unlockAsync(threadId) - .whenComplete((r, e) -> { - if (e != null) { - result.completeExceptionally(e); + .whenComplete((r, ex) -> { + if (ex != null) { + result.completeExceptionally(ex); return; } - result.complete(value); + result.complete(null); }); return; } - } catch (Exception e) { - log.error("Unable to load value by key " + key + " for map " + getRawName(), e); - lock.unlockAsync(threadId) - .whenComplete((r, ex) -> { - if (ex != null) { - result.completeExceptionally(ex); - return; - } - - result.complete(null); - }); - return; - } - - putOperationAsync(key, value) - .whenComplete((res, e) -> { - if (e != null) { - lock.unlockAsync(threadId); - result.completeExceptionally(e); - return; - } - lock.unlockAsync(threadId) - .whenComplete((r, ex) -> { - if (ex != null) { - result.completeExceptionally(ex); + putOperationAsync(key, value) + .whenComplete((res, e) -> { + if (e != null) { + lock.unlockAsync(threadId); + result.completeExceptionally(e); return; } - result.complete(value); + lock.unlockAsync(threadId) + .whenComplete((r, ex) -> { + if (ex != null) { + result.completeExceptionally(ex); + return; + } + + result.complete(value); + }); }); - }); - } - }); - return result; + } + }); + return result; +// } + +// CompletionStage valueFuture = options.getLoaderAsync().load(key); +// return valueFuture.handle((r, ex) -> { +// if (r == null) { +// return lock.unlockAsync(threadId); +// } +// if (ex != null) { +// log.error("Unable to load value by key " + key + " for map " + getRawName(), ex); +// return lock.unlockAsync(threadId); +// } +// +// return valueFuture; +// }).thenCompose(f -> f) +// .thenCompose(value -> { +// if (value != null) { +// return (CompletionStage) putOperationAsync(key, (V) value).handle((r, ex) -> { +// RFuture f = lock.unlockAsync(threadId); +// if (ex != null) { +// log.error("Unable to store value by key " + key + " for map " + getRawName(), ex); +// return f; +// } +// return f.thenApply(res -> value); +// }).thenCompose(f -> f); +// } +// return CompletableFuture.completedFuture((V) value); +// }).toCompletableFuture(); } final class EntrySet extends AbstractSet> { diff --git a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java index c963f21ec..590e784f2 100644 --- a/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java +++ b/redisson/src/main/java/org/redisson/api/LocalCachedMapOptions.java @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.map.MapLoader; import org.redisson.api.map.MapWriter; + import org.redisson.api.map.MapWriterAsync; /** * Configuration for LocalCachedMap object. @@ -385,7 +386,12 @@ public class LocalCachedMapOptions extends MapOptions { public LocalCachedMapOptions writer(MapWriter writer) { return (LocalCachedMapOptions) super.writer(writer); } - + + @Override + public LocalCachedMapOptions writerAsync(MapWriterAsync writer) { + return (LocalCachedMapOptions) super.writerAsync(writer); + } + @Override public LocalCachedMapOptions writeMode(org.redisson.api.MapOptions.WriteMode writeMode) { return (LocalCachedMapOptions) super.writeMode(writeMode); diff --git a/redisson/src/main/java/org/redisson/api/MapOptions.java b/redisson/src/main/java/org/redisson/api/MapOptions.java index 6c62ca9b2..e8f0a9c16 100644 --- a/redisson/src/main/java/org/redisson/api/MapOptions.java +++ b/redisson/src/main/java/org/redisson/api/MapOptions.java @@ -17,6 +17,7 @@ package org.redisson.api; import org.redisson.api.map.MapLoader; import org.redisson.api.map.MapWriter; +import org.redisson.api.map.MapWriterAsync; /** * Configuration for Map object. @@ -47,6 +48,8 @@ public class MapOptions { private MapLoader loader; private MapWriter writer; + private MapWriterAsync writerAsync; + private WriteMode writeMode = WriteMode.WRITE_THROUGH; private int writeBehindBatchSize = 50; private int writeBehindDelay = 1000; @@ -77,7 +80,7 @@ public class MapOptions { } /** - * Sets {@link MapWriter} object. + * Defines {@link MapWriter} object which is invoked during write operation. * * @param writer object * @return MapOptions instance @@ -89,7 +92,21 @@ public class MapOptions { public MapWriter getWriter() { return writer; } - + + /** + * Defines {@link MapWriterAsync} object which is invoked during write operation. + * + * @param writer object + * @return MapOptions instance + */ + public MapOptions writerAsync(MapWriterAsync writer) { + this.writerAsync = writer; + return this; + } + public MapWriterAsync getWriterAsync() { + return writerAsync; + } + /** * Sets write behind tasks batch size. * All updates accumulated into a batch of specified size and written with {@link MapWriter}. diff --git a/redisson/src/main/java/org/redisson/api/map/MapWriterAsync.java b/redisson/src/main/java/org/redisson/api/map/MapWriterAsync.java new file mode 100644 index 000000000..a4c9b011e --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/map/MapWriterAsync.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2013-2021 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api.map; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +/** + * Asynchronous Map writer used for write-through operations. + * + * @author Nikita Koksharov + * + * @param key type + * @param value type + */ +public interface MapWriterAsync { + + CompletionStage write(Map map); + + CompletionStage delete(Collection keys); + +} diff --git a/redisson/src/test/java/org/redisson/BaseMapTest.java b/redisson/src/test/java/org/redisson/BaseMapTest.java index d60e5d1e9..e1068273d 100644 --- a/redisson/src/test/java/org/redisson/BaseMapTest.java +++ b/redisson/src/test/java/org/redisson/BaseMapTest.java @@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test; import org.redisson.api.*; import org.redisson.api.map.MapLoader; import org.redisson.api.map.MapWriter; + import org.redisson.api.map.MapWriterAsync; import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.IntegerCodec; @@ -18,6 +19,8 @@ import java.io.Serializable; import java.time.Duration; import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; @@ -1150,6 +1153,8 @@ public abstract class BaseMapTest extends BaseTest { protected abstract RMap getWriterTestMap(String name, Map map); protected abstract RMap getWriteBehindTestMap(String name, Map map); + + protected abstract RMap getWriteBehindAsyncTestMap(String name, Map map); protected abstract RMap getLoaderTestMap(String name, Map map); @@ -1201,7 +1206,26 @@ public abstract class BaseMapTest extends BaseTest { destroy(map); } - + @Test + public void testWriteBehindAsyncFastRemove() throws InterruptedException { + Map store = new HashMap<>(); + RMap map = getWriteBehindAsyncTestMap("test", store); + + map.put("1", "11"); + map.put("2", "22"); + map.put("3", "33"); + + Thread.sleep(1400); + + map.fastRemove("1", "2", "4"); + + Map expected = new HashMap<>(); + expected.put("3", "33"); + Thread.sleep(1400); + assertThat(store).isEqualTo(expected); + destroy(map); + } + @Test public void testWriterFastRemove() { Map store = new HashMap<>(); @@ -1419,7 +1443,27 @@ public abstract class BaseMapTest extends BaseTest { } destroy(map); } - + + protected MapWriterAsync createMapWriterAsync(Map map) { + return new MapWriterAsync() { + + @Override + public CompletionStage write(Map values) { + map.putAll(values); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletionStage delete(Collection keys) { + for (K key : keys) { + map.remove(key); + } + return CompletableFuture.completedFuture(null); + } + + }; + } + protected MapWriter createMapWriter(Map map) { return new MapWriter() { diff --git a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java index 441d9c635..19dc34c19 100644 --- a/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLocalCachedMapTest.java @@ -179,7 +179,15 @@ public class RedissonLocalCachedMapTest extends BaseMapTest { .writeMode(WriteMode.WRITE_BEHIND); return redisson.getLocalCachedMap("test", options); } - + + @Override + protected RMap getWriteBehindAsyncTestMap(String name, Map map) { + LocalCachedMapOptions options = LocalCachedMapOptions.defaults() + .writerAsync(createMapWriterAsync(map)) + .writeMode(WriteMode.WRITE_BEHIND); + return redisson.getLocalCachedMap("test", options); + } + @Override protected RMap getLoaderTestMap(String name, Map map) { LocalCachedMapOptions options = LocalCachedMapOptions.defaults().loader(createMapLoader(map)); diff --git a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java index d2b817985..924cf4f10 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapCacheTest.java @@ -121,7 +121,15 @@ public class RedissonMapCacheTest extends BaseMapTest { .writeMode(WriteMode.WRITE_BEHIND); return redisson.getMapCache("test", options); } - + + @Override + protected RMap getWriteBehindAsyncTestMap(String name, Map map) { + MapOptions options = MapOptions.defaults() + .writerAsync(createMapWriterAsync(map)) + .writeMode(WriteMode.WRITE_BEHIND); + return redisson.getMapCache("test", options); + } + @Override protected RMap getLoaderTestMap(String name, Map map) { MapOptions options = MapOptions.defaults().loader(createMapLoader(map)); diff --git a/redisson/src/test/java/org/redisson/RedissonMapTest.java b/redisson/src/test/java/org/redisson/RedissonMapTest.java index 3249261af..5ddc6ec0c 100644 --- a/redisson/src/test/java/org/redisson/RedissonMapTest.java +++ b/redisson/src/test/java/org/redisson/RedissonMapTest.java @@ -46,6 +46,14 @@ public class RedissonMapTest extends BaseMapTest { return redisson.getMap("test", options); } + @Override + protected RMap getWriteBehindAsyncTestMap(String name, Map map) { + MapOptions options = MapOptions.defaults() + .writerAsync(createMapWriterAsync(map)) + .writeMode(WriteMode.WRITE_BEHIND); + return redisson.getMap("test", options); + } + @Test public void testEntrySet() { Map map = redisson.getMap("simple12");