Merge branch 'mrniko/master'

pull/282/head
Rui Gu 10 years ago
commit a427464685

@ -55,6 +55,8 @@ Monits: [monits.com](http://monits.com/)
Brookhaven National Laboratory: [bnl.gov](http://bnl.gov/)
Netflix Dyno client: [dyno] (https://github.com/Netflix/dyno)
武林Q传:[nbrpg](http://www.nbrpg.com/)
Ocous: [ocous](http://www.ocous.com/)
Recent Releases
================================
####Please Note: trunk is current development branch.
@ -248,12 +250,12 @@ Include the following to your dependency list:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.1.1</version>
<version>2.1.2</version>
</dependency>
### Gradle
compile 'org.redisson:redisson:2.1.1'
compile 'org.redisson:redisson:2.1.2'
### Supported by

@ -35,6 +35,8 @@ class BaseConfig<T extends BaseConfig<T>> {
private int retryInterval = 1000;
private int closeConnectionAfterFailAttempts = -1;
/**
* Database index used for Redis connection
*/
@ -67,6 +69,7 @@ class BaseConfig<T extends BaseConfig<T>> {
setTimeout(config.getTimeout());
setClientName(config.getClientName());
setPingTimeout(config.getPingTimeout());
setCloseConnectionAfterFailAttempts(config.getCloseConnectionAfterFailAttempts());
}
/**
@ -159,30 +162,46 @@ class BaseConfig<T extends BaseConfig<T>> {
}
/**
* Name for client connection
* Setup connection name during connection init
* via CLIENT SETNAME command
*
* @param name
*/
public String getClientName() {
return clientName;
}
public T setClientName(String clientName) {
this.clientName = clientName;
return (T) this;
}
public String getClientName() {
return clientName;
}
/**
* Ping timeout used in <code>Node.ping</code> and <code>Node.pingAll<code> operation
*
* @return
* @param ping timeout in milliseconds
*/
public int getPingTimeout() {
return pingTimeout;
}
public T setPingTimeout(int pingTimeout) {
this.pingTimeout = pingTimeout;
return (T) this;
}
public int getPingTimeout() {
return pingTimeout;
}
/**
* Close connection if it has <code>failAttemptsAmount</code>
* fails in a row during command sending. Turned off by default.
*
* @param failAttemptsAmount
*/
public T setCloseConnectionAfterFailAttempts(int failAttemptsAmount) {
this.closeConnectionAfterFailAttempts = failAttemptsAmount;
return (T) this;
}
public int getCloseConnectionAfterFailAttempts() {
return closeConnectionAfterFailAttempts;
}
}

@ -33,6 +33,8 @@ import io.netty.util.concurrent.Future;
//TODO ping support
public interface CommandExecutor {
<T, R> R read(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R read(RedisClient client, String key, RedisCommand<T> command, Object ... params);
<T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);

@ -201,6 +201,12 @@ public class CommandExecutorService implements CommandExecutor {
return get(res);
}
public <T, R> R read(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> res = readAsync(client, key, codec, command, params);
return get(res);
}
public <T, R> Future<R> readAsync(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);

@ -33,6 +33,8 @@ public class Config {
private SingleServerConfig singleServerConfig;
private ClusterServersConfig clusterServersConfig;
private ElasticacheServersConfig elasticacheServersConfig;
/**
* Threads amount shared between all redis node clients
@ -71,6 +73,10 @@ public class Config {
if (oldConf.getClusterServersConfig() != null ) {
setClusterServersConfig(new ClusterServersConfig(oldConf.getClusterServersConfig()));
}
if (oldConf.getElasticacheServersConfig() != null) {
setElasticacheServersConfig(new ElasticacheServersConfig(oldConf.getElasticacheServersConfig()));
}
}
/**
@ -91,6 +97,7 @@ public class Config {
checkMasterSlaveServersConfig();
checkSentinelServersConfig();
checkSingleServerConfig();
checkElasticacheServersConfig();
if (clusterServersConfig == null) {
clusterServersConfig = new ClusterServersConfig();
@ -105,17 +112,37 @@ public class Config {
this.clusterServersConfig = clusterServersConfig;
}
public ElasticacheServersConfig useElasticacheServers() {
checkClusterServersConfig();
checkMasterSlaveServersConfig();
checkSentinelServersConfig();
checkSingleServerConfig();
if (elasticacheServersConfig == null) {
elasticacheServersConfig = new ElasticacheServersConfig();
}
return elasticacheServersConfig;
}
ElasticacheServersConfig getElasticacheServersConfig() {
return elasticacheServersConfig;
}
void setElasticacheServersConfig(ElasticacheServersConfig elasticacheServersConfig) {
this.elasticacheServersConfig = elasticacheServersConfig;
}
public SingleServerConfig useSingleServer() {
checkClusterServersConfig();
checkMasterSlaveServersConfig();
checkSentinelServersConfig();
checkElasticacheServersConfig();
if (singleServerConfig == null) {
singleServerConfig = new SingleServerConfig();
}
return singleServerConfig;
}
SingleServerConfig getSingleServerConfig() {
return singleServerConfig;
}
@ -127,6 +154,7 @@ public class Config {
checkClusterServersConfig();
checkSingleServerConfig();
checkMasterSlaveServersConfig();
checkElasticacheServersConfig();
if (sentinelServersConfig == null) {
sentinelServersConfig = new SentinelServersConfig();
@ -145,6 +173,7 @@ public class Config {
checkClusterServersConfig();
checkSingleServerConfig();
checkSentinelServersConfig();
checkElasticacheServersConfig();
if (masterSlaveServersConfig == null) {
masterSlaveServersConfig = new MasterSlaveServersConfig();
@ -194,6 +223,12 @@ public class Config {
throw new IllegalStateException("single server config already used!");
}
}
private void checkElasticacheServersConfig() {
if (elasticacheServersConfig != null) {
throw new IllegalStateException("elasticache replication group servers config already used!");
}
}
/**
* Activates an unix socket if servers binded to loopback interface.

@ -0,0 +1,86 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
/**
* Configuration for an AWS ElastiCache replication group. A replication group is composed
* of a single master endpoint and multiple read slaves.
*
* @author Steve Ungerer
*/
public class ElasticacheServersConfig extends BaseMasterSlaveServersConfig<ElasticacheServersConfig> {
/**
* Replication group node urls list
*/
private List<URI> nodeAddresses = new ArrayList<URI>();
/**
* Replication group scan interval in milliseconds
*/
private int scanInterval = 1000;
public ElasticacheServersConfig() {
}
ElasticacheServersConfig(ElasticacheServersConfig config) {
super(config);
setNodeAddresses(config.getNodeAddresses());
setScanInterval(config.getScanInterval());
}
/**
* Add Redis cluster node address. Use follow format -- <code>host:port</code>
*
* @param addresses in <code>host:port</code> format
* @return
*/
public ElasticacheServersConfig addNodeAddress(String ... addresses) {
for (String address : addresses) {
nodeAddresses.add(URIBuilder.create(address));
}
return this;
}
public List<URI> getNodeAddresses() {
return nodeAddresses;
}
void setNodeAddresses(List<URI> nodeAddresses) {
this.nodeAddresses = nodeAddresses;
}
public int getScanInterval() {
return scanInterval;
}
/**
* Elasticache node scan interval in milliseconds
*
* @param scanInterval in milliseconds
* @return
*/
public ElasticacheServersConfig setScanInterval(int scanInterval) {
this.scanInterval = scanInterval;
return this;
}
}

@ -71,7 +71,10 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<Maste
}
return this;
}
public MasterSlaveServersConfig addSlaveAddress(URI slaveAddress) {
slaveAddresses.add(slaveAddress);
return this;
}
public List<URI> getSlaveAddresses() {
return slaveAddresses;
}

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClusterConnectionManager;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ElasticacheConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
@ -39,11 +40,13 @@ import org.redisson.core.RCountDownLatch;
import org.redisson.core.RDeque;
import org.redisson.core.RHyperLogLog;
import org.redisson.core.RKeys;
import org.redisson.core.RLexSortedSet;
import org.redisson.core.RList;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript;
import org.redisson.core.RSet;
import org.redisson.core.RSortedSet;
@ -77,6 +80,8 @@ public class Redisson implements RedissonClient {
connectionManager = new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy);
} else if (configCopy.getClusterServersConfig() != null) {
connectionManager = new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy);
} else if (configCopy.getElasticacheServersConfig() != null) {
connectionManager = new ElasticacheConnectionManager(configCopy.getElasticacheServersConfig(), configCopy);
} else {
throw new IllegalArgumentException("server(s) address(es) not defined!");
}
@ -216,6 +221,14 @@ public class Redisson implements RedissonClient {
return new RedissonSortedSet<V>(commandExecutor, name);
}
public <V> RScoredSortedSet<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<V>(commandExecutor, name);
}
public RLexSortedSet getLexSortedSet(String name) {
return new RedissonLexSortedSet(commandExecutor, name);
}
/**
* Returns topic instance by name.
*

@ -28,6 +28,7 @@ import org.redisson.core.RKeysAsync;
import org.redisson.core.RListAsync;
import org.redisson.core.RMapAsync;
import org.redisson.core.RQueueAsync;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScriptAsync;
import org.redisson.core.RSetAsync;
import org.redisson.core.RTopicAsync;
@ -92,6 +93,11 @@ public class RedissonBatch implements RBatch {
return new RedissonAtomicLong(executorService, name);
}
@Override
public <V> RScoredSortedSet<V> getScoredSortedSet(String name) {
return new RedissonScoredSortedSet<V>(executorService, name);
}
@Override
public RScriptAsync getScript() {
return new RedissonScript(executorService);

@ -29,11 +29,13 @@ import org.redisson.core.RCountDownLatch;
import org.redisson.core.RDeque;
import org.redisson.core.RHyperLogLog;
import org.redisson.core.RKeys;
import org.redisson.core.RLexSortedSet;
import org.redisson.core.RList;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RPatternTopic;
import org.redisson.core.RQueue;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RScript;
import org.redisson.core.RSet;
import org.redisson.core.RSortedSet;
@ -104,6 +106,10 @@ public interface RedissonClient {
*/
<V> RSortedSet<V> getSortedSet(String name);
<V> RScoredSortedSet<V> getScoredSortedSet(String name);
RLexSortedSet getLexSortedSet(String name);
/**
* Returns topic instance by name.
*

@ -18,6 +18,7 @@ package org.redisson;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RExpirable;
@ -30,6 +31,10 @@ abstract class RedissonExpirable extends RedissonObject implements RExpirable {
super(connectionManager, name);
}
RedissonExpirable(Codec codec, CommandExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
@Override
public boolean expire(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.get(expireAsync(timeToLive, timeUnit));

@ -116,6 +116,9 @@ public class RedissonKeys implements RKeys {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
iter.remove();
delete(value);

@ -0,0 +1,177 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RLexSortedSet;
import io.netty.util.concurrent.Future;
public class RedissonLexSortedSet extends RedissonScoredSortedSet<String> implements RLexSortedSet {
public RedissonLexSortedSet(CommandExecutor commandExecutor, String name) {
super(StringCodec.INSTANCE, commandExecutor, name);
}
@Override
public int removeRangeByLex(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
return get(removeRangeByLexAsync(fromElement, fromInclusive, toElement, toInclusive));
}
@Override
public int removeRangeHeadByLex(String toElement, boolean toInclusive) {
return get(removeRangeHeadByLexAsync(toElement, toInclusive));
}
@Override
public Future<Integer> removeRangeHeadByLexAsync(String toElement, boolean toInclusive) {
String toValue = value(toElement, toInclusive);
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZREMRANGEBYLEX, getName(), "-", toValue);
}
@Override
public int removeRangeTailByLex(String fromElement, boolean fromInclusive) {
return get(removeRangeTailByLexAsync(fromElement, fromInclusive));
}
@Override
public Future<Integer> removeRangeTailByLexAsync(String fromElement, boolean fromInclusive) {
String fromValue = value(fromElement, fromInclusive);
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZREMRANGEBYLEX, getName(), fromValue, "+");
}
@Override
public Future<Integer> removeRangeByLexAsync(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
String fromValue = value(fromElement, fromInclusive);
String toValue = value(toElement, toInclusive);
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZREMRANGEBYLEX, getName(), fromValue, toValue);
}
@Override
public Collection<String> lexRange(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
return get(lexRangeAsync(fromElement, fromInclusive, toElement, toInclusive));
}
@Override
public Collection<String> lexRangeHead(String toElement, boolean toInclusive) {
return get(lexRangeHeadAsync(toElement, toInclusive));
}
@Override
public Future<Collection<String>> lexRangeHeadAsync(String toElement, boolean toInclusive) {
String toValue = value(toElement, toInclusive);
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), "-", toValue);
}
@Override
public Collection<String> lexRangeTail(String fromElement, boolean fromInclusive) {
return get(lexRangeTailAsync(fromElement, fromInclusive));
}
@Override
public Future<Collection<String>> lexRangeTailAsync(String fromElement, boolean fromInclusive) {
String fromValue = value(fromElement, fromInclusive);
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, "+");
}
@Override
public Future<Collection<String>> lexRangeAsync(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
String fromValue = value(fromElement, fromInclusive);
String toValue = value(toElement, toInclusive);
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZRANGEBYLEX, getName(), fromValue, toValue);
}
@Override
public int lexCountTail(String fromElement, boolean fromInclusive) {
return get(lexCountTailAsync(fromElement, fromInclusive));
}
@Override
public Future<Integer> lexCountTailAsync(String fromElement, boolean fromInclusive) {
String fromValue = value(fromElement, fromInclusive);
return commandExecutor.readAsync(getName(), RedisCommands.ZLEXCOUNT, getName(), fromValue, "+");
}
@Override
public int lexCountHead(String toElement, boolean toInclusive) {
return get(lexCountHeadAsync(toElement, toInclusive));
}
@Override
public Future<Integer> lexCountHeadAsync(String toElement, boolean toInclusive) {
String toValue = value(toElement, toInclusive);
return commandExecutor.readAsync(getName(), RedisCommands.ZLEXCOUNT, getName(), "-", toValue);
}
@Override
public int lexCount(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
return get(lexCountAsync(fromElement, fromInclusive, toElement, toInclusive));
}
@Override
public Future<Integer> lexCountAsync(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive) {
String fromValue = value(fromElement, fromInclusive);
String toValue = value(toElement, toInclusive);
return commandExecutor.readAsync(getName(), RedisCommands.ZLEXCOUNT, getName(), fromValue, toValue);
}
private String value(String fromElement, boolean fromInclusive) {
String fromValue = fromElement.toString();
if (fromInclusive) {
fromValue = "[" + fromValue;
} else {
fromValue = "(" + fromValue;
}
return fromValue;
}
@Override
public Future<Boolean> addAsync(String e) {
return addAsync(0, e);
}
@Override
public Future<Boolean> addAllAsync(Collection<? extends String> c) {
List<Object> params = new ArrayList<Object>(2*c.size());
for (Object param : c) {
params.add(0);
params.add(param);
}
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD, getName(), params.toArray());
}
@Override
public boolean add(String e) {
return get(addAsync(e));
}
@Override
public boolean addAll(Collection<? extends String> c) {
return get(addAllAsync(c));
}
}

@ -92,8 +92,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return (List<V>) get(readAllAsync());
}
@Override
public Future<Collection<V>> readAllAsync() {
private Future<Collection<V>> readAllAsync() {
return commandExecutor.readAsync(getName(), LRANGE, getName(), 0, -1);
}

@ -15,6 +15,7 @@
*/
package org.redisson;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RObject;
@ -31,10 +32,16 @@ abstract class RedissonObject implements RObject {
final CommandExecutor commandExecutor;
private final String name;
final Codec codec;
public RedissonObject(CommandExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
public RedissonObject(Codec codec, CommandExecutor commandExecutor, String name) {
this.codec = codec;
this.name = name;
this.commandExecutor = commandExecutor;
}
public RedissonObject(CommandExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
protected <V> V get(Future<V> future) {

@ -0,0 +1,322 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.core.RScoredSortedSet;
import io.netty.util.concurrent.Future;
public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RScoredSortedSet<V> {
public RedissonScoredSortedSet(CommandExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
public RedissonScoredSortedSet(Codec codec, CommandExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public boolean add(double score, V object) {
return get(addAsync(score, object));
}
public V first() {
return get(firstAsync());
}
public Future<V> firstAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_SINGLE, getName(), 0, 0);
}
public V last() {
return get(lastAsync());
}
public Future<V> lastAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_SINGLE, getName(), -1, -1);
}
@Override
public Future<Boolean> addAsync(double score, V object) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZADD, getName(), BigDecimal.valueOf(score).toPlainString(), object);
}
@Override
public boolean remove(Object object) {
return get(removeAsync(object));
}
public int removeRangeByRank(int startIndex, int endIndex) {
return get(removeRangeByRankAsync(startIndex, endIndex));
}
public Future<Integer> removeRangeByRankAsync(int startIndex, int endIndex) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZREMRANGEBYRANK, getName(), startIndex, endIndex);
}
public int removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
return get(removeRangeByScoreAsync(startScore, startScoreInclusive, endScore, endScoreInclusive));
}
public Future<Integer> removeRangeByScoreAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZREMRANGEBYSCORE, getName(), startValue, endValue);
}
private String value(String element, boolean inclusive) {
if (!inclusive) {
element = "(" + element;
}
return element;
}
@Override
public void clear() {
delete();
}
@Override
public Future<Boolean> removeAsync(Object object) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZREM, getName(), object);
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public int size() {
return get(sizeAsync());
}
@Override
public Future<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZCARD, getName());
}
@Override
public boolean contains(Object object) {
return get(containsAsync(object));
}
@Override
public Future<Boolean> containsAsync(Object o) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZSCORE_CONTAINS, getName(), o);
}
@Override
public Double getScore(V o) {
return get(getScoreAsync(o));
}
@Override
public Future<Double> getScoreAsync(V o) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZSCORE, getName(), o);
}
@Override
public int rank(V o) {
return get(rankAsync(o));
}
@Override
public Future<Integer> rankAsync(V o) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK, getName(), o);
}
private ListScanResult<V> scanIterator(RedisClient client, long startPos) {
return commandExecutor.read(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
}
@Override
public Iterator<V> iterator() {
return new Iterator<V>() {
private Iterator<V> iter;
private RedisClient client;
private Long iterPos;
private boolean removeExecuted;
private V value;
@Override
public boolean hasNext() {
if (iter == null) {
ListScanResult<V> res = scanIterator(null, 0);
client = res.getRedisClient();
iter = res.getValues().iterator();
iterPos = res.getPos();
} else if (!iter.hasNext() && iterPos != 0) {
ListScanResult<V> res = scanIterator(client, iterPos);
iter = res.getValues().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
}
@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
value = iter.next();
removeExecuted = false;
return value;
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
iter.remove();
RedissonScoredSortedSet.this.remove(value);
removeExecuted = true;
}
};
}
@Override
public Object[] toArray() {
List<Object> res = (List<Object>) get(valueRangeAsync(0, -1));
return res.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
List<Object> res = (List<Object>) get(valueRangeAsync(0, -1));
return res.toArray(a);
}
@Override
public boolean containsAll(Collection<?> c) {
return get(containsAllAsync(c));
}
@Override
public Future<Boolean> containsAllAsync(Collection<?> c) {
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('zrange', KEYS[1], 0, -1);" +
"for i = 0, table.getn(s), 1 do " +
"for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == s[i] "
+ "then table.remove(ARGV, j) end "
+ "end; "
+ "end;"
+ "return table.getn(ARGV) == 0; ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Future<Boolean> removeAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"for i = 0, table.getn(ARGV), 1 do "
+ "if redis.call('zrem', KEYS[1], ARGV[i]) == 1 "
+ "then v = true end "
+"end "
+ "return v ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public boolean removeAll(Collection<?> c) {
return get(removeAllAsync(c));
}
@Override
public boolean retainAll(Collection<?> c) {
return get(retainAllAsync(c));
}
@Override
public Future<Boolean> retainAllAsync(Collection<?> c) {
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local s = redis.call('zrange', KEYS[1], 0, -1) "
+ "local i = 0 "
+ "while i <= table.getn(s) do "
+ "local element = s[i] "
+ "local isInAgrs = false "
+ "for j = 0, table.getn(ARGV), 1 do "
+ "if ARGV[j] == element then "
+ "isInAgrs = true "
+ "break "
+ "end "
+ "end "
+ "if isInAgrs == false then "
+ "redis.call('zrem', KEYS[1], element) "
+ "changed = true "
+ "end "
+ "i = i + 1 "
+ "end "
+ "return changed ",
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Double addScore(V object, Number value) {
return get(addScoreAsync(object, value));
}
@Override
public Future<Double> addScoreAsync(V object, Number value) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.ZINCRBY,
getName(), new BigDecimal(value.toString()).toPlainString(), object);
}
@Override
public Collection<V> valueRange(int startIndex, int endIndex) {
return get(valueRangeAsync(startIndex, endIndex));
}
@Override
public Future<Collection<V>> valueRangeAsync(int startIndex, int endIndex) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE, getName(), startIndex, endIndex);
}
@Override
public Collection<ScoredEntry<V>> entryRange(int startIndex, int endIndex) {
return get(entryRangeAsync(startIndex, endIndex));
}
@Override
public Future<Collection<ScoredEntry<V>>> entryRangeAsync(int startIndex, int endIndex) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGE_ENTRY, getName(), startIndex, endIndex, "WITHSCORES");
}
}

