Merge branch 'redisson/master' into add-uri-builder

# Conflicts:
#
redisson/src/main/java/org/redisson/connection/SentinelConnectionManager
.java
pull/894/head
Rui Gu 8 years ago
commit 1f1f94209a

@ -4,7 +4,7 @@ Redisson Releases History
Try __ULTRA-FAST__ [Redisson PRO](https://redisson.pro) edition. Try __ULTRA-FAST__ [Redisson PRO](https://redisson.pro) edition.
### 10-Apr-2017 - versions 2.9.2 and 3.4.2 released ### 10-May-2017 - versions 2.9.2 and 3.4.2 released
Feature - __Dropwizard metrics integration__ More details [here](https://github.com/redisson/redisson/wiki/14.-Integration-with-frameworks#147-dropwizard-metrics) Feature - __Dropwizard metrics integration__ More details [here](https://github.com/redisson/redisson/wiki/14.-Integration-with-frameworks#147-dropwizard-metrics)
Feature - `RLocalCachedMap.preloadCache` method added (thanks to Steve Draper) Feature - `RLocalCachedMap.preloadCache` method added (thanks to Steve Draper)

@ -1,6 +1,6 @@
Redisson: Redis based In-Memory Data Grid for Java. Redisson: Redis based In-Memory Data Grid for Java.
==== ====
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.4.1) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [Support chat](https://gitter.im/mrniko/redisson) | [Ultra-fast version](https://redisson.pro) [Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.4.1) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [Support chat](https://gitter.im/mrniko/redisson) | **[Ultra-fast version](https://redisson.pro)**
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework. Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.
## Please take part in [Redisson survey](https://www.surveymonkey.com/r/QXQZH5D) ## Please take part in [Redisson survey](https://www.surveymonkey.com/r/QXQZH5D)

@ -26,6 +26,12 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
abstract class RedissonBaseIterator<V> implements Iterator<V> { abstract class RedissonBaseIterator<V> implements Iterator<V> {
private List<ByteBuf> firstValues; private List<ByteBuf> firstValues;
@ -36,7 +42,6 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
private boolean finished; private boolean finished;
private boolean currentElementRemoved; private boolean currentElementRemoved;
private boolean removeExecuted;
private V value; private V value;
@Override @Override
@ -47,7 +52,6 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
free(lastValues); free(lastValues);
currentElementRemoved = false; currentElementRemoved = false;
removeExecuted = false;
client = null; client = null;
firstValues = null; firstValues = null;
lastValues = null; lastValues = null;
@ -58,9 +62,7 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
} }
finished = false; finished = false;
} }
long prevIterPos;
do { do {
prevIterPos = nextIterPos;
ListScanResult<ScanObjectEntry> res = iterator(client, nextIterPos); ListScanResult<ScanObjectEntry> res = iterator(client, nextIterPos);
if (lastValues != null) { if (lastValues != null) {
free(lastValues); free(lastValues);
@ -76,7 +78,6 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
client = null; client = null;
firstValues = null; firstValues = null;
nextIterPos = 0; nextIterPos = 0;
prevIterPos = -1;
} }
} else { } else {
if (firstValues.isEmpty()) { if (firstValues.isEmpty()) {
@ -87,38 +88,38 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
client = null; client = null;
firstValues = null; firstValues = null;
nextIterPos = 0; nextIterPos = 0;
prevIterPos = -1;
continue; continue;
} }
if (res.getPos() == 0) { if (res.getPos() == 0) {
free(firstValues);
free(lastValues);
finished = true; finished = true;
return false; return false;
} }
} }
} else if (lastValues.removeAll(firstValues)) { } else if (lastValues.removeAll(firstValues)
|| (lastValues.isEmpty() && nextIterPos == 0)) {
free(firstValues); free(firstValues);
free(lastValues); free(lastValues);
currentElementRemoved = false; currentElementRemoved = false;
removeExecuted = false;
client = null; client = null;
firstValues = null; firstValues = null;
lastValues = null; lastValues = null;
nextIterPos = 0; nextIterPos = 0;
prevIterPos = -1;
if (tryAgain()) { if (tryAgain()) {
continue; continue;
} }
finished = true; finished = true;
return false; return false;
} }
} }
lastIter = res.getValues().iterator(); lastIter = res.getValues().iterator();
nextIterPos = res.getPos(); nextIterPos = res.getPos();
} while (!lastIter.hasNext() && nextIterPos != prevIterPos); } while (!lastIter.hasNext());
if (prevIterPos == nextIterPos && !removeExecuted) {
finished = true;
}
} }
return lastIter.hasNext(); return lastIter.hasNext();
} }
@ -170,7 +171,6 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
lastIter.remove(); lastIter.remove();
remove(value); remove(value);
currentElementRemoved = true; currentElementRemoved = true;
removeExecuted = true;
} }
abstract void remove(V value); abstract void remove(V value);

