From 44ea60a242c2f473391e9c2570f22dbaccec4455 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 30 Oct 2018 13:59:06 +0300 Subject: [PATCH] Feature - added RMap#putAllAsync() method with batch size #1662 --- .../main/java/org/redisson/RedissonMap.java | 67 ++++++++++++++++++- .../src/main/java/org/redisson/api/RMap.java | 12 ++++ .../main/java/org/redisson/api/RMapAsync.java | 13 ++++ .../test/java/org/redisson/BaseMapTest.java | 40 +++++++++++ 4 files changed, 129 insertions(+), 3 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index d220c23d1..18b6f4bd9 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -264,12 +265,72 @@ public class RedissonMap extends RedissonExpirable implements RMap { } @Override - public void putAll(Map map) { + public final void putAll(Map map) { get(putAllAsync(map)); } @Override - public RFuture putAllAsync(final Map map) { + public void putAll(Map map, int batchSize) { + get(putAllAsync(map, batchSize)); + } + + @Override + public RFuture putAllAsync(Map map, int batchSize) { + Map batch = new HashMap(); + AtomicInteger counter = new AtomicInteger(); + Iterator> iter = ((Map)map).entrySet().iterator(); + + RPromise promise = new RedissonPromise(); + putAllAsync(batch, iter, counter, batchSize, promise); + return promise; + } + + private void putAllAsync(final Map batch, final Iterator> iter, + final AtomicInteger counter, final int batchSize, final RPromise promise) { + batch.clear(); + + while (iter.hasNext()) { + Entry entry = iter.next(); + batch.put(entry.getKey(), entry.getValue()); + counter.incrementAndGet(); + if (counter.get() % batchSize == 0) { + RFuture future = putAllAsync(batch); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + putAllAsync(batch, iter, counter, batchSize, promise); + } + }); + return; + } + } + + if (batch.isEmpty()) { + promise.trySuccess(null); + return; + } + + RFuture future = putAllAsync(batch); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.tryFailure(future.cause()); + return; + } + + promise.trySuccess(null); + } + }); + } + + @Override + public final RFuture putAllAsync(final Map map) { if (map.isEmpty()) { return RedissonPromise.newSucceededFuture(null); } @@ -288,7 +349,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { return mapWriterFuture(future, listener); } - protected RFuture mapWriterFuture(RFuture future, final MapWriterTask listener) { + protected final RFuture mapWriterFuture(RFuture future, final MapWriterTask listener) { if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) { future.addListener(new MapWriteBehindListener(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads())); return future; diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index 1fd0647d9..75b48702b 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -202,6 +202,18 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsync map); + /** + * Associates the specified value with the specified key + * in batch. Batch inserted by chunks limited by batchSize amount + * to avoid OOM and/or Redis response timeout error for map with big size. + *

+ * If {@link MapWriter} is defined then new map entries are stored in write-through mode. + * + * @param map mappings to be stored in this map + * @param batchSize - map chunk size + */ + void putAll(Map map, int batchSize); + /** * Gets a map slice contained the mappings with defined keys * by one operation. diff --git a/redisson/src/main/java/org/redisson/api/RMapAsync.java b/redisson/src/main/java/org/redisson/api/RMapAsync.java index 82a325543..02a747014 100644 --- a/redisson/src/main/java/org/redisson/api/RMapAsync.java +++ b/redisson/src/main/java/org/redisson/api/RMapAsync.java @@ -88,6 +88,19 @@ public interface RMapAsync extends RExpirableAsync { */ RFuture putAllAsync(Map map); + /** + * Associates the specified value with the specified key + * in batch. Batch inserted by chunks limited by batchSize amount + * to avoid OOM and/or Redis response timeout error for map with big size. + *

+ * If {@link MapWriter} is defined then new map entries are stored in write-through mode. + * + * @param map mappings to be stored in this map + * @param batchSize - map chunk size + * @return void + */ + RFuture putAllAsync(Map map, int batchSize); + /** * Atomically adds the given delta to the current value * by mapped key. diff --git a/redisson/src/test/java/org/redisson/BaseMapTest.java b/redisson/src/test/java/org/redisson/BaseMapTest.java index 02a913443..cb551ff54 100644 --- a/redisson/src/test/java/org/redisson/BaseMapTest.java +++ b/redisson/src/test/java/org/redisson/BaseMapTest.java @@ -354,6 +354,46 @@ public abstract class BaseMapTest extends BaseTest { destroy(map); } + @Test + public void testPutAllBatched() { + RMap map = getMap("simple"); + map.put(1, "1"); + map.put(2, "2"); + map.put(3, "3"); + + Map joinMap = new HashMap(); + joinMap.put(4, "4"); + joinMap.put(5, "5"); + joinMap.put(6, "6"); + map.putAll(joinMap, 5); + + assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6); + + Map joinMap2 = new HashMap(); + joinMap2.put(7, "7"); + joinMap2.put(8, "8"); + joinMap2.put(9, "9"); + joinMap2.put(10, "10"); + joinMap2.put(11, "11"); + map.putAll(joinMap2, 5); + + assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11); + + Map joinMap3 = new HashMap(); + joinMap3.put(12, "12"); + joinMap3.put(13, "13"); + joinMap3.put(14, "14"); + joinMap3.put(15, "15"); + joinMap3.put(16, "16"); + joinMap3.put(17, "17"); + joinMap3.put(18, "18"); + map.putAll(joinMap3, 5); + + assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18); + + destroy(map); + } + @Test public void testPutAllBig() { Map joinMap = new HashMap();