@ -115,9 +115,10 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
if (iter == null) {
throw new IllegalStateException();
}
// lazy init iterator
// hasNext();
iter.remove();
RedissonSet.this.remove(value);
removeExecuted = true;
@ -126,8 +127,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
};
}
@Override
public Future<Collection<V>> readAllAsync() {
private Future<Collection<V>> readAllAsync() {
return commandExecutor.readAsync(getName(), RedisCommands.SMEMBERS, getName());
}

@ -38,6 +38,21 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
*/
private int connectionPoolSize = 100;
/**
* Should the server address be monitored for changes in DNS? Useful for
* AWS ElastiCache where the client is pointed at the endpoint for a replication group
* which is a DNS alias to the current master node.<br>
* <em>NB: applications must ensure the JVM DNS cache TTL is low enough to support this.</em>
* e.g., http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-jvm-ttl.html
*/
private boolean dnsMonitoring = false;
/**
* Interval in milliseconds to check DNS
*/
private long dnsMonitoringInterval = 5000;
SingleServerConfig() {
}
@ -46,6 +61,8 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
setAddress(config.getAddress());
setConnectionPoolSize(config.getConnectionPoolSize());
setSubscriptionConnectionPoolSize(config.getSubscriptionConnectionPoolSize());
setDnsMonitoring(config.isDnsMonitoring());
setDnsMonitoringInterval(config.getDnsMonitoringInterval());
}
/**
@ -93,4 +110,33 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
this.address = address;
}
/**
* Monitoring of the endpoint address for DNS changes.
* Default is false.
*
* @param dnsMonitoring
* @return
*/
public SingleServerConfig setDnsMonitoring(boolean dnsMonitoring) {
this.dnsMonitoring = dnsMonitoring;
return this;
}
public boolean isDnsMonitoring() {
return dnsMonitoring;
}
/**
* Interval in milliseconds to check the endpoint DNS if {@link #isDnsMonitoring()} is true.
* Default is 5000.
*
* @param dnsMonitoringInterval
* @return
*/
public SingleServerConfig setDnsMonitoringInterval(long dnsMonitoringInterval) {
this.dnsMonitoringInterval = dnsMonitoringInterval;
return this;
}
public long getDnsMonitoringInterval() {
return dnsMonitoringInterval;
}
}