@ -28,6 +28,14 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
* @param <M> loaded value type
*/
public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> { public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> firstValues; private Map<ByteBuf, ByteBuf> firstValues;
@ -38,7 +46,6 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private boolean finished; private boolean finished;
private boolean currentElementRemoved; private boolean currentElementRemoved;
private boolean removeExecuted;
protected Map.Entry<ScanObjectEntry, ScanObjectEntry> entry; protected Map.Entry<ScanObjectEntry, ScanObjectEntry> entry;
@Override @Override
@ -49,7 +56,6 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
free(lastValues); free(lastValues);
currentElementRemoved = false; currentElementRemoved = false;
removeExecuted = false;
client = null; client = null;
firstValues = null; firstValues = null;
lastValues = null; lastValues = null;
@ -60,15 +66,15 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
} }
finished = false; finished = false;
} }
long prevIterPos;
do { do {
prevIterPos = nextIterPos;
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator(); MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator();
if (lastValues != null) { if (lastValues != null) {
free(lastValues); free(lastValues);
} }
lastValues = convert(res.getMap()); lastValues = convert(res.getMap());
client = res.getRedisClient(); client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) { if (nextIterPos == 0 && firstValues == null) {
firstValues = lastValues; firstValues = lastValues;
lastValues = null; lastValues = null;
@ -76,7 +82,6 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
client = null; client = null;
firstValues = null; firstValues = null;
nextIterPos = 0; nextIterPos = 0;
prevIterPos = -1;
} }
} else { } else {
if (firstValues.isEmpty()) { if (firstValues.isEmpty()) {
@ -87,38 +92,38 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
client = null; client = null;
firstValues = null; firstValues = null;
nextIterPos = 0; nextIterPos = 0;
prevIterPos = -1;
continue; continue;
} }
if (res.getPos() == 0) { if (res.getPos() == 0) {
free(firstValues);
free(lastValues);
finished = true; finished = true;
return false; return false;
} }
} }
} else if (lastValues.keySet().removeAll(firstValues.keySet())) { } else if (lastValues.keySet().removeAll(firstValues.keySet())
|| (lastValues.isEmpty() && nextIterPos == 0)) {
free(firstValues); free(firstValues);
free(lastValues); free(lastValues);
currentElementRemoved = false; currentElementRemoved = false;
removeExecuted = false;
client = null; client = null;
firstValues = null; firstValues = null;
lastValues = null; lastValues = null;
nextIterPos = 0; nextIterPos = 0;
prevIterPos = -1;
if (tryAgain()) { if (tryAgain()) {
continue; continue;
} }
finished = true; finished = true;
return false; return false;
} }
} }
lastIter = res.getMap().entrySet().iterator(); lastIter = res.getMap().entrySet().iterator();
nextIterPos = res.getPos(); nextIterPos = res.getPos();
} while (!lastIter.hasNext() && nextIterPos != prevIterPos); } while (!lastIter.hasNext());
if (prevIterPos == nextIterPos && !removeExecuted) {
finished = true;
}
} }
return lastIter.hasNext(); return lastIter.hasNext();
@ -184,7 +189,6 @@ public abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
lastIter.remove(); lastIter.remove();
removeKey(); removeKey();
currentElementRemoved = true; currentElementRemoved = true;
removeExecuted = true;
entry = null; entry = null;
} }

@ -124,6 +124,17 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
return blockingQueue.pollLastAndOfferFirstTo(queueName, timeout, unit); return blockingQueue.pollLastAndOfferFirstTo(queueName, timeout, unit);
} }
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
RFuture<V> res = takeLastAndOfferFirstToAsync(queueName);
return res.await().getNow();
}
@Override
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
}
@Override @Override
public int remainingCapacity() { public int remainingCapacity() {
return Integer.MAX_VALUE; return Integer.MAX_VALUE;

@ -134,6 +134,17 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
return res.await().getNow(); return res.await().getNow();
} }
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
RFuture<V> res = takeLastAndOfferFirstToAsync(queueName);
return res.await().getNow();
}
@Override
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
}
@Override @Override
public int remainingCapacity() { public int remainingCapacity() {
return Integer.MAX_VALUE; return Integer.MAX_VALUE;

@ -33,6 +33,7 @@ import org.redisson.command.CommandExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder; import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.PromiseDelegator; import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.SemaphorePubSub; import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -136,7 +137,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
} }
private RPromise<V> wrapTakeFuture(final RFuture<V> takeFuture) { private RPromise<V> wrapTakeFuture(final RFuture<V> takeFuture) {
final RPromise<V> result = new PromiseDelegator<V>(commandExecutor.getConnectionManager().<V>newPromise()) { final RPromise<V> result = new RedissonPromise<V>() {
@Override @Override
public boolean cancel(boolean mayInterruptIfRunning) { public boolean cancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning); super.cancel(mayInterruptIfRunning);
@ -253,6 +254,17 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
return wrapTakeFuture(takeFuture); return wrapTakeFuture(takeFuture);
} }
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
RFuture<V> res = takeLastAndOfferFirstToAsync(queueName);
return res.await().getNow();
}
@Override
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
}
@Override @Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
RFuture<V> takeFuture = commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout)); RFuture<V> takeFuture = commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));

