Feature - added RMap#putAllAsync() method with batch size #1662

pull/1721/head
Nikita Koksharov 6 years ago
parent 84350aad69
commit 44ea60a242

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -264,12 +265,72 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
} }
@Override @Override
public void putAll(Map<? extends K, ? extends V> map) { public final void putAll(Map<? extends K, ? extends V> map) {
get(putAllAsync(map)); get(putAllAsync(map));
} }
@Override @Override
public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) { public void putAll(Map<? extends K, ? extends V> map, int batchSize) {
get(putAllAsync(map, batchSize));
}
@Override
public RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, int batchSize) {
Map<K, V> batch = new HashMap<K, V>();
AtomicInteger counter = new AtomicInteger();
Iterator<Entry<K, V>> iter = ((Map<K, V>)map).entrySet().iterator();
RPromise<Void> promise = new RedissonPromise<Void>();
putAllAsync(batch, iter, counter, batchSize, promise);
return promise;
}
private void putAllAsync(final Map<K, V> batch, final Iterator<Entry<K, V>> iter,
final AtomicInteger counter, final int batchSize, final RPromise<Void> promise) {
batch.clear();
while (iter.hasNext()) {
Entry<K, V> entry = iter.next();
batch.put(entry.getKey(), entry.getValue());
counter.incrementAndGet();
if (counter.get() % batchSize == 0) {
RFuture<Void> future = putAllAsync(batch);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
putAllAsync(batch, iter, counter, batchSize, promise);
}
});
return;
}
}
if (batch.isEmpty()) {
promise.trySuccess(null);
return;
}
RFuture<Void> future = putAllAsync(batch);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(null);
}
});
}
@Override
public final RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
if (map.isEmpty()) { if (map.isEmpty()) {
return RedissonPromise.newSucceededFuture(null); return RedissonPromise.newSucceededFuture(null);
} }
@ -288,7 +349,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return mapWriterFuture(future, listener); return mapWriterFuture(future, listener);
} }
protected <M> RFuture<M> mapWriterFuture(RFuture<M> future, final MapWriterTask<M> listener) { protected final <M> RFuture<M> mapWriterFuture(RFuture<M> future, final MapWriterTask<M> listener) {
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) { if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
future.addListener(new MapWriteBehindListener<M>(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads())); future.addListener(new MapWriteBehindListener<M>(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads()));
return future; return future;

@ -202,6 +202,18 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
@Override @Override
void putAll(java.util.Map<? extends K, ? extends V> map); void putAll(java.util.Map<? extends K, ? extends V> map);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch. Batch inserted by chunks limited by <code>batchSize</code> amount
* to avoid OOM and/or Redis response timeout error for map with big size.
* <p>
* If {@link MapWriter} is defined then new map entries are stored in write-through mode.
*
* @param map mappings to be stored in this map
* @param batchSize - map chunk size
*/
void putAll(Map<? extends K, ? extends V> map, int batchSize);
/** /**
* Gets a map slice contained the mappings with defined <code>keys</code> * Gets a map slice contained the mappings with defined <code>keys</code>
* by one operation. * by one operation.

@ -88,6 +88,19 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
*/ */
RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map); RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch. Batch inserted by chunks limited by <code>batchSize</code> amount
* to avoid OOM and/or Redis response timeout error for map with big size.
* <p>
* If {@link MapWriter} is defined then new map entries are stored in write-through mode.
*
* @param map mappings to be stored in this map
* @param batchSize - map chunk size
* @return void
*/
RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, int batchSize);
/** /**
* Atomically adds the given <code>delta</code> to the current value * Atomically adds the given <code>delta</code> to the current value
* by mapped <code>key</code>. * by mapped <code>key</code>.

@ -354,6 +354,46 @@ public abstract class BaseMapTest extends BaseTest {
destroy(map); destroy(map);
} }
@Test
public void testPutAllBatched() {
RMap<Integer, String> map = getMap("simple");
map.put(1, "1");
map.put(2, "2");
map.put(3, "3");
Map<Integer, String> joinMap = new HashMap<Integer, String>();
joinMap.put(4, "4");
joinMap.put(5, "5");
joinMap.put(6, "6");
map.putAll(joinMap, 5);
assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6);
Map<Integer, String> joinMap2 = new HashMap<Integer, String>();
joinMap2.put(7, "7");
joinMap2.put(8, "8");
joinMap2.put(9, "9");
joinMap2.put(10, "10");
joinMap2.put(11, "11");
map.putAll(joinMap2, 5);
assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
Map<Integer, String> joinMap3 = new HashMap<Integer, String>();
joinMap3.put(12, "12");
joinMap3.put(13, "13");
joinMap3.put(14, "14");
joinMap3.put(15, "15");
joinMap3.put(16, "16");
joinMap3.put(17, "17");
joinMap3.put(18, "18");
map.putAll(joinMap3, 5);
assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18);
destroy(map);
}
@Test @Test
public void testPutAllBig() { public void testPutAllBig() {
Map<Integer, String> joinMap = new HashMap<Integer, String>(); Map<Integer, String> joinMap = new HashMap<Integer, String>();

Loading…
Cancel
Save