@ -39,6 +39,7 @@ public class RedisConnection implements RedisCommands {
private volatile boolean closed;
volatile Channel channel;
private ReconnectListener reconnectListener;
private int failAttempts;
public RedisConnection(RedisClient redisClient, Channel channel) {
super();
@ -55,9 +56,22 @@ public class RedisConnection implements RedisCommands {
return reconnectListener;
}
public void resetFailAttempt() {
failAttempts = 0;
}
public void incFailAttempt() {
failAttempts++;
}
public int getFailAttempts() {
return failAttempts;
}
public void updateChannel(Channel channel) {
this.channel = channel;
channel.attr(CONNECTION).set(this);
resetFailAttempt();
}
public RedisClient getRedisClient() {

@ -21,7 +21,9 @@ import java.util.Set;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanNotNullReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.DoubleReplayConvertor;
import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.KeyValueConvertor;
import org.redisson.client.protocol.convertor.TrueReplayConvertor;
@ -32,9 +34,12 @@ import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScoredSortedSetScanReplayDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
@ -44,6 +49,23 @@ import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
public interface RedisCommands {
RedisCommand<Boolean> ZADD = new RedisCommand<Boolean>("ZADD", new BooleanAmountReplayConvertor(), 3);
RedisCommand<Boolean> ZREM = new RedisCommand<Boolean>("ZREM", new BooleanAmountReplayConvertor(), 2);
RedisStrictCommand<Integer> ZCARD = new RedisStrictCommand<Integer>("ZCARD", new IntegerReplayConvertor());
RedisStrictCommand<Integer> ZLEXCOUNT = new RedisStrictCommand<Integer>("ZLEXCOUNT", new IntegerReplayConvertor());
RedisCommand<Boolean> ZSCORE_CONTAINS = new RedisCommand<Boolean>("ZSCORE", new BooleanNotNullReplayConvertor(), 2);
RedisStrictCommand<Double> ZSCORE = new RedisStrictCommand<Double>("ZSCORE", new DoubleReplayConvertor());
RedisCommand<Integer> ZRANK = new RedisCommand<Integer>("ZRANK", new IntegerReplayConvertor(), 2);
RedisCommand<Object> ZRANGE_SINGLE = new RedisCommand<Object>("ZRANGE", new ObjectFirstResultReplayDecoder<Object>());
RedisCommand<List<Object>> ZRANGE = new RedisCommand<List<Object>>("ZRANGE", new ObjectListReplayDecoder<Object>());
RedisStrictCommand<Integer> ZREMRANGEBYRANK = new RedisStrictCommand<Integer>("ZREMRANGEBYRANK", new IntegerReplayConvertor());
RedisStrictCommand<Integer> ZREMRANGEBYSCORE = new RedisStrictCommand<Integer>("ZREMRANGEBYSCORE", new IntegerReplayConvertor());
RedisStrictCommand<Integer> ZREMRANGEBYLEX = new RedisStrictCommand<Integer>("ZREMRANGEBYLEX", new IntegerReplayConvertor());
RedisCommand<List<Object>> ZRANGEBYLEX = new RedisCommand<List<Object>>("ZRANGEBYLEX", new ObjectListReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ScoredSortedSetScanReplayDecoder()), ValueType.OBJECT);
RedisStrictCommand<Double> ZINCRBY = new RedisStrictCommand<Double>("ZINCRBY", new DoubleReplayConvertor());
RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new NestedMultiDecoder(new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder()), ValueType.OBJECT);
RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY", new StringDataDecoder());
RedisStrictCommand<String> PING = new RedisStrictCommand<String>("PING");
@ -158,4 +180,5 @@ public interface RedisCommands {
RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
RedisStrictCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisStrictCommand<List<Map<String, String>>>("SENTINEL", "SLAVES", new StringMapReplayDecoder());
RedisStrictCommand<String> INFO_REPLICATION = new RedisStrictCommand<String>("INFO", "replication", new StringDataDecoder());
}

@ -0,0 +1,68 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;
public class ScoredEntry<V> {
private final Double score;
private final V value;
public ScoredEntry(Double score, V value) {
super();
this.score = score;
this.value = value;
}
public V getValue() {
return value;
}
public Double getScore() {
return score;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((score == null) ? 0 : score.hashCode());
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ScoredEntry other = (ScoredEntry) obj;
if (score == null) {
if (other.score != null)
return false;
} else if (!score.equals(other.score))
return false;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}
}

@ -0,0 +1,26 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
public class BooleanNotNullReplayConvertor extends SingleConvertor<Boolean> {
@Override
public Boolean convert(Object obj) {
return obj != null;
}
}

@ -0,0 +1,29 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.convertor;
public class DoubleReplayConvertor extends SingleConvertor<Double> {
@Override
public Double convert(Object obj) {
if (obj == null) {
return null;
}
return Double.valueOf(obj.toString());
}
}

@ -19,6 +19,9 @@ public class IntegerReplayConvertor extends SingleConvertor<Integer> {
@Override
public Integer convert(Object obj) {
if (obj == null) {
return null;
}
return ((Long) obj).intValue();
}

@ -0,0 +1,41 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
public class ObjectFirstResultReplayDecoder<T> implements MultiDecoder<T> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public T decode(List<Object> parts, State state) {
return (T) parts.get(0);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -0,0 +1,47 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.ScoredEntry;
import io.netty.buffer.ByteBuf;
public class ScoredSortedSetReplayDecoder<T> implements MultiDecoder<List<ScoredEntry<T>>> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public List<ScoredEntry<T>> decode(List<Object> parts, State state) {
List<ScoredEntry<T>> result = new ArrayList<ScoredEntry<T>>();
for (int i = 0; i < parts.size(); i += 2) {
result.add(new ScoredEntry<T>(((Number)parts.get(i+1)).doubleValue(), (T)parts.get(i)));
}
return result;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class ScoredSortedSetScanReplayDecoder implements MultiDecoder<ListScanResult<Object>> {
@Override
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
@Override
public ListScanResult<Object> decode(List<Object> parts, State state) {
List<Object> values = (List<Object>)parts.get(1);
for (int i = 1; i < values.size(); i++) {
values.remove(i);
}
return new ListScanResult<Object>((Long)parts.get(0), values);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}
}

@ -225,7 +225,7 @@ abstract class BaseLoadBalancer implements LoadBalancer {
public void returnConnection(RedisConnection connection) {
SubscribesConnectionEntry entry = clients.get(connection.getRedisClient());
if (entry.isFreezed()) {
if (entry.isFreezed() || connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) {
connection.closeAsync();
} else {
entry.getConnections().add(connection);

@ -244,6 +244,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase());
c.setClientName(cfg.getClientName());
c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());

@ -0,0 +1,175 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
import org.redisson.ElasticacheServersConfig;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
/**
* {@link ConnectionManager} for AWS ElastiCache Replication Groups. By providing all nodes
* of the replication group to this manager, the role of each node can be polled to determine
* if a failover has occurred resulting in a new master.
*
* @author Steve Ungerer
*/
public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
private static final String ROLE_KEY = "role:";
private final Logger log = LoggerFactory.getLogger(getClass());
private AtomicReference<URI> currentMaster = new AtomicReference<URI>();
private final Map<URI, RedisConnection> nodeConnections = new HashMap<URI, RedisConnection>();
private ScheduledFuture<?> monitorFuture;
private enum Role {
master,
slave
}
public ElasticacheConnectionManager(ElasticacheServersConfig cfg, Config config) {
init(config);
this.config = create(cfg);
for (URI addr : cfg.getNodeAddresses()) {
RedisConnection connection = connect(cfg, addr);
if (connection == null) {
continue;
}
Role role = determineRole(connection.sync(RedisCommands.INFO_REPLICATION));
if (Role.master.equals(role)) {
if (currentMaster.get() != null) {
throw new RedisException("Multiple masters detected");
}
currentMaster.set(addr);
log.info("{} is the master", addr);
this.config.setMasterAddress(addr);
} else {
log.info("{} is a slave", addr);
this.config.addSlaveAddress(addr);
}
}
init(this.config);
monitorRoleChange(cfg);
}
private RedisConnection connect(ElasticacheServersConfig cfg, URI addr) {
RedisConnection connection = nodeConnections.get(addr);
if (connection != null) {
return connection;
}
RedisClient client = createClient(addr.getHost(), addr.getPort(), cfg.getTimeout());
try {
connection = client.connect();
nodeConnections.put(addr, connection);
} catch (RedisConnectionException e) {
log.warn(e.getMessage(), e);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return connection;
}
private void monitorRoleChange(final ElasticacheServersConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
URI master = currentMaster.get();
log.debug("Current master: {}", master);
for (URI addr : cfg.getNodeAddresses()) {
RedisConnection connection = connect(cfg, addr);
String replInfo = connection.sync(RedisCommands.INFO_REPLICATION);
log.trace("{} repl info: {}", addr, replInfo);
Role role = determineRole(replInfo);
log.debug("node {} is {}", addr, role);
if (Role.master.equals(role) && master.equals(addr)) {
log.debug("Current master {} unchanged", master);
} else if (Role.master.equals(role) && !master.equals(addr) && currentMaster.compareAndSet(master, addr)) {
log.info("Master has changed from {} to {}", master, addr);
changeMaster(MAX_SLOT, addr.getHost(), addr.getPort());
break;
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
private Role determineRole(String data) {
for (String s : data.split("\\r\\n")) {
if (s.startsWith(ROLE_KEY)) {
return Role.valueOf(s.substring(ROLE_KEY.length()));
}
}
throw new RedisException("Cannot determine node role from provided 'INFO replication' data");
}
private MasterSlaveServersConfig create(ElasticacheServersConfig cfg) {
MasterSlaveServersConfig c = new MasterSlaveServersConfig();
c.setRetryInterval(cfg.getRetryInterval());
c.setRetryAttempts(cfg.getRetryAttempts());
c.setTimeout(cfg.getTimeout());
c.setPingTimeout(cfg.getPingTimeout());
c.setLoadBalancer(cfg.getLoadBalancer());
c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase());
c.setClientName(cfg.getClientName());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
return c;
}
@Override
public void shutdown() {
monitorFuture.cancel(true);
super.shutdown();
for (RedisConnection connection : nodeConnections.values()) {
connection.getRedisClient().shutdown();
}
}
}

@ -166,6 +166,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
if (!future.isSuccess()) {
conn.incFailAttempt();
} else {
conn.resetFailAttempt();
}
shutdownLatch.release();
timeout.cancel();
releaseWrite(slot, conn);
@ -179,6 +185,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return new FutureListener<T>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
if (!future.isSuccess()) {
conn.incFailAttempt();
} else {
conn.resetFailAttempt();
}
shutdownLatch.release();
timeout.cancel();
releaseRead(slot, conn);

@ -175,6 +175,10 @@ public class MasterSlaveEntry {
if (!entry.getClient().equals(connection.getRedisClient())) {
connection.closeAsync();
return;
} else if (connection.getFailAttempts() == config.getCloseConnectionAfterFailAttempts()) {
connection.closeAsync();
entry.getConnectionsSemaphore().release();
return;
}
entry.getConnections().add(connection);

@ -61,6 +61,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
c.setPassword(cfg.getPassword());
c.setDatabase(cfg.getDatabase());
c.setClientName(cfg.getClientName());
c.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize());

@ -15,12 +15,29 @@
*/
package org.redisson.connection;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.SingleServerConfig;
import org.redisson.client.RedisConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
public class SingleConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final AtomicReference<InetAddress> currentMaster = new AtomicReference<InetAddress>();
private ScheduledFuture<?> monitorFuture;
public SingleConnectionManager(SingleServerConfig cfg, Config config) {
MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig();
String addr = cfg.getAddress().getHost() + ":" + cfg.getAddress().getPort();
@ -31,12 +48,23 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setPassword(cfg.getPassword());
newconfig.setDatabase(cfg.getDatabase());
newconfig.setClientName(cfg.getClientName());
newconfig.setCloseConnectionAfterFailAttempts(cfg.getCloseConnectionAfterFailAttempts());
newconfig.setMasterAddress(addr);
newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize());
newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
init(newconfig, config);
if (cfg.isDnsMonitoring()) {
try {
this.currentMaster.set(InetAddress.getByName(cfg.getAddress().getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host", e);
}
log.debug("DNS monitoring enabled; Current master set to {}", currentMaster.get());
monitorDnsChange(cfg);
}
}
@Override
@ -45,4 +73,35 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
entries.put(MAX_SLOT, entry);
}
private void monitorDnsChange(final SingleServerConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
InetAddress master = currentMaster.get();
InetAddress now = InetAddress.getByName(cfg.getAddress().getHost());
if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", cfg.getAddress().getHost(), master.getHostAddress(), now.getHostAddress());
if (currentMaster.compareAndSet(master, now)) {
changeMaster(MAX_SLOT,cfg.getAddress().getHost(), cfg.getAddress().getPort());
log.info("Master has been changed");
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}, cfg.getDnsMonitoringInterval(), cfg.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
}
@Override
public void shutdown() {
if (monitorFuture != null) {
monitorFuture.cancel(true);
}
super.shutdown();
}
}

@ -113,6 +113,8 @@ public interface RBatch {
*/
RAtomicLongAsync getAtomicLongAsync(String name);
<V> RScoredSortedSet<V> getScoredSortedSet(String name);
/**
* Returns script operations object
*

@ -31,8 +31,6 @@ public interface RCollectionAsync<V> extends RExpirableAsync {
Future<Boolean> removeAsync(Object o);
Future<Collection<V>> readAllAsync();
Future<Integer> sizeAsync();
Future<Boolean> addAsync(V e);

@ -0,0 +1,45 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import java.util.Set;
public interface RLexSortedSet extends RLexSortedSetAsync, Set<String>, RExpirable {
int removeRangeTailByLex(String fromElement, boolean fromInclusive);
int removeRangeHeadByLex(String toElement, boolean toInclusive);
int removeRangeByLex(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);
int lexCountTail(String fromElement, boolean fromInclusive);
int lexCountHead(String toElement, boolean toInclusive);
Collection<String> lexRangeTail(String fromElement, boolean fromInclusive);
Collection<String> lexRangeHead(String toElement, boolean toInclusive);
Collection<String> lexRange(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);
int lexCount(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);
int rank(String o);
Collection<String> valueRange(int startIndex, int endIndex);
}

@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import io.netty.util.concurrent.Future;
public interface RLexSortedSetAsync extends RCollectionAsync<String> {
Future<Integer> removeRangeByLexAsync(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);
Future<Integer> removeRangeTailByLexAsync(String fromElement, boolean fromInclusive);
Future<Integer> removeRangeHeadByLexAsync(String toElement, boolean toInclusive);
Future<Integer> lexCountTailAsync(String fromElement, boolean fromInclusive);
Future<Integer> lexCountHeadAsync(String toElement, boolean toInclusive);
Future<Collection<String>> lexRangeTailAsync(String fromElement, boolean fromInclusive);
Future<Collection<String>> lexRangeHeadAsync(String toElement, boolean toInclusive);
Future<Collection<String>> lexRangeAsync(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);
Future<Integer> lexCountAsync(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);
Future<Integer> rankAsync(String o);
Future<Collection<String>> valueRangeAsync(int startIndex, int endIndex);
}

@ -0,0 +1,64 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import org.redisson.client.protocol.ScoredEntry;
public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<V>, RExpirable {
V first();
V last();
int removeRangeByScore(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
int removeRangeByRank(int startIndex, int endIndex);
int rank(V o);
Double getScore(V o);
boolean add(double score, V object);
int size();
boolean isEmpty();
boolean contains(Object o);
Object[] toArray();
<T> T[] toArray(T[] a);
boolean remove(Object o);
boolean containsAll(Collection<?> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();
Double addScore(V object, Number value);
Collection<V> valueRange(int startIndex, int endIndex);
Collection<ScoredEntry<V>> entryRange(int startIndex, int endIndex);
}

@ -0,0 +1,56 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import org.redisson.client.protocol.ScoredEntry;
import io.netty.util.concurrent.Future;
public interface RScoredSortedSetAsync<V> extends RExpirableAsync {
Future<V> firstAsync();
Future<V> lastAsync();
Future<Integer> removeRangeByRankAsync(int startIndex, int endIndex);
Future<Integer> rankAsync(V o);
Future<Double> getScoreAsync(V o);
Future<Boolean> addAsync(double score, V object);
Future<Boolean> removeAsync(V object);
Future<Integer> sizeAsync();
Future<Boolean> containsAsync(Object o);
Future<Boolean> containsAllAsync(Collection<?> c);
Future<Boolean> removeAllAsync(Collection<?> c);
Future<Boolean> retainAllAsync(Collection<?> c);
Future<Double> addScoreAsync(V object, Number value);
Future<Collection<V>> valueRangeAsync(int startIndex, int endIndex);
Future<Collection<ScoredEntry<V>>> entryRangeAsync(int startIndex, int endIndex);
}

@ -0,0 +1,461 @@
package org.redisson;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.core.RScoredSortedSet;
import org.redisson.core.RSortedSet;
import io.netty.util.concurrent.Future;
public class RedissonScoredSortedSetTest extends BaseTest {
@Test
public void testFirstLast() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
set.add(0.4, "d");
Assert.assertEquals("a", set.first());
Assert.assertEquals("d", set.last());
}
@Test
public void testRemoveRangeByScore() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
set.add(0.4, "d");
set.add(0.5, "e");
set.add(0.6, "f");
set.add(0.7, "g");
Assert.assertEquals(2, set.removeRangeByScore(0.1, false, 0.3, true));
MatcherAssert.assertThat(set, Matchers.contains("a", "d", "e", "f", "g"));
}
@Test
public void testRemoveRangeByRank() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
set.add(0.4, "d");
set.add(0.5, "e");
set.add(0.6, "f");
set.add(0.7, "g");
Assert.assertEquals(2, set.removeRangeByRank(0, 1));
MatcherAssert.assertThat(set, Matchers.contains("c", "d", "e", "f", "g"));
}
@Test
public void testRank() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
set.add(0.4, "d");
set.add(0.5, "e");
set.add(0.6, "f");
set.add(0.7, "g");
Assert.assertEquals(3, (int)set.rank("d"));
}
@Test
public void testAddAsync() throws InterruptedException, ExecutionException {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
Future<Boolean> future = set.addAsync(0.323, 2);
Assert.assertTrue(future.get());
Future<Boolean> future2 = set.addAsync(0.323, 2);
Assert.assertFalse(future2.get());
Assert.assertTrue(set.contains(2));
}
@Test
public void testRemoveAsync() throws InterruptedException, ExecutionException {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
set.add(0.11, 1);
set.add(0.22, 3);
set.add(0.33, 7);
Assert.assertTrue(set.removeAsync(1).get());
Assert.assertFalse(set.contains(1));
Assert.assertThat(set, Matchers.contains(3, 7));
Assert.assertFalse(set.removeAsync(1).get());
Assert.assertThat(set, Matchers.contains(3, 7));
set.removeAsync(3).get();
Assert.assertFalse(set.contains(3));
Assert.assertThat(set, Matchers.contains(7));
}
@Test
public void testIteratorNextNext() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(1, "1");
set.add(2, "4");
Iterator<String> iter = set.iterator();
Assert.assertEquals("1", iter.next());
Assert.assertEquals("4", iter.next());
Assert.assertFalse(iter.hasNext());
}
@Test
public void testIteratorRemove() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(1, "1");
set.add(2, "4");
set.add(3, "2");
set.add(4, "5");
set.add(5, "3");
for (Iterator<String> iterator = set.iterator(); iterator.hasNext();) {
String value = iterator.next();
if (value.equals("2")) {
iterator.remove();
}
}
Assert.assertThat(set, Matchers.contains("1", "4", "5", "3"));
int iteration = 0;
for (Iterator<String> iterator = set.iterator(); iterator.hasNext();) {
iterator.next();
iterator.remove();
iteration++;
}
Assert.assertEquals(4, iteration);
Assert.assertEquals(0, set.size());
Assert.assertTrue(set.isEmpty());
}
@Test
public void testIteratorSequence() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
for (int i = 0; i < 1000; i++) {
set.add(i, Integer.valueOf(i));
}
Set<Integer> setCopy = new HashSet<Integer>();
for (int i = 0; i < 1000; i++) {
setCopy.add(Integer.valueOf(i));
}
checkIterator(set, setCopy);
}
private void checkIterator(RScoredSortedSet<Integer> set, Set<Integer> setCopy) {
for (Iterator<Integer> iterator = set.iterator(); iterator.hasNext();) {
Integer value = iterator.next();
if (!setCopy.remove(value)) {
Assert.fail();
}
}
Assert.assertEquals(0, setCopy.size());
}
@Test
public void testRetainAll() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
for (int i = 0; i < 20000; i++) {
set.add(i, i);
}
Assert.assertTrue(set.retainAll(Arrays.asList(1, 2)));
Assert.assertThat(set, Matchers.containsInAnyOrder(1, 2));
Assert.assertEquals(2, set.size());
}
@Test
public void testRemoveAll() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
set.add(0.1, 1);
set.add(0.2, 2);
set.add(0.3, 3);
Assert.assertTrue(set.removeAll(Arrays.asList(1, 2)));
Assert.assertThat(set, Matchers.contains(3));
Assert.assertEquals(1, set.size());
}
// @Test(expected = IllegalArgumentException.class)
public void testTailSet() {
RSortedSet<Integer> set = redisson.getSortedSet("set");
set.add(1);
set.add(2);
set.add(3);
set.add(4);
set.add(5);
SortedSet<Integer> hs = set.tailSet(3);
hs.add(10);
MatcherAssert.assertThat(hs, Matchers.contains(3, 4, 5, 10));
set.remove(4);
MatcherAssert.assertThat(hs, Matchers.contains(3, 5, 10));
set.remove(3);
MatcherAssert.assertThat(hs, Matchers.contains(5, 10));
hs.add(-1);
}
// @Test(expected = IllegalArgumentException.class)
public void testHeadSet() {
RSortedSet<Integer> set = redisson.getSortedSet("set");
set.add(1);
set.add(2);
set.add(3);
set.add(4);
set.add(5);
SortedSet<Integer> hs = set.headSet(3);
hs.add(0);
MatcherAssert.assertThat(hs, Matchers.contains(0, 1, 2));
set.remove(2);
MatcherAssert.assertThat(hs, Matchers.contains(0, 1));
set.remove(3);
MatcherAssert.assertThat(hs, Matchers.contains(0, 1));
hs.add(7);
}
@Test(expected = IllegalArgumentException.class)
public void testTailSetTreeSet() {
TreeSet<Integer> set = new TreeSet<Integer>();
set.add(1);
set.add(2);
set.add(3);
set.add(4);
set.add(5);
SortedSet<Integer> hs = set.tailSet(3);
hs.add(10);
MatcherAssert.assertThat(hs, Matchers.contains(3, 4, 5, 10));
set.remove(4);
MatcherAssert.assertThat(hs, Matchers.contains(3, 5, 10));
set.remove(3);
MatcherAssert.assertThat(hs, Matchers.contains(5, 10));
hs.add(-1);
}
@Test(expected = IllegalArgumentException.class)
public void testHeadSetTreeSet() {
TreeSet<Integer> set = new TreeSet<Integer>();
set.add(1);
set.add(2);
set.add(3);
set.add(4);
set.add(5);
SortedSet<Integer> hs = set.headSet(3);
hs.add(0);
MatcherAssert.assertThat(hs, Matchers.contains(0, 1, 2));
set.remove(2);
MatcherAssert.assertThat(hs, Matchers.contains(0, 1));
set.remove(3);
MatcherAssert.assertThat(hs, Matchers.contains(0, 1));
hs.add(7);
}
@Test
public void testSort() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
Assert.assertTrue(set.add(4, 2));
Assert.assertTrue(set.add(5, 3));
Assert.assertTrue(set.add(3, 1));
Assert.assertTrue(set.add(6, 4));
Assert.assertTrue(set.add(1000, 10));
Assert.assertTrue(set.add(1, -1));
Assert.assertTrue(set.add(2, 0));
MatcherAssert.assertThat(set, Matchers.contains(-1, 0, 1, 2, 3, 4, 10));
// Assert.assertEquals(-1, (int)set.first());
// Assert.assertEquals(10, (int)set.last());
}
@Test
public void testRemove() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
set.add(4, 5);
set.add(2, 3);
set.add(0, 1);
set.add(1, 2);
set.add(3, 4);
Assert.assertFalse(set.remove(0));
Assert.assertTrue(set.remove(3));
Assert.assertThat(set, Matchers.contains(1, 2, 4, 5));
}
@Test
public void testContainsAll() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
for (int i = 0; i < 200; i++) {
set.add(i, i);
}
Assert.assertTrue(set.containsAll(Arrays.asList(30, 11)));
Assert.assertFalse(set.containsAll(Arrays.asList(30, 711, 11)));
}
@Test
public void testToArray() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0, "1");
set.add(1, "4");
set.add(2, "2");
set.add(3, "5");
set.add(4, "3");
MatcherAssert.assertThat(Arrays.asList(set.toArray()), Matchers.<Object>containsInAnyOrder("1", "2", "4", "5", "3"));
String[] strs = set.toArray(new String[0]);
MatcherAssert.assertThat(Arrays.asList(strs), Matchers.containsInAnyOrder("1", "4", "2", "5", "3"));
}
@Test
public void testContains() {
RScoredSortedSet<TestObject> set = redisson.getScoredSortedSet("simple");
set.add(0, new TestObject("1", "2"));
set.add(1, new TestObject("1", "2"));
set.add(2, new TestObject("2", "3"));
set.add(3, new TestObject("3", "4"));
set.add(4, new TestObject("5", "6"));
Assert.assertTrue(set.contains(new TestObject("2", "3")));
Assert.assertTrue(set.contains(new TestObject("1", "2")));
Assert.assertFalse(set.contains(new TestObject("1", "9")));
}
@Test
public void testDuplicates() {
RScoredSortedSet<TestObject> set = redisson.getScoredSortedSet("simple");
Assert.assertTrue(set.add(0, new TestObject("1", "2")));
Assert.assertFalse(set.add(0, new TestObject("1", "2")));
Assert.assertTrue(set.add(2, new TestObject("2", "3")));
Assert.assertTrue(set.add(3, new TestObject("3", "4")));
Assert.assertTrue(set.add(4, new TestObject("5", "6")));
Assert.assertEquals(4, set.size());
}
@Test
public void testSize() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
set.add(0, 1);
set.add(1, 2);
set.add(2, 3);
set.add(2, 3);
set.add(3, 4);
set.add(4, 5);
set.add(4, 5);
Assert.assertEquals(5, set.size());
}
@Test
public void testValueRange() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
set.add(0, 1);
set.add(1, 2);
set.add(2, 3);
set.add(3, 4);
set.add(4, 5);
set.add(4, 5);
Collection<Integer> vals = set.valueRange(0, -1);
MatcherAssert.assertThat(vals, Matchers.contains(1, 2, 3, 4, 5));
}
@Test
public void testEntryRange() {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
set.add(10, 1);
set.add(20, 2);
set.add(30, 3);
set.add(40, 4);
set.add(50, 5);
Collection<ScoredEntry<Integer>> vals = set.entryRange(0, -1);
MatcherAssert.assertThat(vals, Matchers.contains(new ScoredEntry<Integer>(10D, 1),
new ScoredEntry<Integer>(20D, 2),
new ScoredEntry<Integer>(30D, 3),
new ScoredEntry<Integer>(40D, 4),
new ScoredEntry<Integer>(50D, 5)));
}
@Test
public void testAddAndGet() throws InterruptedException {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple");
set.add(1, 100);
Double res = set.addScore(100, 11);
Assert.assertEquals(12, (double)res, 0);
Double score = set.getScore(100);
Assert.assertEquals(12, (double)score, 0);
RScoredSortedSet<Integer> set2 = redisson.getScoredSortedSet("simple");
set2.add(100.2, 1);
Double res2 = set2.addScore(1, new Double(12.1));
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
res2 = set2.getScore(1);
Assert.assertTrue(new Double(112.3).compareTo(res2) == 0);
}
}
Loading…
Cancel
Save