@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RDelayedQueue; import org.redisson.api.RDelayedQueue;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RQueue;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
@ -417,7 +416,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
public RFuture<V> peekAsync() { public RFuture<V> peekAsync() {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_OBJECT, return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], 0); " "local v = redis.call('lindex', KEYS[1], 0); "
+ "if v ~= nil then " + "if v ~= false then "
+ "local randomId, value = struct.unpack('dLc0', v);" + "local randomId, value = struct.unpack('dLc0', v);"
+ "return value; " + "return value; "
+ "end " + "end "
@ -429,7 +428,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
public RFuture<V> pollAsync() { public RFuture<V> pollAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('lpop', KEYS[1]); " "local v = redis.call('lpop', KEYS[1]); "
+ "if v ~= nil then " + "if v ~= false then "
+ "redis.call('zrem', KEYS[2], v); " + "redis.call('zrem', KEYS[2], v); "
+ "local randomId, value = struct.unpack('dLc0', v);" + "local randomId, value = struct.unpack('dLc0', v);"
+ "return value; " + "return value; "
@ -447,7 +446,7 @@ public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelay
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName) { public RFuture<V> pollLastAndOfferFirstToAsync(String queueName) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT, return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_OBJECT,
"local v = redis.call('rpop', KEYS[1]); " "local v = redis.call('rpop', KEYS[1]); "
+ "if v ~= nil then " + "if v ~= false then "
+ "redis.call('zrem', KEYS[2], v); " + "redis.call('zrem', KEYS[2], v); "
+ "local randomId, value = struct.unpack('dLc0', v);" + "local randomId, value = struct.unpack('dLc0', v);"
+ "redis.call('lpush', KEYS[3], value); " + "redis.call('lpush', KEYS[3], value); "

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -230,27 +231,15 @@ public class RedissonKeys implements RKeys {
}; };
for (MasterSlaveEntry entry : entries) { for (MasterSlaveEntry entry : entries) {
RFuture<Collection<String>> findFuture = commandExecutor.readAsync(entry, null, RedisCommands.KEYS, pattern); Iterator<String> keysIterator = createKeysIterator(entry, pattern, 10);
findFuture.addListener(new FutureListener<Collection<String>>() { Collection<String> keys = new HashSet<String>();
@Override while (keysIterator.hasNext()) {
public void operationComplete(Future<Collection<String>> future) throws Exception { String key = keysIterator.next();
if (!future.isSuccess()) { keys.add(key);
failed.set(future.cause());
checkExecution(result, failed, count, executed);
return;
} }
Collection<String> keys = future.getNow();
if (keys.isEmpty()) {
checkExecution(result, failed, count, executed);
return;
}
RFuture<Long> deleteFuture = deleteAsync(keys.toArray(new String[keys.size()])); RFuture<Long> deleteFuture = deleteAsync(keys.toArray(new String[keys.size()]));
deleteFuture.addListener(listener); deleteFuture.addListener(listener);
} }
});
}
return result; return result;
} }

@ -144,6 +144,27 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_SINGLE, getName(), -1, -1); return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_SINGLE, getName(), -1, -1);
} }
@Override
public Double firstScore() {
return get(firstScoreAsync());
}
@Override
public RFuture<Double> firstScoreAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_SINGLE_SCORE, getName(), 0, 0, "WITHSCORES");
}
@Override
public Double lastScore() {
return get(lastScoreAsync());
}
@Override
public RFuture<Double> lastScoreAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_SINGLE_SCORE, getName(), -1, -1, "WITHSCORES");
}
@Override @Override
public RFuture<Boolean> addAsync(double score, V object) { public RFuture<Boolean> addAsync(double score, V object) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD_BOOL, getName(), BigDecimal.valueOf(score).toPlainString(), object); return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD_BOOL, getName(), BigDecimal.valueOf(score).toPlainString(), object);

@ -44,4 +44,6 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException; V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException;
V takeLastAndOfferFirstTo(String queueName) throws InterruptedException;
} }

