Merge branch 'master' into 3.0.0

pull/1821/head
Nikita Koksharov 6 years ago
commit 2f8bbb7f0e

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -264,12 +265,72 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
}
@Override
public void putAll(Map<? extends K, ? extends V> map) {
public final void putAll(Map<? extends K, ? extends V> map) {
get(putAllAsync(map));
}
@Override
public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
public void putAll(Map<? extends K, ? extends V> map, int batchSize) {
get(putAllAsync(map, batchSize));
}
@Override
public RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, int batchSize) {
Map<K, V> batch = new HashMap<K, V>();
AtomicInteger counter = new AtomicInteger();
Iterator<Entry<K, V>> iter = ((Map<K, V>)map).entrySet().iterator();
RPromise<Void> promise = new RedissonPromise<Void>();
putAllAsync(batch, iter, counter, batchSize, promise);
return promise;
}
private void putAllAsync(final Map<K, V> batch, final Iterator<Entry<K, V>> iter,
final AtomicInteger counter, final int batchSize, final RPromise<Void> promise) {
batch.clear();
while (iter.hasNext()) {
Entry<K, V> entry = iter.next();
batch.put(entry.getKey(), entry.getValue());
counter.incrementAndGet();
if (counter.get() % batchSize == 0) {
RFuture<Void> future = putAllAsync(batch);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
putAllAsync(batch, iter, counter, batchSize, promise);
}
});
return;
}
}
if (batch.isEmpty()) {
promise.trySuccess(null);
return;
}
RFuture<Void> future = putAllAsync(batch);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.tryFailure(future.cause());
return;
}
promise.trySuccess(null);
}
});
}
@Override
public final RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
}
@ -288,7 +349,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
return mapWriterFuture(future, listener);
}
protected <M> RFuture<M> mapWriterFuture(RFuture<M> future, final MapWriterTask<M> listener) {
protected final <M> RFuture<M> mapWriterFuture(RFuture<M> future, final MapWriterTask<M> listener) {
if (options != null && options.getWriteMode() == WriteMode.WRITE_BEHIND) {
future.addListener(new MapWriteBehindListener<M>(commandExecutor, listener, writeBehindCurrentThreads, writeBehindTasks, options.getWriteBehindThreads()));
return future;

@ -273,14 +273,14 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
}
@Override
public Long addAll(Map<V, Double> objects) {
public int addAll(Map<V, Double> objects) {
return get(addAllAsync(objects));
}
@Override
public RFuture<Long> addAllAsync(Map<V, Double> objects) {
public RFuture<Integer> addAllAsync(Map<V, Double> objects) {
if (objects.isEmpty()) {
return RedissonPromise.newSucceededFuture(0L);
return RedissonPromise.newSucceededFuture(0);
}
List<Object> params = new ArrayList<Object>(objects.size()*2+1);
params.add(getName());
@ -289,7 +289,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
params.add(encode(entry.getKey()));
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD, params.toArray());
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD_INT, params.toArray());
}
@Override
@ -705,12 +705,12 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
}
@Override
public Long count(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
public int count(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
return get(countAsync(startScore, startScoreInclusive, endScore, endScoreInclusive));
}
@Override
public RFuture<Long> countAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
public RFuture<Integer> countAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZCOUNT, getName(), startValue, endValue);

@ -190,6 +190,16 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SRANDMEMBER_SINGLE, getName());
}
@Override
public Set<V> random(int count) {
return get(randomAsync(count));
}
@Override
public RFuture<Set<V>> randomAsync(int count) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SRANDMEMBER, getName(), count);
}
@Override
public RFuture<Boolean> removeAsync(Object o) {
return commandExecutor.writeAsync(getName(o), codec, RedisCommands.SREM_SINGLE, getName(o), encode(o));

@ -303,6 +303,16 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SRANDMEMBER_SINGLE, getName());
}
@Override
public Set<V> random(int count) {
return get(randomAsync(count));
}
@Override
public RFuture<Set<V>> randomAsync(int count) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SRANDMEMBER, getName(), count);
}
@Override
public RFuture<Boolean> removeAsync(Object o) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,