@ -94,6 +94,8 @@ public interface RBlockingQueueAsync<V> extends RQueueAsync<V> {
RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit); RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit);
RFuture<V> takeLastAndOfferFirstToAsync(String queueName);
/** /**
* Retrieves and removes the head of this queue in async mode, waiting up to the * Retrieves and removes the head of this queue in async mode, waiting up to the
* specified wait time if necessary for an element to become available. * specified wait time if necessary for an element to become available.

@ -53,6 +53,10 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
V last(); V last();
Double firstScore();
Double lastScore();
Long addAll(Map<V, Double> objects); Long addAll(Map<V, Double> objects);
int removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive); int removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);

@ -38,6 +38,10 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn
RFuture<V> lastAsync(); RFuture<V> lastAsync();
RFuture<Double> firstScoreAsync();
RFuture<Double> lastScoreAsync();
RFuture<Long> addAllAsync(Map<V, Double> objects); RFuture<Long> addAllAsync(Map<V, Double> objects);
RFuture<Integer> removeRangeByScoreAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive); RFuture<Integer> removeRangeByScoreAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);

@ -73,7 +73,7 @@ public abstract class BaseConnectionHandler<C extends RedisConnection> extends C
futures.add(future); futures.add(future);
} }
if (config.getClientName() != null) { if (config.getClientName() != null) {
RFuture<Object> future = connection.async(RedisCommands.CLIENT_SETNAME, config.getDatabase()); RFuture<Object> future = connection.async(RedisCommands.CLIENT_SETNAME, config.getClientName());
futures.add(future); futures.add(future);
} }
if (config.isReadOnly()) { if (config.isReadOnly()) {

@ -46,11 +46,11 @@ import org.redisson.client.protocol.decoder.ListResultReplayDecoder;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.Long2MultiDecoder; import org.redisson.client.protocol.decoder.Long2MultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder; import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder; import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
@ -115,6 +115,7 @@ public interface RedisCommands {
RedisCommand<Integer> ZREVRANK_INT = new RedisCommand<Integer>("ZREVRANK", new IntegerReplayConvertor(), 2); RedisCommand<Integer> ZREVRANK_INT = new RedisCommand<Integer>("ZREVRANK", new IntegerReplayConvertor(), 2);
RedisStrictCommand<Long> ZRANK = new RedisStrictCommand<Long>("ZRANK", 2); RedisStrictCommand<Long> ZRANK = new RedisStrictCommand<Long>("ZRANK", 2);
RedisCommand<Object> ZRANGE_SINGLE = new RedisCommand<Object>("ZRANGE", new ObjectFirstResultReplayDecoder<Object>()); RedisCommand<Object> ZRANGE_SINGLE = new RedisCommand<Object>("ZRANGE", new ObjectFirstResultReplayDecoder<Object>());
RedisStrictCommand<Double> ZRANGE_SINGLE_SCORE = new RedisStrictCommand<Double>("ZRANGE", new ObjectFirstScoreReplayDecoder());
RedisCommand<List<Object>> ZRANGE = new RedisCommand<List<Object>>("ZRANGE", new ObjectListReplayDecoder<Object>()); RedisCommand<List<Object>> ZRANGE = new RedisCommand<List<Object>>("ZRANGE", new ObjectListReplayDecoder<Object>());
RedisStrictCommand<Integer> ZREMRANGEBYRANK = new RedisStrictCommand<Integer>("ZREMRANGEBYRANK", new IntegerReplayConvertor()); RedisStrictCommand<Integer> ZREMRANGEBYRANK = new RedisStrictCommand<Integer>("ZREMRANGEBYRANK", new IntegerReplayConvertor());
RedisStrictCommand<Integer> ZREMRANGEBYSCORE = new RedisStrictCommand<Integer>("ZREMRANGEBYSCORE", new IntegerReplayConvertor()); RedisStrictCommand<Integer> ZREMRANGEBYSCORE = new RedisStrictCommand<Integer>("ZREMRANGEBYSCORE", new IntegerReplayConvertor());

@ -0,0 +1,49 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.math.BigDecimal;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
*
*
*/
public class ObjectFirstScoreReplayDecoder implements MultiDecoder<Double> {
@Override
public Object decode(ByteBuf buf, State state) {
return new BigDecimal(buf.toString(CharsetUtil.UTF_8)).doubleValue();
}
@Override
public Double decode(List<Object> parts, State state) {
return (Double) parts.get(1);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum % 2 != 0;
}
}

@ -19,6 +19,9 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.Reader; import java.io.Reader;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.List; import java.util.List;
@ -121,62 +124,101 @@ public class ConfigSupport {
private ObjectMapper jsonMapper = createMapper(null, null); private ObjectMapper jsonMapper = createMapper(null, null);
private ObjectMapper yamlMapper = createMapper(new YAMLFactory(), null); private ObjectMapper yamlMapper = createMapper(new YAMLFactory(), null);
private void patchUriObject() throws IOException {
patchUriField("lowMask", "L_DASH");
patchUriField("highMask", "H_DASH");
}
private void patchUriField(String methodName, String fieldName)
throws IOException {
try {
Method lowMask = URI.class.getDeclaredMethod(methodName, String.class);
lowMask.setAccessible(true);
Long lowMaskValue = (Long) lowMask.invoke(null, "-_");
Field lowDash = URI.class.getDeclaredField(fieldName);
Field modifiers = Field.class.getDeclaredField("modifiers");
modifiers.setAccessible(true);
modifiers.setInt(lowDash, lowDash.getModifiers() & ~Modifier.FINAL);
lowDash.setAccessible(true);
lowDash.setLong(null, lowMaskValue);
} catch (Exception e) {
throw new IOException(e);
}
}
public <T> T fromJSON(String content, Class<T> configType) throws IOException { public <T> T fromJSON(String content, Class<T> configType) throws IOException {
patchUriObject();
return jsonMapper.readValue(content, configType); return jsonMapper.readValue(content, configType);
} }
public <T> T fromJSON(File file, Class<T> configType) throws IOException { public <T> T fromJSON(File file, Class<T> configType) throws IOException {
patchUriObject();
return fromJSON(file, configType, null); return fromJSON(file, configType, null);
} }
public <T> T fromJSON(File file, Class<T> configType, ClassLoader classLoader) throws IOException { public <T> T fromJSON(File file, Class<T> configType, ClassLoader classLoader) throws IOException {
patchUriObject();
jsonMapper = createMapper(null, classLoader); jsonMapper = createMapper(null, classLoader);
return jsonMapper.readValue(file, configType); return jsonMapper.readValue(file, configType);
} }
public <T> T fromJSON(URL url, Class<T> configType) throws IOException { public <T> T fromJSON(URL url, Class<T> configType) throws IOException {
patchUriObject();
return jsonMapper.readValue(url, configType); return jsonMapper.readValue(url, configType);
} }
public <T> T fromJSON(Reader reader, Class<T> configType) throws IOException { public <T> T fromJSON(Reader reader, Class<T> configType) throws IOException {
patchUriObject();
return jsonMapper.readValue(reader, configType); return jsonMapper.readValue(reader, configType);
} }
public <T> T fromJSON(InputStream inputStream, Class<T> configType) throws IOException { public <T> T fromJSON(InputStream inputStream, Class<T> configType) throws IOException {
patchUriObject();
return jsonMapper.readValue(inputStream, configType); return jsonMapper.readValue(inputStream, configType);
} }
public String toJSON(Config config) throws IOException { public String toJSON(Config config) throws IOException {
patchUriObject();
return jsonMapper.writeValueAsString(config); return jsonMapper.writeValueAsString(config);
} }
public <T> T fromYAML(String content, Class<T> configType) throws IOException { public <T> T fromYAML(String content, Class<T> configType) throws IOException {
patchUriObject();
return yamlMapper.readValue(content, configType); return yamlMapper.readValue(content, configType);
} }
public <T> T fromYAML(File file, Class<T> configType) throws IOException { public <T> T fromYAML(File file, Class<T> configType) throws IOException {
patchUriObject();
return yamlMapper.readValue(file, configType); return yamlMapper.readValue(file, configType);
} }
public <T> T fromYAML(File file, Class<T> configType, ClassLoader classLoader) throws IOException { public <T> T fromYAML(File file, Class<T> configType, ClassLoader classLoader) throws IOException {
patchUriObject();
yamlMapper = createMapper(new YAMLFactory(), classLoader); yamlMapper = createMapper(new YAMLFactory(), classLoader);
return yamlMapper.readValue(file, configType); return yamlMapper.readValue(file, configType);
} }
public <T> T fromYAML(URL url, Class<T> configType) throws IOException { public <T> T fromYAML(URL url, Class<T> configType) throws IOException {
patchUriObject();
return yamlMapper.readValue(url, configType); return yamlMapper.readValue(url, configType);
} }
public <T> T fromYAML(Reader reader, Class<T> configType) throws IOException { public <T> T fromYAML(Reader reader, Class<T> configType) throws IOException {
patchUriObject();
return yamlMapper.readValue(reader, configType); return yamlMapper.readValue(reader, configType);
} }
public <T> T fromYAML(InputStream inputStream, Class<T> configType) throws IOException { public <T> T fromYAML(InputStream inputStream, Class<T> configType) throws IOException {
patchUriObject();
return yamlMapper.readValue(inputStream, configType); return yamlMapper.readValue(inputStream, configType);
} }
public String toYAML(Config config) throws IOException { public String toYAML(Config config) throws IOException {
patchUriObject();
return yamlMapper.writeValueAsString(config); return yamlMapper.writeValueAsString(config);
} }

@ -81,7 +81,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
// TODO async // TODO async
List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName()); List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
String masterHost = "redis://" + master.get(0) + ":" + master.get(1); String masterHost = createAddress(master.get(0), master.get(1));
this.config.setMasterAddress(masterHost); this.config.setMasterAddress(masterHost);
currentMaster.set(masterHost); currentMaster.set(masterHost);
log.info("master: {} added", masterHost); log.info("master: {} added", masterHost);
@ -97,7 +97,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = map.get("port"); String port = map.get("port");
String flags = map.get("flags"); String flags = map.get("flags");
String host = "redis://" + ip + ":" + port; String host = createAddress(ip, port);
this.config.addSlaveAddress(host); this.config.addSlaveAddress(host);
slaves.put(host, true); slaves.put(host, true);
@ -134,6 +134,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} }
} }
private String createAddress(String host, Object port) {
if (host.contains(":")) {
host = "[" + host + "]";
}
return "redis://" + host + ":" + port;
}
@Override @Override
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config,
HashSet<ClusterSlotRange> slots) { HashSet<ClusterSlotRange> slots) {
@ -208,7 +215,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2]; String ip = parts[2];
String port = parts[3]; String port = parts[3];
String addr = "redis://" + ip + ":" + port; String addr = createAddress(ip, port);
URI uri = URIBuilder.create(addr); URI uri = URIBuilder.create(addr);
registerSentinel(cfg, uri, c); registerSentinel(cfg, uri, c);
} }
@ -222,7 +229,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
final String ip = parts[2]; final String ip = parts[2];
final String port = parts[3]; final String port = parts[3];
final String slaveAddr = "redis://" + ip + ":" + port; final String slaveAddr = createAddress(ip, port);
if (!isUseSameMaster(parts)) { if (!isUseSameMaster(parts)) {
return; return;
@ -310,7 +317,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String slaveAddr = ip + ":" + port; String slaveAddr = ip + ":" + port;
String master = currentMaster.get(); String master = currentMaster.get();
String slaveMaster = "redis://" + parts[6] + ":" + parts[7]; String slaveMaster = createAddress(parts[6], parts[7]);
if (!master.equals(slaveMaster)) { if (!master.equals(slaveMaster)) {
log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMaster, master); log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMaster, master);
return false; return false;
@ -370,7 +377,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = parts[4]; String port = parts[4];
String current = currentMaster.get(); String current = currentMaster.get();
String newMaster = "redis://" + ip + ":" + port; String newMaster = createAddress(ip, port);
if (!newMaster.equals(current) if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) { && currentMaster.compareAndSet(current, newMaster)) {
changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster)); changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster));