@ -202,6 +202,18 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
@Override
void putAll(java.util.Map<? extends K, ? extends V> map);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch. Batch inserted by chunks limited by <code>batchSize</code> amount
* to avoid OOM and/or Redis response timeout error for map with big size.
* <p>
* If {@link MapWriter} is defined then new map entries are stored in write-through mode.
*
* @param map mappings to be stored in this map
* @param batchSize - map chunk size
*/
void putAll(Map<? extends K, ? extends V> map, int batchSize);
/**
* Gets a map slice contained the mappings with defined <code>keys</code>
* by one operation.

@ -88,6 +88,19 @@ public interface RMapAsync<K, V> extends RExpirableAsync {
*/
RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map);
/**
* Associates the specified <code>value</code> with the specified <code>key</code>
* in batch. Batch inserted by chunks limited by <code>batchSize</code> amount
* to avoid OOM and/or Redis response timeout error for map with big size.
* <p>
* If {@link MapWriter} is defined then new map entries are stored in write-through mode.
*
* @param map mappings to be stored in this map
* @param batchSize - map chunk size
* @return void
*/
RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, int batchSize);
/**
* Atomically adds the given <code>delta</code> to the current value
* by mapped <code>key</code>.

@ -162,7 +162,7 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
*/
Double lastScore();
Long addAll(Map<V, Double> objects);
int addAll(Map<V, Double> objects);
int removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
@ -370,7 +370,7 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
* @param endScoreInclusive - end score inclusive
* @return count of elements
*/
Long count(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
int count(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
/**
* Read all values at once.

@ -151,7 +151,7 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
*/
RFuture<Double> lastScoreAsync();
RFuture<Long> addAllAsync(Map<V, Double> objects);
RFuture<Integer> addAllAsync(Map<V, Double> objects);
RFuture<Integer> removeRangeByScoreAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
@ -312,7 +312,7 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
* @param endScoreInclusive - end score inclusive
* @return count
*/
RFuture<Long> countAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
RFuture<Integer> countAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
/**
* Read all values at once.

@ -96,6 +96,14 @@ public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V>, RSortable<Set
*/
V random();
/**
* Returns random elements from set limited by <code>count</code>
*
* @param count - values amount to return
* @return value
*/
Set<V> random(int count);
/**
* Move a member from this set to the given destination set in.
*

@ -50,6 +50,14 @@ public interface RSetAsync<V> extends RCollectionAsync<V>, RSortableAsync<Set<V>
* @return value
*/
RFuture<V> randomAsync();
/**
* Returns random elements from set limited by <code>count</code>
*
* @param count - values amount to return
* @return value
*/
RFuture<Set<V>> randomAsync(int count);
/**
* Move a member from this set to the given destination set in async mode.

@ -120,7 +120,7 @@ public interface RedisCommands {
RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor());
RedisStrictCommand<Integer> ZCARD_INT = new RedisStrictCommand<Integer>("ZCARD", new IntegerReplayConvertor());
RedisStrictCommand<Long> ZCARD = new RedisStrictCommand<Long>("ZCARD");
RedisStrictCommand<Long> ZCOUNT = new RedisStrictCommand<Long>("ZCOUNT");
RedisStrictCommand<Integer> ZCOUNT = new RedisStrictCommand<Integer>("ZCOUNT", new IntegerReplayConvertor());
RedisStrictCommand<Integer> ZLEXCOUNT = new RedisStrictCommand<Integer>("ZLEXCOUNT", new IntegerReplayConvertor());
RedisCommand<Boolean> ZSCORE_CONTAINS = new RedisCommand<Boolean>("ZSCORE", new BooleanNotNullReplayConvertor());
RedisStrictCommand<Double> ZSCORE = new RedisStrictCommand<Double>("ZSCORE", new DoubleReplayConvertor());
@ -169,7 +169,7 @@ public interface RedisCommands {
RedisCommand<Boolean> SREM_SINGLE = new RedisCommand<Boolean>("SREM", new BooleanAmountReplayConvertor());
RedisCommand<Boolean> SMOVE = new RedisCommand<Boolean>("SMOVE", new BooleanReplayConvertor());
RedisCommand<Set<Object>> SMEMBERS = new RedisCommand<Set<Object>>("SMEMBERS", new ObjectSetReplayDecoder<Object>());
RedisCommand<List<Object>> SRANDMEMBER = new RedisCommand<List<Object>>("SRANDMEMBER", new ObjectListReplayDecoder<Object>());
RedisCommand<Set<Object>> SRANDMEMBER = new RedisCommand<Set<Object>>("SRANDMEMBER", new ObjectSetReplayDecoder<Object>());
RedisCommand<Object> SRANDMEMBER_SINGLE = new RedisCommand<Object>("SRANDMEMBER");
RedisCommand<ListScanResult<Object>> SSCAN = new RedisCommand<ListScanResult<Object>>("SSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()));
RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()));

@ -391,6 +391,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
c.setReadMode(cfg.getReadMode());
c.setSubscriptionMode(cfg.getSubscriptionMode());
c.setDnsMonitoringInterval(cfg.getDnsMonitoringInterval());
c.setKeepAlive(cfg.isKeepAlive());
return c;
}

@ -354,6 +354,46 @@ public abstract class BaseMapTest extends BaseTest {
destroy(map);
}
@Test
public void testPutAllBatched() {
RMap<Integer, String> map = getMap("simple");
map.put(1, "1");
map.put(2, "2");
map.put(3, "3");
Map<Integer, String> joinMap = new HashMap<Integer, String>();
joinMap.put(4, "4");
joinMap.put(5, "5");
joinMap.put(6, "6");
map.putAll(joinMap, 5);
assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6);
Map<Integer, String> joinMap2 = new HashMap<Integer, String>();
joinMap2.put(7, "7");
joinMap2.put(8, "8");
joinMap2.put(9, "9");
joinMap2.put(10, "10");
joinMap2.put(11, "11");
map.putAll(joinMap2, 5);
assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
Map<Integer, String> joinMap3 = new HashMap<Integer, String>();
joinMap3.put(12, "12");
joinMap3.put(13, "13");
joinMap3.put(14, "14");
joinMap3.put(15, "15");
joinMap3.put(16, "16");
joinMap3.put(17, "17");
joinMap3.put(18, "18");
map.putAll(joinMap3, 5);
assertThat(map.keySet()).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18);
destroy(map);
}
@Test
public void testPutAllBig() {
Map<Integer, String> joinMap = new HashMap<Integer, String>();

@ -16,7 +16,7 @@ public class RedisVersion implements Comparable<RedisVersion>{
public RedisVersion(String fullVersion) {
this.fullVersion = fullVersion;
Matcher matcher = Pattern.compile("^([\\d]+)\\.([\\d]+)\\.([\\d]+)$").matcher(fullVersion);
Matcher matcher = Pattern.compile("^([\\d]+)\\.([\\d]+)\\.([\\d]+)").matcher(fullVersion);
matcher.find();
majorVersion = Integer.parseInt(matcher.group(1));
minorVersion = Integer.parseInt(matcher.group(2));

@ -117,7 +117,7 @@ public class RedissonBatchTest extends BaseTest {
assertThat(b2f2.get()).isEqualTo(2d);
}
@Test(timeout = 21000)
@Test(timeout = 22000)
public void testPerformance() {
RMap<String, String> map = redisson.getMap("map");
Map<String, String> m = new HashMap<String, String>();

@ -347,7 +347,17 @@ public class RedissonSetTest extends BaseTest {
assertThat(set.removeRandom(4)).isEmpty();
}
@Test
public void testRandomLimited() {
RSet<Integer> set = redisson.getSet("simple");
for (int i = 0; i < 10; i++) {
set.add(i);
}
assertThat(set.random(3)).containsAnyElementsOf(set.readAll()).hasSize(3);
}
@Test
public void testRandom() {
RSet<Integer> set = redisson.getSet("simple");

Loading…
Cancel
Save