@ -39,18 +39,22 @@ public class RedissonCache implements Cache {
private CacheConfig config; private CacheConfig config;
private final boolean allowNullValues;
private final AtomicLong hits = new AtomicLong(); private final AtomicLong hits = new AtomicLong();
private final AtomicLong misses = new AtomicLong(); private final AtomicLong misses = new AtomicLong();
public RedissonCache(RMapCache<Object, Object> mapCache, CacheConfig config) { public RedissonCache(RMapCache<Object, Object> mapCache, CacheConfig config, boolean allowNullValues) {
this.mapCache = mapCache; this.mapCache = mapCache;
this.map = mapCache; this.map = mapCache;
this.config = config; this.config = config;
this.allowNullValues = allowNullValues;
} }
public RedissonCache(RMap<Object, Object> map) { public RedissonCache(RMap<Object, Object> map, boolean allowNullValues) {
this.map = map; this.map = map;
this.allowNullValues = allowNullValues;
} }
@Override @Override
@ -92,6 +96,15 @@ public class RedissonCache implements Cache {
@Override @Override
public void put(Object key, Object value) { public void put(Object key, Object value) {
if (!allowNullValues && value == null) {
if (mapCache != null) {
mapCache.remove(key);
} else {
map.remove(key);
}
return;
}
value = toStoreValue(value); value = toStoreValue(value);
if (mapCache != null) { if (mapCache != null) {
mapCache.fastPut(key, value, config.getTTL(), TimeUnit.MILLISECONDS, config.getMaxIdleTime(), TimeUnit.MILLISECONDS); mapCache.fastPut(key, value, config.getTTL(), TimeUnit.MILLISECONDS, config.getMaxIdleTime(), TimeUnit.MILLISECONDS);
@ -101,13 +114,22 @@ public class RedissonCache implements Cache {
} }
public ValueWrapper putIfAbsent(Object key, Object value) { public ValueWrapper putIfAbsent(Object key, Object value) {
value = toStoreValue(value);
Object prevValue; Object prevValue;
if (!allowNullValues && value == null) {
if (mapCache != null) {
prevValue = mapCache.get(key);
} else {
prevValue = map.get(key);
}
} else {
value = toStoreValue(value);
if (mapCache != null) { if (mapCache != null) {
prevValue = mapCache.putIfAbsent(key, value, config.getTTL(), TimeUnit.MILLISECONDS, config.getMaxIdleTime(), TimeUnit.MILLISECONDS); prevValue = mapCache.putIfAbsent(key, value, config.getTTL(), TimeUnit.MILLISECONDS, config.getMaxIdleTime(), TimeUnit.MILLISECONDS);
} else { } else {
prevValue = map.putIfAbsent(key, value); prevValue = map.putIfAbsent(key, value);
} }
}
return toValueWrapper(prevValue); return toValueWrapper(prevValue);
} }

@ -45,6 +45,10 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA
private ResourceLoader resourceLoader; private ResourceLoader resourceLoader;
private boolean dynamic = true;
private boolean allowNullValues = true;
private Codec codec; private Codec codec;
private RedissonClient redisson; private RedissonClient redisson;
@ -123,6 +127,36 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA
this.codec = codec; this.codec = codec;
} }
/**
* Defines possibility of storing {@code null} values.
* <p>
* Default is <code>true</code>
*
* @param allowNullValues - stores if <code>true</code>
*/
public void setAllowNullValues(boolean allowNullValues) {
this.allowNullValues = allowNullValues;
}
/**
* Defines 'fixed' cache names.
* A new cache instance will not be created in dynamic for non-defined names.
* <p>
* `null` parameter setups dynamic mode
*
* @param names of caches
*/
public void setCacheNames(Collection<String> names) {
if (names != null) {
for (String name : names) {
getCache(name);
}
dynamic = false;
} else {
dynamic = true;
}
}
/** /**
* Set cache config location * Set cache config location
* *
@ -165,6 +199,9 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA
if (cache != null) { if (cache != null) {
return cache; return cache;
} }
if (!dynamic) {
return cache;
}
CacheConfig config = configMap.get(name); CacheConfig config = configMap.get(name);
if (config == null) { if (config == null) {
@ -189,7 +226,7 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA
map = redisson.getMap(name); map = redisson.getMap(name);
} }
Cache cache = new RedissonCache(map); Cache cache = new RedissonCache(map, allowNullValues);
Cache oldCache = instanceMap.putIfAbsent(name, cache); Cache oldCache = instanceMap.putIfAbsent(name, cache);
if (oldCache != null) { if (oldCache != null) {
cache = oldCache; cache = oldCache;
@ -205,7 +242,7 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA
map = redisson.getMapCache(name); map = redisson.getMapCache(name);
} }
Cache cache = new RedissonCache(map, config); Cache cache = new RedissonCache(map, config, allowNullValues);
Cache oldCache = instanceMap.putIfAbsent(name, cache); Cache oldCache = instanceMap.putIfAbsent(name, cache);
if (oldCache != null) { if (oldCache != null) {
cache = oldCache; cache = oldCache;

@ -296,17 +296,43 @@ public class RedissonBlockingQueueTest extends BaseTest {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
}, 10, TimeUnit.SECONDS); }, 5, TimeUnit.SECONDS);
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("{queue}2"); RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("{queue}2");
queue2.put(4); queue2.put(4);
queue2.put(5); queue2.put(5);
queue2.put(6); queue2.put(6);
queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS); Integer value = queue1.pollLastAndOfferFirstTo(queue2.getName(), 5, TimeUnit.SECONDS);
assertThat(value).isEqualTo(3);
assertThat(queue2).containsExactly(3, 4, 5, 6); assertThat(queue2).containsExactly(3, 4, 5, 6);
} }
@Test
public void testTakeLastAndOfferFirstTo() throws InterruptedException {
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("{queue}1");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
try {
queue1.put(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 3, TimeUnit.SECONDS);
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("{queue}2");
queue2.put(4);
queue2.put(5);
queue2.put(6);
long startTime = System.currentTimeMillis();
Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName());
assertThat(System.currentTimeMillis() - startTime).isBetween(2900L, 3200L);
assertThat(value).isEqualTo(3);
assertThat(queue2).containsExactly(3, 4, 5, 6);
}
@Test @Test
public void testAddOfferOrigin() { public void testAddOfferOrigin() {
Queue<Integer> queue = new LinkedList<Integer>(); Queue<Integer> queue = new LinkedList<Integer>();

@ -509,10 +509,38 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
queue2.put(5); queue2.put(5);
queue2.put(6); queue2.put(6);
queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS); Integer value = queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS);
assertThat(value).isEqualTo(3);
assertThat(queue2).containsExactly(3, 4, 5, 6); assertThat(queue2).containsExactly(3, 4, 5, 6);
} }
@Test
public void testTakeLastAndOfferFirstTo() throws InterruptedException {
final RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("{queue}1");
queue1.trySetCapacity(10);
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
try {
queue1.put(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 3, TimeUnit.SECONDS);
RBoundedBlockingQueue<Integer> queue2 = redisson.getBoundedBlockingQueue("{queue}2");
queue2.trySetCapacity(10);
queue2.put(4);
queue2.put(5);
queue2.put(6);
long startTime = System.currentTimeMillis();
Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName());
assertThat(System.currentTimeMillis() - startTime).isBetween(3000L, 3200L);
assertThat(value).isEqualTo(3);
assertThat(queue2).containsExactly(3, 4, 5, 6);
}
@Test @Test
public void testOffer() { public void testOffer() {
RBoundedBlockingQueue<Integer> queue = redisson.getBoundedBlockingQueue("blocking:queue"); RBoundedBlockingQueue<Integer> queue = redisson.getBoundedBlockingQueue("blocking:queue");

@ -66,6 +66,7 @@ public class RedissonKeysTest extends BaseTest {
for (int i = 0; i < 115; i++) { for (int i = 0; i < 115; i++) {
String key = "key" + Math.random(); String key = "key" + Math.random();
RBucket<String> bucket = redisson.getBucket(key); RBucket<String> bucket = redisson.getBucket(key);
keys.add(key);
bucket.set("someValue"); bucket.set("someValue");
} }

@ -275,6 +275,17 @@ public class RedissonScoredSortedSetTest extends BaseTest {
Assert.assertEquals("d", set.last()); Assert.assertEquals("d", set.last());
} }
@Test
public void testFirstLastScore() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
set.add(0.4, "d");
assertThat(set.firstScore()).isEqualTo(0.1);
assertThat(set.lastScore()).isEqualTo(0.4);
}
@Test @Test
public void testRemoveRangeByScore() { public void testRemoveRangeByScore() {

@ -6,6 +6,7 @@ import static org.redisson.BaseTest.createInstance;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -33,10 +34,13 @@ import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.codec.SerializationCodec; import org.redisson.codec.SerializationCodec;
import org.redisson.config.Config; import org.redisson.config.Config;
import org.redisson.connection.ConnectionListener; import org.redisson.connection.ConnectionListener;
import io.netty.buffer.Unpooled;
public class RedissonTest { public class RedissonTest {
protected RedissonClient redisson; protected RedissonClient redisson;
@ -76,7 +80,7 @@ public class RedissonTest {
} }
@Test @Test
public void testIterator() { public void testIteratorNotLooped() {
RedissonBaseIterator iter = new RedissonBaseIterator() { RedissonBaseIterator iter = new RedissonBaseIterator() {
int i; int i;
@Override @Override
@ -101,6 +105,41 @@ public class RedissonTest {
Assert.assertFalse(iter.hasNext()); Assert.assertFalse(iter.hasNext());
} }
@Test
public void testIteratorNotLooped2() {
RedissonBaseIterator<Integer> iter = new RedissonBaseIterator<Integer>() {
int i;
@Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
i++;
if (i == 1) {
return new ListScanResult<ScanObjectEntry>(14L, Arrays.asList(new ScanObjectEntry(Unpooled.wrappedBuffer(new byte[] {1}), 1)));
}
if (i == 2) {
return new ListScanResult(7L, Collections.emptyList());
}
if (i == 3) {
return new ListScanResult(0L, Collections.emptyList());
}
if (i == 4) {
return new ListScanResult(14L, Collections.emptyList());
}
Assert.fail();
return null;
}
@Override
void remove(Integer value) {
}
};
Assert.assertTrue(iter.hasNext());
assertThat(iter.next()).isEqualTo(1);
Assert.assertFalse(iter.hasNext());
}
@BeforeClass @BeforeClass
public static void beforeClass() throws IOException, InterruptedException { public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) { if (!RedissonRuntimeEnvironment.isTravis) {
@ -250,7 +289,7 @@ public class RedissonTest {
Assert.assertEquals(0, pp.stop()); Assert.assertEquals(0, pp.stop());
await().atMost(1, TimeUnit.SECONDS).until(() -> assertThat(connectCounter.get()).isEqualTo(1)); await().atMost(2, TimeUnit.SECONDS).until(() -> assertThat(connectCounter.get()).isEqualTo(1));
await().until(() -> assertThat(disconnectCounter.get()).isEqualTo(1)); await().until(() -> assertThat(disconnectCounter.get()).isEqualTo(1));
} }

Loading…
Cancel
Save