Merge remote-tracking branch 'mrniko/master'

pull/469/head
jackygurui 9 years ago
commit 9f664bbaba

@ -2,6 +2,26 @@ Redisson Releases History
================================
####Please Note: trunk is current development branch.
####04-Mar-2016 - version 2.2.9 released
Feature - __new object added__ `RSetMultimap`
Feature - __new object added__ `RListMultimap`
Feature - `valueRangeReversed` and `valueRangeReversedAsync` methods were added to `RScoredSortedSet` object
Improvement - Throw `RedisOutOfMemoryException` when OOM error from Redis server has occured
Improvement - Node type added to optimization in Cluster mode
Improvement - Add DynamicImport-Package to OSGi headers
Fixed - `RedissonSpringCacheManager` Sentinel compatibility
Fixed - `RAtomicLong.compareAndSet` doesn't work when expected value is 0 and it wasn't initialized
####12-Feb-2016 - version 2.2.8 released
Feature - `union`, `unionAsync`, `readUnion` and `readUnionAsync` methods were added to `RSet` object
Feature - `readAll` and `readAllAsync` methods were added to `RSetCache` object
Improvement - `RKeys.delete` optimization in Cluster mode
Fixed - Script error during `RSetCache.toArray` and `RSetCache.readAll` methods invocation
Fixed - Sentinel doesn't support AUTH command
Fixed - RMap iterator
####03-Feb-2016 - version 2.2.7 released
Feature - `readAllKeySet`, `readAllValues`, `readAllEntry`, `readAllKeySetAsync`, `readAllValuesAsync`, `readAllEntryAsync` methods were added to `RMap` object

@ -1,7 +1,9 @@
Redisson - distributed and scalable Java objects powered by Redis. Advanced Java Redis client
====
[![Maven Central](https://img.shields.io/maven-central/v/org.redisson/redisson.svg?style=flat-square)](https://maven-badges.herokuapp.com/maven-central/org.redisson/redisson/) please take part in [Redisson survey](https://ru.surveymonkey.com/r/LP7RG8Q)
[![Maven Central](https://img.shields.io/maven-central/v/org.redisson/redisson.svg?style=flat-square)](https://maven-badges.herokuapp.com/maven-central/org.redisson/redisson/)
##Please take part in [Redisson survey](https://ru.surveymonkey.com/r/LP7RG8Q)
Use familiar Java data structures with power of [Redis](http://redis.io).
@ -22,20 +24,18 @@ Features
* [AWS ElastiCache](https://aws.amazon.com/elasticache/) servers mode:
1. automatic new master server discovery
2. automatic new slave servers discovery
3. read data using slave servers, write data using master server
* Cluster servers mode:
1. automatic master and slave discovery
1. automatic master and slave servers discovery
2. automatic new master server discovery
3. automatic new slave servers discovery
4. automatic slots change discovery
5. read data using slave servers, write data using master server
4. automatic slave servers offline/online discovery
5. automatic slots change discovery
* Sentinel servers mode:
1. automatic master and slave servers discovery
2. automatic new master server discovery
3. automatic new slave servers discovery
4. automatic slave servers offline/online discovery
5. automatic sentinel servers discovery
6. read data using slave servers, write data using master server
* Master with Slave servers mode: read data using slave servers, write data using master server
* Single server mode: read and write data using single server
* Lua scripting
@ -72,30 +72,7 @@ Features
Projects using Redisson
================================
Setronica: [setronica.com](http://setronica.com/)
Monits: [monits.com](http://monits.com/)
Brookhaven National Laboratory: [bnl.gov](http://bnl.gov/)
Netflix Dyno client: [dyno] (https://github.com/Netflix/dyno)
武林Q传: [nbrpg.com](http://www.nbrpg.com/)
Ocous: [ocous.com](http://www.ocous.com/)
Invaluable: [invaluable.com](http://www.invaluable.com/)
Latest version changelog
=================================
####03-Feb-2016 - version 2.2.7 released
Feature - `readAllKeySet`, `readAllValues`, `readAllEntry`, `readAllKeySetAsync`, `readAllValuesAsync`, `readAllEntryAsync` methods were added to `RMap` object
Improvement - `RKeys.delete` optimization in Cluster mode
Fixed - minimal connections amount initialization
Fixed - `RKeys.deleteByPattern` throws an error in cluster mode
Fixed - `RKeys.deleteAsync` throws error in Cluster mode
Fixed - Redisson failed to start when one of sentinel servers is down
Fixed - Redisson failed to start when there are no slaves in Sentinel mode
Fixed - slave nodes up/down state discovery in Cluster mode
Fixed - slave can stay freezed when it has been just added in Sentinel mode
Fixed - offline slaves handling during Redisson start in Sentinel mode
Fixed - `SELECT` command can't be executed in Sentinel mode
Fixed - `database` setting removed from cluster config
[Setronica](http://setronica.com/), [Monits](http://monits.com/), [Brookhaven National Laboratory](http://bnl.gov/), [Netflix Dyno client] (https://github.com/Netflix/dyno), [武林Q传](http://www.nbrpg.com/), [Ocous](http://www.ocous.com/), [Invaluable](http://www.invaluable.com/), [Clover](https://www.clover.com/)
### Maven
@ -104,12 +81,12 @@ Include the following to your dependency list:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.2.7</version>
<version>2.2.9</version>
</dependency>
### Gradle
compile 'org.redisson:redisson:2.2.7'
compile 'org.redisson:redisson:2.2.9'
### Supported by

@ -127,7 +127,6 @@
<!-- See http://checkstyle.sf.net/config_whitespace.html -->
<module name="EmptyForIteratorPad"/>
<module name="MethodParamPad"/>
<module name="NoWhitespaceAfter"/>
<module name="NoWhitespaceBefore"/>
<module name="OperatorWrap"/>
<module name="ParenPad"/>

@ -3,7 +3,7 @@
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.2.8-SNAPSHOT</version>
<version>2.2.10-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Redisson</name>
@ -364,6 +364,7 @@
<configuration>
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<DynamicImport-Package>*</DynamicImport-Package>
</instructions>
</configuration>
</plugin>

@ -30,6 +30,7 @@ import org.redisson.connection.ConnectionListener;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.RedisClientEntry;
import org.redisson.core.Node;
import org.redisson.core.NodeType;
import org.redisson.core.NodesGroup;
import io.netty.util.concurrent.Future;
@ -44,6 +45,19 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
this.connectionManager = connectionManager;
}
@Override
public Collection<N> getNodes(NodeType type) {
Collection<N> clients = (Collection<N>) connectionManager.getClients();
List<N> result = new ArrayList<N>();
for (N node : clients) {
if (node.getType().equals(type)) {
result.add(node);
}
}
return result;
}
@Override
public Collection<N> getNodes() {
return (Collection<N>) connectionManager.getClients();

@ -53,6 +53,7 @@ import org.redisson.core.RHyperLogLog;
import org.redisson.core.RKeys;
import org.redisson.core.RLexSortedSet;
import org.redisson.core.RList;
import org.redisson.core.RListMultimap;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RMapCache;
@ -64,6 +65,7 @@ import org.redisson.core.RScript;
import org.redisson.core.RSemaphore;
import org.redisson.core.RSet;
import org.redisson.core.RSetCache;
import org.redisson.core.RSetMultimap;
import org.redisson.core.RSortedSet;
import org.redisson.core.RTopic;
@ -244,11 +246,31 @@ public class Redisson implements RedissonClient {
return new RedissonList<V>(codec, commandExecutor, name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name) {
return new RedissonListMultimap<K, V>(commandExecutor, name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimap<K, V>(codec, commandExecutor, name);
}
@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(commandExecutor, name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(commandExecutor, name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimap<K, V>(codec, commandExecutor, name);
}
@Override
public <V> RSetCache<V> getSetCache(String name) {
return new RedissonSetCache<V>(evictionScheduler, commandExecutor, name);

@ -57,11 +57,14 @@ public class RedissonAtomicLong extends RedissonExpirable implements RAtomicLong
@Override
public Future<Boolean> compareAndSetAsync(long expect, long update) {
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
"local currValue = redis.call('get', KEYS[1]); "
+ "if currValue == ARGV[1] "
+ "or (tonumber(ARGV[1]) == 0 and currValue == false) then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return 1 "
+ "else "
+ "return 0 end",
+ "return 0 "
+ "end",
Collections.<Object>singletonList(getName()), expect, update);
}

@ -0,0 +1,126 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> firstValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> iter;
protected long iterPos = 0;
protected InetSocketAddress client;
private boolean finished;
private boolean removeExecuted;
protected Map.Entry<ScanObjectEntry, ScanObjectEntry> entry;
@Override
public boolean hasNext() {
if (finished) {
return false;
}
if (iter == null || !iter.hasNext()) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = iterator();
client = res.getRedisClient();
if (iterPos == 0 && firstValues == null) {
firstValues = convert(res.getMap());
} else {
Map<ByteBuf, ByteBuf> newValues = convert(res.getMap());
if (newValues.equals(firstValues)) {
finished = true;
free(firstValues);
free(newValues);
firstValues = null;
return false;
}
free(newValues);
}
iter = res.getMap().entrySet().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
}
protected abstract MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator();
private void free(Map<ByteBuf, ByteBuf> map) {
for (Entry<ByteBuf, ByteBuf> entry : map.entrySet()) {
entry.getKey().release();
entry.getValue().release();
}
}
private Map<ByteBuf, ByteBuf> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<ByteBuf, ByteBuf> result = new HashMap<ByteBuf, ByteBuf>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getBuf(), entry.getValue().getBuf());
}
return result;
}
@Override
public M next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
entry = iter.next();
removeExecuted = false;
return getValue(entry);
}
@SuppressWarnings("unchecked")
M getValue(final Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey().getObj(), (V)entry.getValue().getObj()) {
@Override
public V setValue(V value) {
return put(entry, value);
}
};
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
// lazy init iterator
hasNext();
iter.remove();
removeKey();
removeExecuted = true;
}
protected abstract void removeKey();
protected abstract V put(Entry<ScanObjectEntry, ScanObjectEntry> entry, V value);
}

@ -33,12 +33,14 @@ import org.redisson.core.RHyperLogLogAsync;
import org.redisson.core.RKeysAsync;
import org.redisson.core.RLexSortedSetAsync;
import org.redisson.core.RListAsync;
import org.redisson.core.RListMultimap;
import org.redisson.core.RMapAsync;
import org.redisson.core.RQueueAsync;
import org.redisson.core.RScoredSortedSetAsync;
import org.redisson.core.RScriptAsync;
import org.redisson.core.RSetAsync;
import org.redisson.core.RSetCacheAsync;
import org.redisson.core.RSetMultimap;
import org.redisson.core.RTopicAsync;
import io.netty.util.concurrent.Future;
@ -229,5 +231,25 @@ public class RedissonBatch implements RBatch {
return executorService.executeAsync();
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(executorService, name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimap<K, V>(codec, executorService, name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name) {
return new RedissonListMultimap<K, V>(executorService, name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimap<K, V>(codec, executorService, name);
}
}

@ -31,7 +31,7 @@ import io.netty.util.concurrent.Future;
public class RedissonBitSet extends RedissonExpirable implements RBitSet {
protected RedissonBitSet(CommandAsyncExecutor connectionManager, String name) {
public RedissonBitSet(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}

@ -125,26 +125,6 @@ public class RedissonBucket<V> extends RedissonExpirable implements RBucket<V> {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value);
}
/**
* Use {@link #isExistsAsync()}
*
* @return
*/
@Deprecated
public Future<Boolean> existsAsync() {
return isExistsAsync();
}
/**
* Use {@link #isExists()}
*
* @return
*/
@Deprecated
public boolean exists() {
return isExists();
}
@Override
public Future<Boolean> trySetAsync(V value) {
if (value == null) {

@ -38,6 +38,7 @@ import org.redisson.core.RHyperLogLog;
import org.redisson.core.RKeys;
import org.redisson.core.RLexSortedSet;
import org.redisson.core.RList;
import org.redisson.core.RListMultimap;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RPatternTopic;
@ -48,6 +49,7 @@ import org.redisson.core.RScript;
import org.redisson.core.RSemaphore;
import org.redisson.core.RSet;
import org.redisson.core.RSetCache;
import org.redisson.core.RSetMultimap;
import org.redisson.core.RSortedSet;
import org.redisson.core.RTopic;
@ -215,6 +217,24 @@ public interface RedissonClient {
*/
<V> RList<V> getList(String name, Codec codec);
/**
* Returns List based MultiMap instance by name.
*
* @param name
* @return
*/
<K, V> RListMultimap<K, V> getListMultimap(String name);
/**
* Returns List based MultiMap instance by name
* using provided codec for both map keys and values.
*
* @param name
* @param codec
* @return
*/
<K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec);
/**
* Returns map instance by name.
*
@ -233,6 +253,24 @@ public interface RedissonClient {
*/
<K, V> RMap<K, V> getMap(String name, Codec codec);
/**
* Returns Set based MultiMap instance by name.
*
* @param name
* @return
*/
<K, V> RSetMultimap<K, V> getSetMultimap(String name);
/**
* Returns Set based MultiMap instance by name
* using provided codec for both map keys and values.
*
* @param name
* @param codec
* @return
*/
<K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec);
/**
* Returns semaphore instance by name
*

@ -0,0 +1,292 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RListMultimap;
import io.netty.util.concurrent.Future;
/**
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class RedissonListMultimap<K, V> extends RedissonMultimap<K, V> implements RListMultimap<K, V> {
private static final RedisStrictCommand<Boolean> LLEN_VALUE = new RedisStrictCommand<Boolean>("LLEN", new BooleanAmountReplayConvertor());
RedissonListMultimap(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
RedissonListMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
public Future<Integer> sizeAsync() {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_INTEGER,
"local keys = redis.call('hgetall', KEYS[1]); " +
"local size = 0; " +
"for i, v in ipairs(keys) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"size = size + redis.call('llen', name); " +
"end;" +
"end; " +
"return size; ",
Arrays.<Object>asList(getName()));
}
public Future<Boolean> containsKeyAsync(Object key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.readAsync(getName(), codec, LLEN_VALUE, setName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Boolean> containsValueAsync(Object value) {
try {
byte[] valueState = codec.getMapValueEncoder().encode(value);
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local keys = redis.call('hgetall', KEYS[1]); " +
"for i, v in ipairs(keys) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"local items = redis.call('lrange', name, 0, -1) " +
"for i=1,#items do " +
"if items[i] == ARGV[1] then " +
"return 1; " +
"end; " +
"end; " +
"end;" +
"end; " +
"return 0; ",
Arrays.<Object>asList(getName()), valueState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean containsEntry(Object key, Object value) {
return get(containsEntryAsync(key, value));
}
public Future<Boolean> containsEntryAsync(Object key, Object value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
byte[] valueState = codec.getMapValueEncoder().encode(value);
String setName = getValuesName(keyHash);
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local items = redis.call('lrange', KEYS[1], 0, -1) " +
"for i=1,#items do " +
"if items[i] == ARGV[1] then " +
"return 1; " +
"end; " +
"end; " +
"return 0; ",
Collections.<Object>singletonList(setName), valueState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean put(K key, V value) {
return get(putAsync(key, value));
}
public Future<Boolean> putAsync(K key, V value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
byte[] valueState = codec.getMapValueEncoder().encode(value);
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"redis.call('rpush', KEYS[2], ARGV[3]); " +
"return 1; ",
Arrays.<Object>asList(getName(), setName), keyState, keyHash, valueState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Boolean> removeAsync(Object key, Object value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
byte[] valueState = codec.getMapValueEncoder().encode(value);
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local res = redis.call('lrem', KEYS[2], 1, ARGV[2]); "
+ "if res == 1 and redis.call('llen', KEYS[2]) == 0 then "
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "end; "
+ "return res; ",
Arrays.<Object>asList(getName(), setName), keyState, valueState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Boolean> putAllAsync(K key, Iterable<? extends V> values) {
try {
List<Object> params = new ArrayList<Object>();
byte[] keyState = codec.getMapKeyEncoder().encode(key);
params.add(keyState);
String keyHash = hash(keyState);
params.add(keyHash);
for (Object value : values) {
byte[] valueState = codec.getMapValueEncoder().encode(value);
params.add(valueState);
}
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_AMOUNT,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"return redis.call('rpush', KEYS[2], unpack(ARGV, 3, #ARGV)); ",
Arrays.<Object>asList(getName(), setName), params.toArray());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public List<V> get(K key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return new RedissonList<V>(codec, commandExecutor, setName);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public List<V> getAll(K key) {
return (List<V>) get(getAllAsync(key));
}
public Future<Collection<V>> getAllAsync(K key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.readAsync(getName(), codec, RedisCommands.LRANGE, setName, 0, -1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public List<V> removeAll(Object key) {
return (List<V>) get(removeAllAsync(key));
}
public Future<Collection<V>> removeAllAsync(Object key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
"redis.call('hdel', KEYS[1], ARGV[1]); " +
"local members = redis.call('lrange', KEYS[2], 0, -1); " +
"redis.call('del', KEYS[2]); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), keyState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public List<V> replaceValues(K key, Iterable<? extends V> values) {
return (List<V>) get(replaceValuesAsync(key, values));
}
public Future<Collection<V>> replaceValuesAsync(K key, Iterable<? extends V> values) {
try {
List<Object> params = new ArrayList<Object>();
byte[] keyState = codec.getMapKeyEncoder().encode(key);
params.add(keyState);
String keyHash = hash(keyState);
params.add(keyHash);
for (Object value : values) {
byte[] valueState = codec.getMapValueEncoder().encode(value);
params.add(valueState);
}
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"local members = redis.call('lrange', KEYS[2], 0, -1); " +
"redis.call('del', KEYS[2]); " +
"redis.call('rpush', KEYS[2], unpack(ARGV, 3, #ARGV)); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), params.toArray());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Iterator<V> valuesIterator() {
return new RedissonListMultimapIterator<K, V, V>(this, commandExecutor, codec) {
@Override
V getValue(V entry) {
return (V) entry;
}
};
}
RedissonMultiMapIterator<K, V, Entry<K, V>> entryIterator() {
return new RedissonListMultimapIterator<K, V, Map.Entry<K, V>>(this, commandExecutor, codec);
}
}

@ -0,0 +1,35 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Iterator;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
public class RedissonListMultimapIterator<K, V, M> extends RedissonMultiMapIterator<K, V, M> {
public RedissonListMultimapIterator(RedissonMultimap<K, V> map, CommandAsyncExecutor commandExecutor, Codec codec) {
super(map, commandExecutor, codec);
}
@Override
protected Iterator<V> getIterator(String name) {
RedissonList<V> set = new RedissonList<V>(codec, commandExecutor, map.getValuesName(name));
return set.iterator();
}
}

@ -577,7 +577,9 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return;
}
futureRef.get().cancel(false);
if (futureRef.get() != null) {
futureRef.get().cancel(false);
}
tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
}

@ -91,7 +91,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public Future<Boolean> containsKeyAsync(Object key) {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_KEY,
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_KEY,
"local value = redis.call('hget', KEYS[1], ARGV[2]); " +
"local expireDate = 92233720368547758; " +
"if value ~= false then " +
@ -122,7 +122,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public Future<Boolean> containsValueAsync(Object value) {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS_VALUE,
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS_VALUE,
"local s = redis.call('hgetall', KEYS[1]); "
+ "for i, v in ipairs(s) do "
+ "if i % 2 == 0 then "
@ -166,7 +166,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
args.add(System.currentTimeMillis());
args.addAll(keys);
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args), 7, ValueType.MAP_KEY, ValueType.MAP_VALUE),
return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args), 7, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" +
"local expireIdleHead = redis.call('zrange', KEYS[3], 0, 0, 'withscores');" +
"local maxDate = table.remove(ARGV, 1); " // index is the first parameter
@ -296,7 +296,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public Future<V> getAsync(K key) {
return commandExecutor.evalReadAsync(getName(), codec, EVAL_GET_TTL,
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if value == false then "
+ "return nil; "

@ -15,30 +15,12 @@
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
public class RedissonMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> firstValues;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> iter;
private long iterPos = 0;
private InetSocketAddress client;
private boolean finished;
private boolean removeExecuted;
private Map.Entry<ScanObjectEntry, ScanObjectEntry> entry;
public class RedissonMapIterator<K, V, M> extends RedissonBaseMapIterator<K, V, M> {
private final RedissonMap<K, V> map;
@ -46,82 +28,16 @@ public class RedissonMapIterator<K, V, M> implements Iterator<M> {
this.map = map;
}
@Override
public boolean hasNext() {
if (finished) {
return false;
}
if (iter == null || !iter.hasNext()) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = map.scanIterator(client, iterPos);
client = res.getRedisClient();
if (iterPos == 0 && firstValues == null) {
firstValues = convert(res.getMap());
} else {
Map<ByteBuf, ByteBuf> newValues = convert(res.getMap());
if (newValues.equals(firstValues)) {
finished = true;
free(firstValues);
free(newValues);
firstValues = null;
return false;
}
free(newValues);
}
iter = res.getMap().entrySet().iterator();
iterPos = res.getPos();
}
return iter.hasNext();
}
private void free(Map<ByteBuf, ByteBuf> map) {
for (Entry<ByteBuf, ByteBuf> entry : map.entrySet()) {
entry.getKey().release();
entry.getValue().release();
}
}
private Map<ByteBuf, ByteBuf> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<ByteBuf, ByteBuf> result = new HashMap<ByteBuf, ByteBuf>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getBuf(), entry.getValue().getBuf());
}
return result;
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() {
return map.scanIterator(client, iterPos);
}
@Override
public M next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
entry = iter.next();
removeExecuted = false;
return getValue(entry);
}
@SuppressWarnings("unchecked")
M getValue(final Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey().getObj(), (V)entry.getValue().getObj()) {
@Override
public V setValue(V value) {
return map.put((K) entry.getKey().getObj(), value);
}
};
protected void removeKey() {
map.fastRemove((K)entry.getKey().getObj());
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
// lazy init iterator
hasNext();
iter.remove();
map.fastRemove((K)entry.getKey().getObj());
removeExecuted = true;
protected V put(Entry<ScanObjectEntry, ScanObjectEntry> entry, V value) {
return map.put((K) entry.getKey().getObj(), value);
}
}

@ -0,0 +1,160 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import io.netty.buffer.ByteBuf;
abstract class RedissonMultiMapIterator<K, V, M> implements Iterator<M> {
private Map<ByteBuf, ByteBuf> firstKeys;
private Iterator<Map.Entry<ScanObjectEntry, ScanObjectEntry>> keysIter;
protected long keysIterPos = 0;
private K currentKey;
private Iterator<V> valuesIter;
protected long valuesIterPos = 0;
protected InetSocketAddress client;
private boolean finished;
private boolean removeExecuted;
protected V entry;
final RedissonMultimap<K, V> map;
final CommandAsyncExecutor commandExecutor;
final Codec codec;
public RedissonMultiMapIterator(RedissonMultimap<K, V> map, CommandAsyncExecutor commandExecutor, Codec codec) {
this.map = map;
this.commandExecutor = commandExecutor;
this.codec = codec;
}
@Override
public boolean hasNext() {
if (finished) {
return false;
}
if (valuesIter != null && valuesIter.hasNext()) {
return true;
}
if (keysIter == null || !keysIter.hasNext()) {
MapScanResult<ScanObjectEntry, ScanObjectEntry> res = map.scanIterator(client, keysIterPos);
client = res.getRedisClient();
if (keysIterPos == 0 && firstKeys == null) {
firstKeys = convert(res.getMap());
} else {
Map<ByteBuf, ByteBuf> newValues = convert(res.getMap());
if (newValues.equals(firstKeys)) {
finished = true;
free(firstKeys);
free(newValues);
firstKeys = null;
return false;
}
free(newValues);
}
keysIter = res.getMap().entrySet().iterator();
keysIterPos = res.getPos();
}
while (true) {
if (keysIter.hasNext()) {
Entry<ScanObjectEntry, ScanObjectEntry> e = keysIter.next();
currentKey = (K) e.getKey().getObj();
String name = e.getValue().getObj().toString();
valuesIter = getIterator(name);
if (valuesIter.hasNext()) {
return true;
}
} else {
return false;
}
}
}
protected abstract Iterator<V> getIterator(String name);
private void free(Map<ByteBuf, ByteBuf> map) {
for (Entry<ByteBuf, ByteBuf> entry : map.entrySet()) {
entry.getKey().release();
entry.getValue().release();
}
}
private Map<ByteBuf, ByteBuf> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<ByteBuf, ByteBuf> result = new HashMap<ByteBuf, ByteBuf>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getBuf(), entry.getValue().getBuf());
}
return result;
}
@Override
public M next() {
if (!hasNext()) {
throw new NoSuchElementException("No such element at index");
}
entry = valuesIter.next();
removeExecuted = false;
return getValue(entry);
}
@SuppressWarnings("unchecked")
M getValue(V entry) {
return (M)new AbstractMap.SimpleEntry<K, V>(currentKey, entry) {
@Override
public V setValue(V value) {
throw new UnsupportedOperationException();
}
};
}
@Override
public void remove() {
if (removeExecuted) {
throw new IllegalStateException("Element been already deleted");
}
// lazy init iterator
hasNext();
keysIter.remove();
map.remove(currentKey, entry);
removeExecuted = true;
}
}

@ -0,0 +1,44 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Map.Entry;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
public class RedissonMultiMapKeysIterator<K, V, M> extends RedissonBaseMapIterator<K, V, M> {
private final RedissonMultimap<K, V> map;
public RedissonMultiMapKeysIterator(RedissonMultimap<K, V> map) {
this.map = map;
}
protected MapScanResult<ScanObjectEntry, ScanObjectEntry> iterator() {
return map.scanIterator(client, iterPos);
}
protected void removeKey() {
map.fastRemove((K)entry.getKey().getObj());
}
protected V put(Entry<ScanObjectEntry, ScanObjectEntry> entry, V value) {
map.put((K) entry.getKey().getObj(), value);
return null;
}
}

@ -0,0 +1,289 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractCollection;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RMultimap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import net.openhft.hashing.LongHashFunction;
/**
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public abstract class RedissonMultimap<K, V> extends RedissonExpirable implements RMultimap<K, V> {
RedissonMultimap(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
RedissonMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
protected String hash(byte[] objectState) {
long h1 = LongHashFunction.farmUo().hashBytes(objectState);
long h2 = LongHashFunction.xx_r39().hashBytes(objectState);
ByteBuf buf = Unpooled.buffer((2 * Long.SIZE) / Byte.SIZE).writeLong(h1).writeLong(h2);
ByteBuf b = Base64.encode(buf);
String s = b.toString(CharsetUtil.UTF_8);
b.release();
buf.release();
return s.substring(0, s.length() - 2);
}
@Override
public int size() {
return get(sizeAsync());
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public boolean containsKey(Object key) {
return get(containsKeyAsync(key));
}
@Override
public boolean containsValue(Object value) {
return get(containsValueAsync(value));
}
@Override
public boolean containsEntry(Object key, Object value) {
return get(containsEntryAsync(key, value));
}
@Override
public boolean put(K key, V value) {
return get(putAsync(key, value));
}
String getValuesName(String hash) {
return "{" + getName() + "}:" + hash;
}
@Override
public boolean remove(Object key, Object value) {
return get(removeAsync(key, value));
}
@Override
public boolean putAll(K key, Iterable<? extends V> values) {
return get(putAllAsync(key, values));
}
@Override
public void clear() {
delete();
}
@Override
public Set<K> keySet() {
return new KeySet();
}
@Override
public Collection<V> values() {
return new Values();
}
@Override
public Collection<V> getAll(K key) {
return get(getAllAsync(key));
}
@Override
public Collection<V> removeAll(Object key) {
return get(removeAllAsync(key));
}
@Override
public Collection<V> replaceValues(K key, Iterable<? extends V> values) {
return get(replaceValuesAsync(key, values));
}
@Override
public Collection<Entry<K, V>> entries() {
return new EntrySet();
}
public long fastRemove(K ... keys) {
return get(fastRemoveAsync(keys));
}
public Future<Long> fastRemoveAsync(K ... keys) {
if (keys == null || keys.length == 0) {
return newSucceededFuture(0L);
}
try {
List<Object> args = new ArrayList<Object>(keys.length*2);
List<Object> hashes = new ArrayList<Object>();
for (K key : keys) {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
args.add(keyState);
String keyHash = hash(keyState);
String name = getValuesName(keyHash);
hashes.add(name);
}
args.addAll(hashes);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local res = redis.call('hdel', KEYS[1], unpack(ARGV, 1, #ARGV/2)); " +
"if res > 0 then " +
"redis.call('del', unpack(ARGV, #ARGV/2, #ARGV)); " +
"end; " +
"return res; ",
Collections.<Object>singletonList(getName()), args.toArray());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);
return get(f);
}
abstract Iterator<V> valuesIterator();
abstract RedissonMultiMapIterator<K, V, Entry<K, V>> entryIterator();
final class KeySet extends AbstractSet<K> {
@Override
public Iterator<K> iterator() {
return new RedissonMultiMapKeysIterator<K, V, K>(RedissonMultimap.this) {
@Override
K getValue(java.util.Map.Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
}
};
}
@Override
public boolean contains(Object o) {
return RedissonMultimap.this.containsKey(o);
}
@Override
public boolean remove(Object o) {
return RedissonMultimap.this.fastRemove((K)o) == 1;
}
@Override
public int size() {
return RedissonMultimap.this.size();
}
@Override
public void clear() {
RedissonMultimap.this.clear();
}
}
final class Values extends AbstractCollection<V> {
@Override
public Iterator<V> iterator() {
return valuesIterator();
}
@Override
public boolean contains(Object o) {
return RedissonMultimap.this.containsValue(o);
}
@Override
public int size() {
return RedissonMultimap.this.size();
}
@Override
public void clear() {
RedissonMultimap.this.clear();
}
}
final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
public final Iterator<Map.Entry<K,V>> iterator() {
return entryIterator();
}
public final boolean contains(Object o) {
if (!(o instanceof Map.Entry))
return false;
Map.Entry<?,?> e = (Map.Entry<?,?>) o;
return containsEntry(e.getKey(), e.getValue());
}
public final boolean remove(Object o) {
if (o instanceof Map.Entry) {
Map.Entry<?,?> e = (Map.Entry<?,?>) o;
Object key = e.getKey();
Object value = e.getValue();
return RedissonMultimap.this.remove(key, value);
}
return false;
}
public final int size() {
return RedissonMultimap.this.size();
}
public final void clear() {
RedissonMultimap.this.clear();
}
}
}

@ -165,16 +165,22 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Future<Integer> removeRangeByScoreAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZREMRANGEBYSCORE, getName(), startValue, endValue);
}
private String value(String element, boolean inclusive) {
private String value(double score, boolean inclusive) {
StringBuilder element = new StringBuilder();
if (!inclusive) {
element = "(" + element;
element.append("(");
}
return element;
if (Double.isInfinite(score)) {
element.append(score > 0 ? "+inf" : "-inf");
} else {
element.append(BigDecimal.valueOf(score).toPlainString());
}
return element.toString();
}
@Override
@ -406,11 +412,26 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Future<Collection<V>> valueRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGEBYSCORE, getName(), startValue, endValue);
}
@Override
public Collection<V> valueRangeReversed(double startScore, boolean startScoreInclusive, double endScore,
boolean endScoreInclusive) {
return get(valueRangeReversedAsync(startScore, startScoreInclusive, endScore, endScoreInclusive));
}
@Override
public Future<Collection<V>> valueRangeReversedAsync(double startScore, boolean startScoreInclusive, double endScore,
boolean endScoreInclusive) {
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZREVRANGEBYSCORE, getName(), endValue, startValue);
}
@Override
public Collection<ScoredEntry<V>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
return get(entryRangeAsync(startScore, startScoreInclusive, endScore, endScoreInclusive));
@ -418,8 +439,8 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Future<Collection<ScoredEntry<V>>> entryRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGEBYSCORE_ENTRY, getName(), startValue, endValue, "WITHSCORES");
}
@ -430,11 +451,23 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Future<Collection<V>> valueRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGEBYSCORE, getName(), startValue, endValue, "LIMIT", offset, count);
}
@Override
public Collection<V> valueRangeReversed(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) {
return get(valueRangeReversedAsync(startScore, startScoreInclusive, endScore, endScoreInclusive, offset, count));
}
@Override
public Future<Collection<V>> valueRangeReversedAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) {
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZREVRANGEBYSCORE, getName(), endValue, startValue, "LIMIT", offset, count);
}
@Override
public Collection<ScoredEntry<V>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) {
return get(entryRangeAsync(startScore, startScoreInclusive, endScore, endScoreInclusive, offset, count));
@ -442,8 +475,8 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public Future<Collection<ScoredEntry<V>>> entryRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) {
String startValue = value(BigDecimal.valueOf(startScore).toPlainString(), startScoreInclusive);
String endValue = value(BigDecimal.valueOf(endScore).toPlainString(), endScoreInclusive);
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGEBYSCORE_ENTRY, getName(), startValue, endValue, "WITHSCORES", "LIMIT", offset, count);
}

@ -0,0 +1,275 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanAmountReplayConvertor;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RSetMultimap;
import io.netty.util.concurrent.Future;
/**
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class RedissonSetMultimap<K, V> extends RedissonMultimap<K, V> implements RSetMultimap<K, V> {
private static final RedisStrictCommand<Boolean> SCARD_VALUE = new RedisStrictCommand<Boolean>("SCARD", new BooleanAmountReplayConvertor());
private static final RedisCommand<Boolean> SISMEMBER_VALUE = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor());
RedissonSetMultimap(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
RedissonSetMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
public Future<Integer> sizeAsync() {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_INTEGER,
"local keys = redis.call('hgetall', KEYS[1]); " +
"local size = 0; " +
"for i, v in ipairs(keys) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"size = size + redis.call('scard', name); " +
"end;" +
"end; " +
"return size; ",
Arrays.<Object>asList(getName()));
}
public Future<Boolean> containsKeyAsync(Object key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.readAsync(getName(), codec, SCARD_VALUE, setName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Boolean> containsValueAsync(Object value) {
try {
byte[] valueState = codec.getMapValueEncoder().encode(value);
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local keys = redis.call('hgetall', KEYS[1]); " +
"for i, v in ipairs(keys) do " +
"if i % 2 == 0 then " +
"local name = '{' .. KEYS[1] .. '}:' .. v; " +
"if redis.call('sismember', name, ARGV[1]) == 1 then "
+ "return 1; " +
"end;" +
"end;" +
"end; " +
"return 0; ",
Arrays.<Object>asList(getName()), valueState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Boolean> containsEntryAsync(Object key, Object value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
byte[] valueState = codec.getMapValueEncoder().encode(value);
String setName = getValuesName(keyHash);
return commandExecutor.readAsync(getName(), codec, SISMEMBER_VALUE, setName, valueState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Boolean> putAsync(K key, V value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
byte[] valueState = codec.getMapValueEncoder().encode(value);
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"return redis.call('sadd', KEYS[2], ARGV[3]); ",
Arrays.<Object>asList(getName(), setName), keyState, keyHash, valueState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Boolean> removeAsync(Object key, Object value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
byte[] valueState = codec.getMapValueEncoder().encode(value);
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local res = redis.call('srem', KEYS[2], ARGV[2]); "
+ "if res == 1 and redis.call('scard', KEYS[2]) == 0 then "
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
+ "end; "
+ "return res; ",
Arrays.<Object>asList(getName(), setName), keyState, valueState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Future<Boolean> putAllAsync(K key, Iterable<? extends V> values) {
try {
List<Object> params = new ArrayList<Object>();
byte[] keyState = codec.getMapKeyEncoder().encode(key);
params.add(keyState);
String keyHash = hash(keyState);
params.add(keyHash);
for (Object value : values) {
byte[] valueState = codec.getMapValueEncoder().encode(value);
params.add(valueState);
}
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN_AMOUNT,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"return redis.call('sadd', KEYS[2], unpack(ARGV, 3, #ARGV)); ",
Arrays.<Object>asList(getName(), setName), params.toArray());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Set<V> get(K key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return new RedissonSet<V>(codec, commandExecutor, setName);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Set<V> getAll(K key) {
return (Set<V>) super.getAll(key);
}
public Future<Collection<V>> getAllAsync(K key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.readAsync(getName(), codec, RedisCommands.SMEMBERS, setName);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Set<V> removeAll(Object key) {
return (Set<V>) get(removeAllAsync(key));
}
public Future<Collection<V>> removeAllAsync(Object key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_SET,
"redis.call('hdel', KEYS[1], ARGV[1]); " +
"local members = redis.call('smembers', KEYS[2]); " +
"redis.call('del', KEYS[2]); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), keyState);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public Set<Entry<K, V>> entries() {
return (Set<Entry<K, V>>) super.entries();
}
@Override
public Set<V> replaceValues(K key, Iterable<? extends V> values) {
return (Set<V>) get(replaceValuesAsync(key, values));
}
Iterator<V> valuesIterator() {
return new RedissonSetMultimapIterator<K, V, V>(RedissonSetMultimap.this, commandExecutor, codec) {
@Override
V getValue(V entry) {
return (V) entry;
}
};
}
RedissonSetMultimapIterator<K, V, Entry<K, V>> entryIterator() {
return new RedissonSetMultimapIterator<K, V, Map.Entry<K, V>>(RedissonSetMultimap.this, commandExecutor, codec);
}
public Future<Collection<V>> replaceValuesAsync(K key, Iterable<? extends V> values) {
try {
List<Object> params = new ArrayList<Object>();
byte[] keyState = codec.getMapKeyEncoder().encode(key);
params.add(keyState);
String keyHash = hash(keyState);
params.add(keyHash);
for (Object value : values) {
byte[] valueState = codec.getMapValueEncoder().encode(value);
params.add(valueState);
}
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_SET,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"local members = redis.call('smembers', KEYS[2]); " +
"redis.call('del', KEYS[2]); " +
"redis.call('sadd', KEYS[2], unpack(ARGV, 3, #ARGV)); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), params.toArray());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

@ -0,0 +1,35 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.Iterator;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
public class RedissonSetMultimapIterator<K, V, M> extends RedissonMultiMapIterator<K, V, M> {
public RedissonSetMultimapIterator(RedissonMultimap<K, V> map, CommandAsyncExecutor commandExecutor, Codec codec) {
super(map, commandExecutor, codec);
}
@Override
protected Iterator<V> getIterator(String name) {
RedissonSet<V> set = new RedissonSet<V>(codec, commandExecutor, map.getValuesName(name));
return set.iterator();
}
}

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;
import java.util.jar.Attributes;
@ -43,7 +42,7 @@ public class Version {
break;
}
}
} catch (IOException E) {
} catch (Exception E) {
// skip it
}
}

@ -0,0 +1,32 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
/**
* This error occurs in case then Redis server free memory has been exhausted.
*
* @author Nikita Koksharov
*
*/
public class RedisOutOfMemoryException extends RedisException {
private static final long serialVersionUID = -2565335188503354660L;
public RedisOutOfMemoryException(String message) {
super(message);
}
}

@ -24,15 +24,19 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
public class ScanCodec implements Codec {
public final Codec delegate;
private final Codec delegate;
private final Codec mapValueCodec;
public ScanCodec(Codec delegate) {
super();
this(delegate, null);
}
public ScanCodec(Codec delegate, Codec mapValueCodec) {
this.delegate = delegate;
this.mapValueCodec = mapValueCodec;
}
@Override
@ -51,7 +55,11 @@ public class ScanCodec implements Codec {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Object val = delegate.getMapValueDecoder().decode(buf, state);
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
Object val = c.getMapValueDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
@ -59,7 +67,12 @@ public class ScanCodec implements Codec {
@Override
public Encoder getMapValueEncoder() {
return delegate.getMapValueEncoder();
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
return c.getMapValueEncoder();
}
@Override

@ -16,6 +16,7 @@
package org.redisson.client.codec;
import java.io.IOException;
import java.nio.charset.Charset;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
@ -28,22 +29,36 @@ public class StringCodec implements Codec {
public static final StringCodec INSTANCE = new StringCodec();
private final Charset charset;
private final Encoder encoder = new Encoder() {
@Override
public byte[] encode(Object in) throws IOException {
return in.toString().getBytes("UTF-8");
return in.toString().getBytes(charset);
}
};
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) {
String str = buf.toString(CharsetUtil.UTF_8);
String str = buf.toString(charset);
buf.readerIndex(buf.readableBytes());
return str;
}
};
public StringCodec() {
this(CharsetUtil.UTF_8);
}
public StringCodec(String charsetName) {
this(Charset.forName(charsetName));
}
public StringCodec(Charset charset) {
this.charset = charset;
}
@Override
public Decoder<Object> getValueDecoder() {
return decoder;

@ -26,6 +26,7 @@ import org.redisson.client.RedisAskException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.StringCodec;
@ -186,8 +187,18 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} else if (error.startsWith("LOADING")) {
data.getPromise().tryFailure(new RedisLoadingException(error
+ ". channel: " + channel + " data: " + data));
} else if (error.startsWith("OOM")) {
data.getPromise().tryFailure(new RedisOutOfMemoryException(error.split("OOM ")[1]
+ ". channel: " + channel + " data: " + data));
} else if (error.contains("-OOM ")) {
data.getPromise().tryFailure(new RedisOutOfMemoryException(error.split("-OOM ")[1]
+ ". channel: " + channel + " data: " + data));
} else {
data.getPromise().tryFailure(new RedisException(error + ". channel: " + channel + " command: " + data));
if (data != null) {
data.getPromise().tryFailure(new RedisException(error + ". channel: " + channel + " command: " + data));
} else {
log.error("Error: {} channel: {} data: {}", error, channel, data);
}
}
} else if (code == ':') {
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);

@ -91,6 +91,7 @@ public interface RedisCommands {
RedisStrictCommand<Integer> ZREMRANGEBYLEX = new RedisStrictCommand<Integer>("ZREMRANGEBYLEX", new IntegerReplayConvertor());
RedisCommand<List<Object>> ZRANGEBYLEX = new RedisCommand<List<Object>>("ZRANGEBYLEX", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZRANGEBYSCORE = new RedisCommand<List<Object>>("ZRANGEBYSCORE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZREVRANGEBYSCORE = new RedisCommand<List<Object>>("ZREVRANGEBYSCORE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new NestedMultiDecoder(new ScoredSortedSetScanDecoder<Object>(), new ScoredSortedSetScanReplayDecoder()), ValueType.OBJECT);
@ -159,6 +160,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> SCRIPT_FLUSH = new RedisStrictCommand<Boolean>("SCRIPT", "FLUSH", new BooleanReplayConvertor());
RedisStrictCommand<List<Boolean>> SCRIPT_EXISTS = new RedisStrictCommand<List<Boolean>>("SCRIPT", "EXISTS", new ObjectListReplayDecoder<Boolean>(), new BooleanReplayConvertor());
RedisStrictCommand<Boolean> EVAL_BOOLEAN_AMOUNT = new RedisStrictCommand<Boolean>("EVAL", new BooleanAmountReplayConvertor());
RedisStrictCommand<Boolean> EVAL_BOOLEAN = new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor());
RedisCommand<Boolean> EVAL_BOOLEAN_WITH_VALUES = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, ValueType.OBJECTS);
RedisStrictCommand<String> EVAL_STRING = new RedisStrictCommand<String>("EVAL", new StringReplayDecoder());

@ -21,7 +21,7 @@ import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.FutureConnectionListener;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.core.NodeType;
public class ClusterConnectionListener extends DefaultConnectionListener {

@ -96,6 +96,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
if (lastPartitions.isEmpty()) {
try {
group.shutdownGracefully().await();
} catch (Exception e) {
// skip it
}
throw new RedisConnectionException("Can't connect to servers!", lastException);
}

@ -19,11 +19,11 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.ReconnectListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.core.NodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,15 +48,13 @@ public class ClientConnectionsEntry {
private FreezeReason freezeReason;
final RedisClient client;
public enum NodeType {SLAVE, MASTER, SENTINEL}
private final NodeType nodeType;
private ConnectionManager connectionManager;
private final AtomicInteger failedAttempts = new AtomicInteger();
public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize,
ConnectionManager connectionManager, NodeType serverMode, MasterSlaveServersConfig config) {
ConnectionManager connectionManager, NodeType serverMode) {
this.client = client;
this.freeConnectionsCounter.set(poolMaxSize);
this.connectionManager = connectionManager;
@ -138,7 +136,7 @@ public class ClientConnectionsEntry {
freeConnections.add(connection);
}
public Future<RedisConnection> connect(final MasterSlaveServersConfig config) {
public Future<RedisConnection> connect() {
final Promise<RedisConnection> connectionFuture = client.getBootstrap().group().next().newPromise();
Future<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() {
@ -151,21 +149,21 @@ public class ClientConnectionsEntry {
RedisConnection conn = future.getNow();
log.debug("new connection created: {}", conn);
addReconnectListener(connectionFuture, config, conn);
addReconnectListener(connectionFuture, conn);
}
});
return connectionFuture;
}
private <T extends RedisConnection> void addReconnectListener(Promise<T> connectionFuture, final MasterSlaveServersConfig config, T conn) {
connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, config);
private <T extends RedisConnection> void addReconnectListener(Promise<T> connectionFuture, T conn) {
connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, connectionManager.getConfig());
addFireEventListener(connectionFuture);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn, Promise<RedisConnection> connectionFuture) {
connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, config);
connectionManager.getConnectListener().onConnect(connectionFuture, conn, nodeType, connectionManager.getConfig());
addFireEventListener(connectionFuture);
}
});
@ -187,7 +185,7 @@ public class ClientConnectionsEntry {
});
}
public Future<RedisPubSubConnection> connectPubSub(final MasterSlaveServersConfig config) {
public Future<RedisPubSubConnection> connectPubSub() {
final Promise<RedisPubSubConnection> connectionFuture = client.getBootstrap().group().next().newPromise();
Future<RedisPubSubConnection> future = client.connectPubSubAsync();
future.addListener(new FutureListener<RedisPubSubConnection>() {
@ -200,7 +198,7 @@ public class ClientConnectionsEntry {
RedisPubSubConnection conn = future.getNow();
log.debug("new pubsub connection created: {}", conn);
addReconnectListener(connectionFuture, config, conn);
addReconnectListener(connectionFuture, conn);
allSubscribeConnections.add(conn);
}

@ -17,7 +17,7 @@ package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.core.NodeType;
import io.netty.util.concurrent.Promise;

@ -28,6 +28,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.core.NodeType;
import org.redisson.misc.InfinitySemaphoreLatch;
import io.netty.channel.EventLoopGroup;
@ -87,7 +88,7 @@ public interface ConnectionManager {
RedisClient createClient(String host, int port, int timeout);
RedisClient createClient(String host, int port);
RedisClient createClient(NodeType type, String host, int port);
MasterSlaveEntry getEntry(InetSocketAddress addr);

@ -19,7 +19,7 @@ import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.core.NodeType;
import io.netty.util.concurrent.Promise;

@ -44,6 +44,7 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.core.NodeType;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -198,7 +199,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
connectionWatcher = new IdleConnectionWatcher(this, config);
initEntry(config);
try {
initEntry(config);
} catch (RuntimeException e) {
try {
group.shutdownGracefully().await();
} catch (Exception e1) {
// skip
}
throw e;
}
}
public ConnectionInitializer getConnectListener() {
@ -259,14 +269,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RedisClient createClient(String host, int port) {
public RedisClient createClient(NodeType type, String host, int port) {
RedisClient client = createClient(host, port, config.getConnectTimeout());
clients.add(new RedisClientEntry(client, this));
clients.add(new RedisClientEntry(client, this, type));
return client;
}
public void shutdownAsync(RedisClient client) {
clients.remove(new RedisClientEntry(client, this));
clients.remove(new RedisClientEntry(client, this, null));
client.shutdownAsync();
}

@ -30,10 +30,10 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManagerImpl;
import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.core.NodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -84,9 +84,9 @@ public class MasterSlaveEntry {
}
public Future<Void> setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(host, port);
RedisClient client = connectionManager.createClient(NodeType.MASTER, host, port);
masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(),
0, 0, connectionManager, NodeType.MASTER, config);
0, 0, connectionManager, NodeType.MASTER);
return writeConnectionHolder.add(masterEntry);
}
@ -108,12 +108,12 @@ public class MasterSlaveEntry {
}
private Future<Void> addSlave(String host, int port, boolean freezed, NodeType mode) {
RedisClient client = connectionManager.createClient(host, port);
RedisClient client = connectionManager.createClient(NodeType.SLAVE, host, port);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionMinimumIdleSize(),
this.config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, mode, config);
this.config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, mode);
if (freezed) {
entry.setFreezed(freezed);
entry.setFreezeReason(FreezeReason.SYSTEM);

@ -20,6 +20,7 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.ClusterNode;
import org.redisson.core.NodeType;
import java.net.InetSocketAddress;
import java.util.Map;
@ -28,11 +29,18 @@ public class RedisClientEntry implements ClusterNode {
private final RedisClient client;
private final ConnectionManager manager;
private final NodeType type;
public RedisClientEntry(RedisClient client, ConnectionManager manager) {
public RedisClientEntry(RedisClient client, ConnectionManager manager, NodeType type) {
super();
this.client = client;
this.manager = manager;
this.type = type;
}
@Override
public NodeType getType() {
return type;
}
public RedisClient getClient() {

@ -24,9 +24,9 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SinglePubSubConnectionPool;
import org.redisson.core.NodeType;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -43,12 +43,12 @@ public class SingleEntry extends MasterSlaveEntry {
@Override
public Future<Void> setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port);
RedisClient masterClient = connectionManager.createClient(NodeType.MASTER, host, port);
masterEntry = new ClientConnectionsEntry(masterClient,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSlaveConnectionMinimumIdleSize(),
config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, NodeType.MASTER, config);
config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, NodeType.MASTER);
final Promise<Void> res = connectionManager.newPromise();
Future<Void> f = writeConnectionHolder.add(masterEntry);
Future<Void> s = pubSubConnectionHolder.add(masterEntry);

@ -28,9 +28,9 @@ import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.core.NodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -185,10 +185,10 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
protected Future<T> connect(ClientConnectionsEntry entry) {
return (Future<T>) entry.connect(config);
return (Future<T>) entry.connect();
}
private Future<T> connectTo(final ClientConnectionsEntry entry) {
private Future<T> connectTo(ClientConnectionsEntry entry) {
T conn = poll(entry);
if (conn != null) {
if (!conn.isActive()) {

@ -41,7 +41,7 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
@Override
protected Future<RedisPubSubConnection> connect(ClientConnectionsEntry entry) {
return entry.connectPubSub(config);
return entry.connectPubSub();
}
@Override

@ -25,6 +25,15 @@ import java.net.InetSocketAddress;
*/
public interface Node {
/**
* Returns node type
*
* @see {@link NodeType}
*
* @return
*/
NodeType getType();
/**
* Get Redis node address
*

@ -0,0 +1,22 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
public enum NodeType {
MASTER, SLAVE
}

@ -41,6 +41,16 @@ public interface NodesGroup<N extends Node> {
*/
void removeConnectionListener(int listenerId);
/**
* Get all nodes by type
*
* @see {@link NodeType}
*
* @param type
* @return
*/
Collection<N> getNodes(NodeType type);
/**
* All Redis nodes used by Redisson.
* This collection may change during master change, cluster topology update and etc.

@ -37,6 +37,24 @@ import io.netty.util.concurrent.Future;
*/
public interface RBatch {
/**
* Returns Set based MultiMap instance by name.
*
* @param name
* @return
*/
<K, V> RSetMultimap<K, V> getSetMultimap(String name);
/**
* Returns Set based MultiMap instance by name
* using provided codec for both map keys and values.
*
* @param name
* @param codec
* @return
*/
<K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec);
/**
* Returns set-based cache instance by <code>name</code>.
* Uses map (value_hash, value) under the hood for minimal memory consumption.
@ -118,6 +136,24 @@ public interface RBatch {
<V> RListAsync<V> getList(String name, Codec codec);
/**
* Returns List based MultiMap instance by name.
*
* @param name
* @return
*/
<K, V> RListMultimap<K, V> getListMultimap(String name);
/**
* Returns List based MultiMap instance by name
* using provided codec for both map keys and values.
*
* @param name
* @param codec
* @return
*/
<K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec);
/**
* Returns map instance by name.
*

@ -40,12 +40,4 @@ public interface RBucket<V> extends RExpirable, RBucketAsync<V> {
void set(V value, long timeToLive, TimeUnit timeUnit);
/**
* Use {@link #isExists()}
*
* @return
*/
@Deprecated
boolean exists();
}

@ -42,12 +42,4 @@ public interface RBucketAsync<V> extends RExpirableAsync {
Future<Void> setAsync(V value, long timeToLive, TimeUnit timeUnit);
/**
* Use {@link #isExistsAsync()}
*
* @return
*/
@Deprecated
Future<Boolean> existsAsync();
}

@ -0,0 +1,70 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.List;
/**
* List based Multimap. Stores insertion order and allows duplicates for values mapped to key.
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public interface RListMultimap<K, V> extends RMultimap<K, V> {
/**
* {@inheritDoc}
*
* <p>Because a {@code RListMultimap} may has duplicates among values mapped by key and stores insertion order
* method returns a {@link List}, instead of the {@link java.util.Collection}
* specified in the {@link RMultimap} interface.
*/
@Override
List<V> get(K key);
/**
* {@inheritDoc}
*
* <p>Because a {@code RListMultimap} may has duplicates among values mapped by key and stores insertion order
* method returns a {@link List}, instead of the {@link java.util.Collection}
* specified in the {@link RMultimap} interface.
*/
List<V> getAll(K key);
/**
* {@inheritDoc}
*
* <p>Because a {@code RListMultimap} may has duplicates among values mapped by key and stores insertion order
* method returns a {@link List}, instead of the {@link java.util.Collection}
* specified in the {@link RMultimap} interface.
*/
@Override
List<V> removeAll(Object key);
/**
* {@inheritDoc}
*
* <p>Because a {@code RListMultimap} may has duplicates among values mapped by key and stores insertion order
* method returns a {@link List}, instead of the {@link java.util.Collection}
* specified in the {@link RMultimap} interface.
*
*/
@Override
List<V> replaceValues(K key, Iterable<? extends V> values);
}

@ -0,0 +1,198 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
* Base Multimap interface. Allows to map multiple values per key.
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public interface RMultimap<K, V> extends RExpirable, RMultimapAsync<K, V> {
/**
* Returns the number of key-value pairs in this multimap.
*
* @return
*/
int size();
/**
* Check is map empty
*
* @return
*/
boolean isEmpty();
/**
* Returns {@code true} if this multimap contains at least one key-value pair
* with the key {@code key}.
*/
boolean containsKey(Object key);
/**
* Returns {@code true} if this multimap contains at least one key-value pair
* with the value {@code value}.
*/
boolean containsValue(Object value);
/**
* Returns {@code true} if this multimap contains at least one key-value pair
* with the key {@code key} and the value {@code value}.
*/
boolean containsEntry(Object key, Object value);
/**
* Stores a key-value pair in this multimap.
*
* <p>Some multimap implementations allow duplicate key-value pairs, in which
* case {@code put} always adds a new key-value pair and increases the
* multimap size by 1. Other implementations prohibit duplicates, and storing
* a key-value pair that's already in the multimap has no effect.
*
* @return {@code true} if the method increased the size of the multimap, or
* {@code false} if the multimap already contained the key-value pair and
* doesn't allow duplicates
*/
boolean put(K key, V value);
/**
* Removes a single key-value pair with the key {@code key} and the value
* {@code value} from this multimap, if such exists. If multiple key-value
* pairs in the multimap fit this description, which one is removed is
* unspecified.
*
* @return {@code true} if the multimap changed
*/
boolean remove(Object key, Object value);
/**
* Stores a key-value pair in this multimap for each of {@code values}, all
* using the same key, {@code key}. Equivalent to (but expected to be more
* efficient than): <pre> {@code
*
* for (V value : values) {
* put(key, value);
* }}</pre>
*
* <p>In particular, this is a no-op if {@code values} is empty.
*
* @return {@code true} if the multimap changed
*/
boolean putAll(K key, Iterable<? extends V> values);
/**
* Stores a collection of values with the same key, replacing any existing
* values for that key.
*
* <p>If {@code values} is empty, this is equivalent to
* {@link #removeAll(Object) removeAll(key)}.
*
* @return the collection of replaced values, or an empty collection if no
* values were previously associated with the key. The collection
* <i>may</i> be modifiable, but updating it will have no effect on the
* multimap.
*/
Collection<V> replaceValues(K key, Iterable<? extends V> values);
/**
* Removes all values associated with the key {@code key}.
*
* <p>Once this method returns, {@code key} will not be mapped to any values,
* so it will not appear in {@link #keySet()}, {@link #asMap()}, or any other
* views.
*
* @return the values that were removed (possibly empty). The returned
* collection <i>may</i> be modifiable, but updating it will have no
* effect on the multimap.
*/
Collection<V> removeAll(Object key);
/**
* Removes all key-value pairs from the multimap, leaving it {@linkplain
* #isEmpty empty}.
*/
void clear();
/**
* Returns a view collection of the values associated with {@code key} in this
* multimap, if any. Note that when {@code containsKey(key)} is false, this
* returns an empty collection, not {@code null}.
*
* <p>Changes to the returned collection will update the underlying multimap,
* and vice versa.
*/
Collection<V> get(K key);
/**
* Returns all elements at once. Result collection is <b>NOT</b> backed by map,
* so changes are not reflected in map.
*
* @param key
* @return
*/
Collection<V> getAll(K key);
/**
* Returns a view collection of all <i>distinct</i> keys contained in this
* multimap. Note that the key set contains a key if and only if this multimap
* maps that key to at least one value.
*
* <p>Changes to the returned set will update the underlying multimap, and
* vice versa. However, <i>adding</i> to the returned set is not possible.
*/
Set<K> keySet();
/**
* Returns a view collection containing the <i>value</i> from each key-value
* pair contained in this multimap, without collapsing duplicates (so {@code
* values().size() == size()}).
*
* <p>Changes to the returned collection will update the underlying multimap,
* and vice versa. However, <i>adding</i> to the returned collection is not
* possible.
*/
Collection<V> values();
/**
* Returns a view collection of all key-value pairs contained in this
* multimap, as {@link Map.Entry} instances.
*
* <p>Changes to the returned collection or the entries it contains will
* update the underlying multimap, and vice versa. However, <i>adding</i> to
* the returned collection is not possible.
*/
Collection<Map.Entry<K, V>> entries();
/**
* Removes <code>keys</code> from map by one operation
*
* Works faster than <code>RMap.remove</code> but not returning
* the value associated with <code>key</code>
*
* @param keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
*/
long fastRemove(K ... keys);
}

@ -0,0 +1,139 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import io.netty.util.concurrent.Future;
/**
* Base asynchronous MultiMap interface. A collection that maps multiple values per one key.
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public interface RMultimapAsync<K, V> extends RExpirableAsync {
/**
* Returns the number of key-value pairs in this multimap.
*
* @return
*/
Future<Integer> sizeAsync();
/**
* Returns {@code true} if this multimap contains at least one key-value pair
* with the key {@code key}.
*/
Future<Boolean> containsKeyAsync(Object key);
/**
* Returns {@code true} if this multimap contains at least one key-value pair
* with the value {@code value}.
*/
Future<Boolean> containsValueAsync(Object value);
/**
* Returns {@code true} if this multimap contains at least one key-value pair
* with the key {@code key} and the value {@code value}.
*/
Future<Boolean> containsEntryAsync(Object key, Object value);
/**
* Stores a key-value pair in this multimap.
*
* <p>Some multimap implementations allow duplicate key-value pairs, in which
* case {@code put} always adds a new key-value pair and increases the
* multimap size by 1. Other implementations prohibit duplicates, and storing
* a key-value pair that's already in the multimap has no effect.
*
* @return {@code true} if the method increased the size of the multimap, or
* {@code false} if the multimap already contained the key-value pair and
* doesn't allow duplicates
*/
Future<Boolean> putAsync(K key, V value);
/**
* Removes a single key-value pair with the key {@code key} and the value
* {@code value} from this multimap, if such exists. If multiple key-value
* pairs in the multimap fit this description, which one is removed is
* unspecified.
*
* @return {@code true} if the multimap changed
*/
Future<Boolean> removeAsync(Object key, Object value);
// Bulk Operations
/**
* Stores a key-value pair in this multimap for each of {@code values}, all
* using the same key, {@code key}. Equivalent to (but expected to be more
* efficient than): <pre> {@code
*
* for (V value : values) {
* put(key, value);
* }}</pre>
*
* <p>In particular, this is a no-op if {@code values} is empty.
*
* @return {@code true} if the multimap changed
*/
Future<Boolean> putAllAsync(K key, Iterable<? extends V> values);
/**
* Stores a collection of values with the same key, replacing any existing
* values for that key.
*
* <p>If {@code values} is empty, this is equivalent to
* {@link #removeAll(Object) removeAll(key)}.
*
* @return the collection of replaced values, or an empty collection if no
* values were previously associated with the key. The collection
* <i>may</i> be modifiable, but updating it will have no effect on the
* multimap.
*/
Future<Collection<V>> replaceValuesAsync(K key, Iterable<? extends V> values);
/**
* Removes all values associated with the key {@code key}.
*
* <p>Once this method returns, {@code key} will not be mapped to any values,
* so it will not appear in {@link #keySet()}, {@link #asMap()}, or any other
* views.
*
* @return the values that were removed (possibly empty). The returned
* collection <i>may</i> be modifiable, but updating it will have no
* effect on the multimap.
*/
Future<Collection<V>> removeAllAsync(Object key);
Future<Collection<V>> getAllAsync(K key);
/**
* Removes <code>keys</code> from map by one operation
*
* Works faster than <code>removeAll</code> but not returning
* the value associated with <code>key</code>
*
* @param keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
*/
Future<Long> fastRemoveAsync(K ... keys);
}

@ -88,10 +88,14 @@ public interface RScoredSortedSet<V> extends RScoredSortedSetAsync<V>, Iterable<
Collection<V> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Collection<V> valueRangeReversed(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Collection<ScoredEntry<V>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Collection<V> valueRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);
Collection<V> valueRangeReversed(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);
Collection<ScoredEntry<V>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);
}

@ -82,10 +82,14 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync {
Future<Collection<V>> valueRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Future<Collection<V>> valueRangeReversedAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Future<Collection<ScoredEntry<V>>> entryRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);
Future<Collection<V>> valueRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);
Future<Collection<V>> valueRangeReversedAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);
Future<Collection<ScoredEntry<V>>> entryRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);
}

@ -0,0 +1,82 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Map;
import java.util.Set;
/**
* Set based Multimap. Doesn't allow duplications for values mapped to key.
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public interface RSetMultimap<K, V> extends RMultimap<K, V> {
/**
* {@inheritDoc}
*
* <p>Because a {@code RSetMultiMap} has unique values for a given key, this
* method returns a {@link Set}, instead of the {@link java.util.Collection}
* specified in the {@link RMultimap} interface.
*/
@Override
Set<V> get(K key);
/**
* {@inheritDoc}
*
* <p>Because a {@code RSetMultiMap} has unique values for a given key, this
* method returns a {@link Set}, instead of the {@link java.util.Collection}
* specified in the {@link RMultimap} interface.
*/
Set<V> getAll(K key);
/**
* {@inheritDoc}
*
* <p>Because a {@code RSetMultiMap} has unique values for a given key, this
* method returns a {@link Set}, instead of the {@link java.util.Collection}
* specified in the {@link RMultimap} interface.
*/
@Override
Set<V> removeAll(Object key);
/**
* {@inheritDoc}
*
* <p>Because a {@code RSetMultiMap} has unique values for a given key, this
* method returns a {@link Set}, instead of the {@link java.util.Collection}
* specified in the {@link RMultimap} interface.
*
* <p>Any duplicates in {@code values} will be stored in the multimap once.
*/
@Override
Set<V> replaceValues(K key, Iterable<? extends V> values);
/**
* {@inheritDoc}
*
* <p>Because a {@code RSetMultiMap} has unique values for a given key, this
* method returns a {@link Set}, instead of the {@link java.util.Collection}
* specified in the {@link RMultimap} interface.
*/
@Override
Set<Map.Entry<K, V>> entries();
}

@ -15,153 +15,110 @@
*/
package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.RedissonBitSet;
import org.redisson.api.RBitSetReactive;
import org.redisson.client.codec.BitSetCodec;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandBatchService;
import org.redisson.command.CommandReactiveExecutor;
import reactor.rx.Streams;
public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive {
private final RedissonBitSet instance;
public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
this.instance = new RedissonBitSet(connectionManager, name);
}
public Publisher<Boolean> get(long bitIndex) {
return commandExecutor.readReactive(getName(), codec, RedisCommands.GETBIT, getName(), bitIndex);
return reactive(instance.getAsync(bitIndex));
}
public Publisher<Void> set(long bitIndex, boolean value) {
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SETBIT_VOID, getName(), bitIndex, value ? 1 : 0);
return reactive(instance.setAsync(bitIndex, value));
}
public Publisher<byte[]> toByteArray() {
return commandExecutor.readReactive(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName());
}
private Publisher<Void> op(String op, String... bitSetNames) {
List<Object> params = new ArrayList<Object>(bitSetNames.length + 3);
params.add(op);
params.add(getName());
params.add(getName());
params.addAll(Arrays.asList(bitSetNames));
return commandExecutor.writeReactive(getName(), codec, RedisCommands.BITOP, params.toArray());
return reactive(instance.toByteArrayAsync());
}
public Publisher<BitSet> asBitSet() {
return commandExecutor.readReactive(getName(), BitSetCodec.INSTANCE, RedisCommands.GET, getName());
}
//Copied from: https://github.com/xetorthio/jedis/issues/301
private static byte[] toByteArrayReverse(BitSet bits) {
byte[] bytes = new byte[bits.length() / 8 + 1];
for (int i = 0; i < bits.length(); i++) {
if (bits.get(i)) {
final int value = bytes[i / 8] | (1 << (7 - (i % 8)));
bytes[i / 8] = (byte) value;
}
}
return bytes;
}
@Override
public Publisher<Long> length() {
return commandExecutor.evalReadReactive(getName(), codec, RedisCommands.EVAL_LONG,
"local fromBit = redis.call('bitpos', KEYS[1], 1, -1);"
+ "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;"
+ "for i = toBit, fromBit, -1 do "
+ "if redis.call('getbit', KEYS[1], i) == 1 then "
+ "return i+1;"
+ "end;"
+ "end;" +
"return fromBit+1",
Collections.<Object>singletonList(getName()));
return reactive(instance.lengthAsync());
}
@Override
public Publisher<Void> set(long fromIndex, long toIndex, boolean value) {
if (value) {
return set(fromIndex, toIndex);
}
return clear(fromIndex, toIndex);
return reactive(instance.setAsync(fromIndex, toIndex, value));
}
@Override
public Publisher<Void> clear(long fromIndex, long toIndex) {
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (long i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), i, 0);
}
return new NettyFuturePublisher<Void>(executorService.executeAsyncVoid());
return reactive(instance.clearAsync(fromIndex, toIndex));
}
@Override
public Publisher<Void> set(BitSet bs) {
return commandExecutor.writeReactive(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs));
return reactive(instance.setAsync(bs));
}
@Override
public Publisher<Void> not() {
return op("NOT");
return reactive(instance.notAsync());
}
@Override
public Publisher<Void> set(long fromIndex, long toIndex) {
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (long i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), i, 1);
}
return new NettyFuturePublisher<Void>(executorService.executeAsyncVoid());
return reactive(instance.setAsync(fromIndex, toIndex));
}
@Override
public Publisher<Integer> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.BITS_SIZE, getName());
return reactive(instance.sizeAsync());
}
@Override
public Publisher<Void> set(long bitIndex) {
return set(bitIndex, true);
return reactive(instance.setAsync(bitIndex));
}
@Override
public Publisher<Long> cardinality() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.BITCOUNT, getName());
return reactive(instance.cardinalityAsync());
}
@Override
public Publisher<Void> clear(long bitIndex) {
return set(bitIndex, false);
return reactive(instance.clearAsync(bitIndex));
}
@Override
public Publisher<Void> clear() {
return commandExecutor.writeReactive(getName(), RedisCommands.DEL_VOID, getName());
return reactive(instance.clearAsync());
}
@Override
public Publisher<Void> or(String... bitSetNames) {
return op("OR", bitSetNames);
return reactive(instance.orAsync(bitSetNames));
}
@Override
public Publisher<Void> and(String... bitSetNames) {
return op("AND", bitSetNames);
return reactive(instance.andAsync(bitSetNames));
}
@Override
public Publisher<Void> xor(String... bitSetNames) {
return op("XOR", bitSetNames);
return reactive(instance.xorAsync(bitSetNames));
}
@Override

@ -26,8 +26,8 @@ public abstract class BaseTest {
redisAddress = "127.0.0.1:6379";
}
Config config = new Config();
config.setCodec(new MsgPackJacksonCodec());
// config.useSentinelConnection().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26379", "127.0.0.1:26389");
// config.setCodec(new MsgPackJacksonCodec());
// config.useSentinelServers().setMasterName("mymaster").addSentinelAddress("127.0.0.1:26379", "127.0.0.1:26389");
// config.useClusterServers().addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001", "127.0.0.1:7000");
config.useSingleServer().setAddress(redisAddress);
// .setPassword("mypass1");

@ -6,6 +6,20 @@ import org.redisson.core.RAtomicLong;
public class RedissonAtomicLongTest extends BaseTest {
@Test
public void testCompareAndSetZero() {
RAtomicLong al = redisson.getAtomicLong("test");
Assert.assertTrue(al.compareAndSet(0, 2));
Assert.assertEquals(2, al.get());
RAtomicLong al2 = redisson.getAtomicLong("test1");
al2.set(0);
Assert.assertTrue(al2.compareAndSet(0, 2));
Assert.assertEquals(2, al2.get());
}
@Test
public void testCompareAndSet() {
RAtomicLong al = redisson.getAtomicLong("test");

@ -157,7 +157,7 @@ public class RedissonBucketTest extends BaseTest {
bucket.set(value);
Assert.assertEquals(value, bucket.get());
Assert.assertTrue(bucket.exists());
Assert.assertTrue(bucket.isExists());
}
@Test
@ -168,11 +168,11 @@ public class RedissonBucketTest extends BaseTest {
bucket.set(value);
Assert.assertEquals(value, bucket.get());
Assert.assertTrue(bucket.exists());
Assert.assertTrue(bucket.isExists());
bucket.delete();
Assert.assertFalse(bucket.exists());
Assert.assertFalse(bucket.isExists());
}
@Test

@ -0,0 +1,288 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import org.redisson.core.RListMultimap;
public class RedissonListMultimapTest extends BaseTest {
public static class SimpleKey implements Serializable {
private String key;
public SimpleKey() {
}
public SimpleKey(String field) {
this.key = field;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
@Override
public String toString() {
return "key: " + key;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SimpleKey other = (SimpleKey) obj;
if (key == null) {
if (other.key != null)
return false;
} else if (!key.equals(other.key))
return false;
return true;
}
}
public static class SimpleValue implements Serializable {
private String value;
public SimpleValue() {
}
public SimpleValue(String field) {
this.value = field;
}
public void setValue(String field) {
this.value = field;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return "value: " + value;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SimpleValue other = (SimpleValue) obj;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}
}
@Test
public void testSize() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("1"), new SimpleValue("4"));
assertThat(map.size()).isEqualTo(3);
assertThat(map.fastRemove(new SimpleKey("0"))).isEqualTo(1);
List<SimpleValue> s = map.get(new SimpleKey("0"));
assertThat(s).isEmpty();
assertThat(map.size()).isEqualTo(1);
}
@Test
public void testPut() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("3"));
map.put(new SimpleKey("0"), new SimpleValue("3"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
assertThat(map.size()).isEqualTo(5);
List<SimpleValue> s1 = map.get(new SimpleKey("0"));
assertThat(s1).containsExactly(new SimpleValue("1"), new SimpleValue("2"), new SimpleValue("3"), new SimpleValue("3"));
List<SimpleValue> allValues = map.getAll(new SimpleKey("0"));
assertThat(allValues).containsExactly(new SimpleValue("1"), new SimpleValue("2"), new SimpleValue("3"), new SimpleValue("3"));
List<SimpleValue> s2 = map.get(new SimpleKey("3"));
assertThat(s2).containsExactly(new SimpleValue("4"));
}
@Test
public void testRemoveAll() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("3"));
List<SimpleValue> values = map.removeAll(new SimpleKey("0"));
assertThat(values).containsExactly(new SimpleValue("1"), new SimpleValue("1"), new SimpleValue("2"), new SimpleValue("3"));
assertThat(map.size()).isZero();
List<SimpleValue> values2 = map.removeAll(new SimpleKey("0"));
assertThat(values2).isEmpty();
}
@Test
public void testFastRemove() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
assertThat(map.put(new SimpleKey("0"), new SimpleValue("1"))).isTrue();
assertThat(map.put(new SimpleKey("0"), new SimpleValue("2"))).isTrue();
assertThat(map.put(new SimpleKey("0"), new SimpleValue("2"))).isTrue();
assertThat(map.put(new SimpleKey("0"), new SimpleValue("3"))).isTrue();
long removed = map.fastRemove(new SimpleKey("0"), new SimpleKey("1"));
assertThat(removed).isEqualTo(1);
assertThat(map.size()).isZero();
}
@Test
public void testContainsKey() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
assertThat(map.containsKey(new SimpleKey("0"))).isTrue();
assertThat(map.containsKey(new SimpleKey("1"))).isFalse();
}
@Test
public void testContainsValue() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
assertThat(map.containsValue(new SimpleValue("1"))).isTrue();
assertThat(map.containsValue(new SimpleValue("0"))).isFalse();
}
@Test
public void testContainsEntry() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
assertThat(map.containsEntry(new SimpleKey("0"), new SimpleValue("1"))).isTrue();
assertThat(map.containsEntry(new SimpleKey("0"), new SimpleValue("2"))).isFalse();
}
@Test
public void testRemove() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("3"));
assertThat(map.remove(new SimpleKey("0"), new SimpleValue("2"))).isTrue();
assertThat(map.remove(new SimpleKey("0"), new SimpleValue("5"))).isFalse();
assertThat(map.get(new SimpleKey("0")).size()).isEqualTo(2);
assertThat(map.getAll(new SimpleKey("0")).size()).isEqualTo(2);
}
@Test
public void testPutAll() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
List<SimpleValue> values = Arrays.asList(new SimpleValue("1"), new SimpleValue("2"), new SimpleValue("3"), new SimpleValue("3"));
assertThat(map.putAll(new SimpleKey("0"), values)).isTrue();
assertThat(map.putAll(new SimpleKey("0"), Arrays.asList(new SimpleValue("1")))).isTrue();
List<SimpleValue> testValues = Arrays.asList(new SimpleValue("1"), new SimpleValue("2"), new SimpleValue("3"), new SimpleValue("3"), new SimpleValue("1"));
assertThat(map.get(new SimpleKey("0"))).containsExactlyElementsOf(testValues);
}
@Test
public void testKeySet() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
assertThat(map.keySet()).containsOnly(new SimpleKey("0"), new SimpleKey("3"));
assertThat(map.keySet().size()).isEqualTo(2);
}
@Test
public void testValues() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("3"));
map.put(new SimpleKey("2"), new SimpleValue("5"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
assertThat(map.values().size()).isEqualTo(5);
assertThat(map.values()).containsOnly(new SimpleValue("1"), new SimpleValue("1"), new SimpleValue("3"), new SimpleValue("5"), new SimpleValue("4"));
}
@Test
public void testEntrySet() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
assertThat(map.entries().size()).isEqualTo(3);
List<Map.Entry<SimpleKey, SimpleValue>> testMap = new ArrayList<Map.Entry<SimpleKey, SimpleValue>>();
testMap.add(new AbstractMap.SimpleEntry(new SimpleKey("0"), new SimpleValue("1")));
testMap.add(new AbstractMap.SimpleEntry(new SimpleKey("0"), new SimpleValue("1")));
testMap.add(new AbstractMap.SimpleEntry(new SimpleKey("3"), new SimpleValue("4")));
assertThat(map.entries()).containsOnlyElementsOf(testMap);
}
@Test
public void testReplaceValues() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
List<SimpleValue> values = Arrays.asList(new SimpleValue("11"), new SimpleValue("12"), new SimpleValue("12"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), values);
assertThat(oldValues).containsExactly(new SimpleValue("1"));
List<SimpleValue> allValues = map.getAll(new SimpleKey("0"));
assertThat(allValues).containsExactlyElementsOf(values);
}
}

@ -103,6 +103,36 @@ public class RedissonScoredSortedSetTest extends BaseTest {
MatcherAssert.assertThat(set, Matchers.contains("a", "d", "e", "f", "g"));
}
@Test
public void testRemoveRangeByScoreNegativeInf() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
set.add(0.4, "d");
set.add(0.5, "e");
set.add(0.6, "f");
set.add(0.7, "g");
Assert.assertEquals(3, set.removeRangeByScore(Double.NEGATIVE_INFINITY, false, 0.3, true));
MatcherAssert.assertThat(set, Matchers.contains("d", "e", "f", "g"));
}
@Test
public void testRemoveRangeByScorePositiveInf() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0.1, "a");
set.add(0.2, "b");
set.add(0.3, "c");
set.add(0.4, "d");
set.add(0.5, "e");
set.add(0.6, "f");
set.add(0.7, "g");
Assert.assertEquals(3, set.removeRangeByScore(0.4, false, Double.POSITIVE_INFINITY, true));
MatcherAssert.assertThat(set, Matchers.contains("a", "b", "c", "d"));
}
@Test
public void testRemoveRangeByRank() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
@ -507,8 +537,64 @@ public class RedissonScoredSortedSetTest extends BaseTest {
Assert.assertArrayEquals(new String[]{"c", "d"}, a);
}
@Test
public void testScoredSortedSetValueRangeLimit() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0, "a");
set.add(1, "b");
set.add(2, "c");
set.add(3, "d");
set.add(4, "e");
Collection<String> r = set.valueRange(1, true, 4, false, 1, 2);
assertThat(r).containsExactly("c", "d");
}
@Test
public void testScoredSortedSetValueRange() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0, "a");
set.add(1, "b");
set.add(2, "c");
set.add(3, "d");
set.add(4, "e");
Collection<String> r = set.valueRange(1, true, 4, false);
assertThat(r).containsExactly("b", "c", "d");
}
@Test
public void testScoredSortedSetValueRangeReversedLimit() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0, "a");
set.add(1, "b");
set.add(2, "c");
set.add(3, "d");
set.add(4, "e");
Collection<String> r = set.valueRangeReversed(1, true, 4, false, 1, 2);
assertThat(r).containsExactly("c", "b");
}
@Test
public void testScoredSortedSetValueRangeReversed() {
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0, "a");
set.add(1, "b");
set.add(2, "c");
set.add(3, "d");
set.add(4, "e");
Collection<String> r = set.valueRangeReversed(1, true, 4, false);
assertThat(r).containsExactly("d", "c", "b");
}
@Test
public void testScoredSortedSetValueRangeNegativeInf() {
RScoredSortedSet<String> set = redisson.<String>getScoredSortedSet("simple");
set.add(0, "a");
@ -517,14 +603,29 @@ public class RedissonScoredSortedSetTest extends BaseTest {
set.add(3, "d");
set.add(4, "e");
Collection<String> r = set.valueRange(1, true, 4, false, 1, 2);
Collection<String> r = set.valueRange(Double.NEGATIVE_INFINITY, true, 4, false, 1, 2);
String[] a = r.toArray(new String[0]);
Assert.assertArrayEquals(new String[]{"b", "c"}, a);
}
@Test
public void testScoredSortedSetValueRangePositiveInf() {
RScoredSortedSet<String> set = redisson.<String>getScoredSortedSet("simple");
set.add(0, "a");
set.add(1, "b");
set.add(2, "c");
set.add(3, "d");
set.add(4, "e");
Collection<String> r = set.valueRange(1, true, Double.POSITIVE_INFINITY, false, 1, 2);
String[] a = r.toArray(new String[0]);
Assert.assertArrayEquals(new String[]{"c", "d"}, a);
}
@Test
public void testScoredSortedSetEntryRange() {
RScoredSortedSet<String> set = redisson.<String>getScoredSortedSet("simple");
RScoredSortedSet<String> set = redisson.getScoredSortedSet("simple");
set.add(0, "a");
set.add(1, "b");
@ -540,6 +641,42 @@ public class RedissonScoredSortedSetTest extends BaseTest {
Assert.assertEquals("d", a[1].getValue());
}
@Test
public void testScoredSortedSetEntryRangeNegativeInf() {
RScoredSortedSet<String> set = redisson.<String>getScoredSortedSet("simple");
set.add(0, "a");
set.add(1, "b");
set.add(2, "c");
set.add(3, "d");
set.add(4, "e");
Collection<ScoredEntry<String>> r = set.entryRange(Double.NEGATIVE_INFINITY, true, 4, false, 1, 2);
ScoredEntry<String>[] a = r.toArray(new ScoredEntry[0]);
Assert.assertEquals(1d, a[0].getScore(), 0);
Assert.assertEquals(2d, a[1].getScore(), 0);
Assert.assertEquals("b", a[0].getValue());
Assert.assertEquals("c", a[1].getValue());
}
@Test
public void testScoredSortedSetEntryRangePositiveInf() {
RScoredSortedSet<String> set = redisson.<String>getScoredSortedSet("simple");
set.add(0, "a");
set.add(1, "b");
set.add(2, "c");
set.add(3, "d");
set.add(4, "e");
Collection<ScoredEntry<String>> r = set.entryRange(1, true, Double.POSITIVE_INFINITY, false, 1, 2);
ScoredEntry<String>[] a = r.toArray(new ScoredEntry[0]);
Assert.assertEquals(2d, a[0].getScore(), 0);
Assert.assertEquals(3d, a[1].getScore(), 0);
Assert.assertEquals("c", a[0].getValue());
Assert.assertEquals("d", a[1].getValue());
}
@Test
public void testAddAndGet() throws InterruptedException {
RScoredSortedSet<Integer> set = redisson.getScoredSortedSet("simple", StringCodec.INSTANCE);

@ -0,0 +1,277 @@
package org.redisson;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Test;
import org.redisson.core.RSetMultimap;
import static org.assertj.core.api.Assertions.*;
public class RedissonSetMultimapTest extends BaseTest {
public static class SimpleKey implements Serializable {
private String key;
public SimpleKey() {
}
public SimpleKey(String field) {
this.key = field;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
@Override
public String toString() {
return "key: " + key;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SimpleKey other = (SimpleKey) obj;
if (key == null) {
if (other.key != null)
return false;
} else if (!key.equals(other.key))
return false;
return true;
}
}
public static class SimpleValue implements Serializable {
private String value;
public SimpleValue() {
}
public SimpleValue(String field) {
this.value = field;
}
public void setValue(String field) {
this.value = field;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return "value: " + value;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((value == null) ? 0 : value.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SimpleValue other = (SimpleValue) obj;
if (value == null) {
if (other.value != null)
return false;
} else if (!value.equals(other.value))
return false;
return true;
}
}
@Test
public void testSize() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
assertThat(map.size()).isEqualTo(2);
map.fastRemove(new SimpleKey("0"));
Set<SimpleValue> s = map.get(new SimpleKey("0"));
assertThat(s).isEmpty();
assertThat(map.size()).isEqualTo(0);
}
@Test
public void testPut() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("3"));
map.put(new SimpleKey("0"), new SimpleValue("3"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
assertThat(map.size()).isEqualTo(4);
Set<SimpleValue> s1 = map.get(new SimpleKey("0"));
assertThat(s1).containsOnly(new SimpleValue("1"), new SimpleValue("2"), new SimpleValue("3"));
Set<SimpleValue> allValues = map.getAll(new SimpleKey("0"));
assertThat(allValues).containsOnly(new SimpleValue("1"), new SimpleValue("2"), new SimpleValue("3"));
Set<SimpleValue> s2 = map.get(new SimpleKey("3"));
assertThat(s2).containsOnly(new SimpleValue("4"));
}
@Test
public void testRemoveAll() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("3"));
Set<SimpleValue> values = map.removeAll(new SimpleKey("0"));
assertThat(values).containsOnly(new SimpleValue("1"), new SimpleValue("2"), new SimpleValue("3"));
assertThat(map.size()).isZero();
Set<SimpleValue> values2 = map.removeAll(new SimpleKey("0"));
assertThat(values2).isEmpty();
}
@Test
public void testFastRemove() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
assertThat(map.put(new SimpleKey("0"), new SimpleValue("1"))).isTrue();
assertThat(map.put(new SimpleKey("0"), new SimpleValue("2"))).isTrue();
assertThat(map.put(new SimpleKey("0"), new SimpleValue("2"))).isFalse();
assertThat(map.put(new SimpleKey("0"), new SimpleValue("3"))).isTrue();
long removed = map.fastRemove(new SimpleKey("0"), new SimpleKey("1"));
assertThat(removed).isEqualTo(1);
assertThat(map.size()).isZero();
}
@Test
public void testContainsKey() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
assertThat(map.containsKey(new SimpleKey("0"))).isTrue();
assertThat(map.containsKey(new SimpleKey("1"))).isFalse();
}
@Test
public void testContainsValue() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
assertThat(map.containsValue(new SimpleValue("1"))).isTrue();
assertThat(map.containsValue(new SimpleValue("0"))).isFalse();
}
@Test
public void testContainsEntry() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
assertThat(map.containsEntry(new SimpleKey("0"), new SimpleValue("1"))).isTrue();
assertThat(map.containsEntry(new SimpleKey("0"), new SimpleValue("2"))).isFalse();
}
@Test
public void testRemove() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("3"));
assertThat(map.remove(new SimpleKey("0"), new SimpleValue("2"))).isTrue();
assertThat(map.remove(new SimpleKey("0"), new SimpleValue("5"))).isFalse();
assertThat(map.get(new SimpleKey("0")).size()).isEqualTo(2);
assertThat(map.getAll(new SimpleKey("0")).size()).isEqualTo(2);
}
@Test
public void testPutAll() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
List<SimpleValue> values = Arrays.asList(new SimpleValue("1"), new SimpleValue("2"), new SimpleValue("3"));
assertThat(map.putAll(new SimpleKey("0"), values)).isTrue();
assertThat(map.putAll(new SimpleKey("0"), Arrays.asList(new SimpleValue("1")))).isFalse();
assertThat(map.get(new SimpleKey("0"))).containsOnlyElementsOf(values);
}
@Test
public void testKeySet() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
assertThat(map.keySet()).containsOnly(new SimpleKey("0"), new SimpleKey("3"));
assertThat(map.keySet().size()).isEqualTo(2);
}
@Test
public void testValues() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
assertThat(map.values()).containsOnly(new SimpleValue("1"), new SimpleValue("4"));
}
@Test
public void testEntrySet() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
assertThat(map.entries().size()).isEqualTo(2);
Map<SimpleKey, SimpleValue> testMap = new HashMap<SimpleKey, SimpleValue>();
testMap.put(new SimpleKey("0"), new SimpleValue("1"));
testMap.put(new SimpleKey("3"), new SimpleValue("4"));
assertThat(map.entries()).containsOnlyElementsOf(testMap.entrySet());
}
@Test
public void testReplaceValues() {
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));
List<SimpleValue> values = Arrays.asList(new SimpleValue("11"), new SimpleValue("12"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), values);
assertThat(oldValues).containsOnly(new SimpleValue("1"));
Set<SimpleValue> allValues = map.getAll(new SimpleKey("0"));
assertThat(allValues).containsOnlyElementsOf(values);
}
}

@ -14,6 +14,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.CommandDecoder;
@ -26,6 +27,7 @@ import org.redisson.connection.ConnectionListener;
import org.redisson.core.ClusterNode;
import org.redisson.core.Node;
import org.redisson.core.NodesGroup;
import org.redisson.core.RBatch;
import org.redisson.core.RMap;
import io.netty.bootstrap.Bootstrap;
@ -57,6 +59,41 @@ public class RedissonTest {
r.getMap("test").put("1", new Dummy());
}
@Test(expected = RedisOutOfMemoryException.class)
public void testMemoryScript() throws IOException, InterruptedException {
Process p = RedisRunner.runRedis("/redis_oom_test.conf");
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319").setTimeout(100000);
try {
RedissonClient r = Redisson.create(config);
for (int i = 0; i < 10000; i++) {
r.getMap("test").put("" + i, "" + i);
}
} finally {
p.destroy();
}
}
@Test(expected = RedisOutOfMemoryException.class)
public void testMemoryCommand() throws IOException, InterruptedException {
Process p = RedisRunner.runRedis("/redis_oom_test.conf");
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319").setTimeout(100000);
try {
RedissonClient r = Redisson.create(config);
for (int i = 0; i < 10000; i++) {
r.getMap("test").fastPut("" + i, "" + i);
}
} finally {
p.destroy();
}
}
@Test
public void testConnectionListener() throws IOException, InterruptedException, TimeoutException {

@ -0,0 +1,622 @@
# Redis configuration file example
# Note on units: when memory size is needed, it is possible to specify
# it in the usual form of 1k 5GB 4M and so forth:
#
# 1k => 1000 bytes
# 1kb => 1024 bytes
# 1m => 1000000 bytes
# 1mb => 1024*1024 bytes
# 1g => 1000000000 bytes
# 1gb => 1024*1024*1024 bytes
#
# units are case insensitive so 1GB 1Gb 1gB are all the same.
# By default Redis does not run as a daemon. Use 'yes' if you need it.
# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
daemonize no
# When running daemonized, Redis writes a pid file in /var/run/redis.pid by
# default. You can specify a custom pid file location here.
#pidfile /var/run/redis.pid
# Accept connections on the specified port, default is 6379.
# If port 0 is specified Redis will not listen on a TCP socket.
port 6319
# If you want you can bind a single interface, if the bind option is not
# specified all the interfaces will listen for incoming connections.
#
# bind 127.0.0.1
# Specify the path for the unix socket that will be used to listen for
# incoming connections. There is no default, so Redis will not listen
# on a unix socket when not specified.
#
# unixsocket /tmp/redis.sock
# unixsocketperm 755
# Close the connection after a client is idle for N seconds (0 to disable)
timeout 0
# TCP keepalive.
#
# If non-zero, use SO_KEEPALIVE to send TCP ACKs to clients in absence
# of communication. This is useful for two reasons:
#
# 1) Detect dead peers.
# 2) Take the connection alive from the point of view of network
# equipment in the middle.
#
# On Linux, the specified value (in seconds) is the period used to send ACKs.
# Note that to close the connection the double of the time is needed.
# On other kernels the period depends on the kernel configuration.
#
# A reasonable value for this option is 60 seconds.
tcp-keepalive 0
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel debug
# Specify the log file name. Also 'stdout' can be used to force
# Redis to log on the standard output. Note that if you use standard
# output for logging but daemonize, logs will be sent to /dev/null
logfile "stdout"
# To enable logging to the system logger, just set 'syslog-enabled' to yes,
# and optionally update the other syslog parameters to suit your needs.
# syslog-enabled no
# Specify the syslog identity.
# syslog-ident redis
# Specify the syslog facility. Must be USER or between LOCAL0-LOCAL7.
# syslog-facility local0
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16
################################ SNAPSHOTTING #################################
#
# Save the DB on disk:
#
# save <seconds> <changes>
#
# Will save the DB if both the given number of seconds and the given
# number of write operations against the DB occurred.
#
# In the example below the behaviour will be to save:
# after 900 sec (15 min) if at least 1 key changed
# after 300 sec (5 min) if at least 10 keys changed
# after 60 sec if at least 10000 keys changed
#
# Note: you can disable saving at all commenting all the "save" lines.
#
# It is also possible to remove all the previously configured save
# points by adding a save directive with a single empty string argument
# like in the following example:
#
# save ""
#save 900 1
#save 300 10
#save 60 10000
# By default Redis will stop accepting writes if RDB snapshots are enabled
# (at least one save point) and the latest background save failed.
# This will make the user aware (in an hard way) that data is not persisting
# on disk properly, otherwise chances are that no one will notice and some
# distater will happen.
#
# If the background saving process will start working again Redis will
# automatically allow writes again.
#
# However if you have setup your proper monitoring of the Redis server
# and persistence, you may want to disable this feature so that Redis will
# continue to work as usually even if there are problems with disk,
# permissions, and so forth.
stop-writes-on-bgsave-error yes
# Compress string objects using LZF when dump .rdb databases?
# For default that's set to 'yes' as it's almost always a win.
# If you want to save some CPU in the saving child set it to 'no' but
# the dataset will likely be bigger if you have compressible values or keys.
rdbcompression yes
# Since version 5 of RDB a CRC64 checksum is placed at the end of the file.
# This makes the format more resistant to corruption but there is a performance
# hit to pay (around 10%) when saving and loading RDB files, so you can disable it
# for maximum performances.
#
# RDB files created with checksum disabled have a checksum of zero that will
# tell the loading code to skip the check.
rdbchecksum yes
# The filename where to dump the DB
#dbfilename "dump.rdb"
# The working directory.
#
# The DB will be written inside this directory, with the filename specified
# above using the 'dbfilename' configuration directive.
#
# The Append Only File will also be created inside this directory.
#
# Note that you must specify a directory here, not a file name.
dir "C:\\Devel\\projects\\redis"
################################# REPLICATION #################################
# Master-Slave replication. Use slaveof to make a Redis instance a copy of
# another Redis server. Note that the configuration is local to the slave
# so for example it is possible to configure the slave to save the DB with a
# different interval, or to listen to another port, and so on.
#
# slaveof <masterip> <masterport>
# If the master is password protected (using the "requirepass" configuration
# directive below) it is possible to tell the slave to authenticate before
# starting the replication synchronization process, otherwise the master will
# refuse the slave request.
#
# masterauth <master-password>
# When a slave loses its connection with the master, or when the replication
# is still in progress, the slave can act in two different ways:
#
# 1) if slave-serve-stale-data is set to 'yes' (the default) the slave will
# still reply to client requests, possibly with out of date data, or the
# data set may just be empty if this is the first synchronization.
#
# 2) if slave-serve-stale-data is set to 'no' the slave will reply with
# an error "SYNC with master in progress" to all the kind of commands
# but to INFO and SLAVEOF.
#
slave-serve-stale-data yes
# You can configure a slave instance to accept writes or not. Writing against
# a slave instance may be useful to store some ephemeral data (because data
# written on a slave will be easily deleted after resync with the master) but
# may also cause problems if clients are writing to it because of a
# misconfiguration.
#
# Since Redis 2.6 by default slaves are read-only.
#
# Note: read only slaves are not designed to be exposed to untrusted clients
# on the internet. It's just a protection layer against misuse of the instance.
# Still a read only slave exports by default all the administrative commands
# such as CONFIG, DEBUG, and so forth. To a limited extend you can improve
# security of read only slaves using 'rename-command' to shadow all the
# administrative / dangerous commands.
slave-read-only yes
# Slaves send PINGs to server in a predefined interval. It's possible to change
# this interval with the repl_ping_slave_period option. The default value is 10
# seconds.
#
# repl-ping-slave-period 10
# The following option sets a timeout for both Bulk transfer I/O timeout and
# master data or ping response timeout. The default value is 60 seconds.
#
# It is important to make sure that this value is greater than the value
# specified for repl-ping-slave-period otherwise a timeout will be detected
# every time there is low traffic between the master and the slave.
#
# repl-timeout 60
# Disable TCP_NODELAY on the slave socket after SYNC?
#
# If you select "yes" Redis will use a smaller number of TCP packets and
# less bandwidth to send data to slaves. But this can add a delay for
# the data to appear on the slave side, up to 40 milliseconds with
# Linux kernels using a default configuration.
#
# If you select "no" the delay for data to appear on the slave side will
# be reduced but more bandwidth will be used for replication.
#
# By default we optimize for low latency, but in very high traffic conditions
# or when the master and slaves are many hops away, turning this to "yes" may
# be a good idea.
repl-disable-tcp-nodelay no
# The slave priority is an integer number published by Redis in the INFO output.
# It is used by Redis Sentinel in order to select a slave to promote into a
# master if the master is no longer working correctly.
#
# A slave with a low priority number is considered better for promotion, so
# for instance if there are three slaves with priority 10, 100, 25 Sentinel will
# pick the one wtih priority 10, that is the lowest.
#
# However a special priority of 0 marks the slave as not able to perform the
# role of master, so a slave with priority of 0 will never be selected by
# Redis Sentinel for promotion.
#
# By default the priority is 100.
slave-priority 100
################################## SECURITY ###################################
# Require clients to issue AUTH <PASSWORD> before processing any other
# commands. This might be useful in environments in which you do not trust
# others with access to the host running redis-server.
#
# This should stay commented out for backward compatibility and because most
# people do not need auth (e.g. they run their own servers).
#
# Warning: since Redis is pretty fast an outside user can try up to
# 150k passwords per second against a good box. This means that you should
# use a very strong password otherwise it will be very easy to break.
#
#requirepass mypass
# Command renaming.
#
# It is possible to change the name of dangerous commands in a shared
# environment. For instance the CONFIG command may be renamed into something
# hard to guess so that it will still be available for internal-use tools
# but not available for general clients.
#
# Example:
#
# rename-command CONFIG b840fc02d524045429941cc15f59e41cb7be6c52
#
# It is also possible to completely kill a command by renaming it into
# an empty string:
#
# rename-command CONFIG ""
#
# Please note that changing the name of commands that are logged into the
# AOF file or transmitted to slaves may cause problems.
################################### LIMITS ####################################
# Set the max number of connected clients at the same time. By default
# this limit is set to 10000 clients, however if the Redis server is not
# able to configure the process file limit to allow for the specified limit
# the max number of allowed clients is set to the current file limit
# minus 32 (as Redis reserves a few file descriptors for internal uses).
#
# Once the limit is reached Redis will close all the new connections sending
# an error 'max number of clients reached'.
#
# maxclients 10000
# Don't use more memory than the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys
# accordingly to the eviction policy selected (see maxmemmory-policy).
#
# If Redis can't remove keys according to the policy, or if the policy is
# set to 'noeviction', Redis will start to reply with errors to commands
# that would use more memory, like SET, LPUSH, and so on, and will continue
# to reply to read-only commands like GET.
#
# This option is usually useful when using Redis as an LRU cache, or to set
# an hard memory limit for an instance (using the 'noeviction' policy).
#
# WARNING: If you have slaves attached to an instance with maxmemory on,
# the size of the output buffers needed to feed the slaves are subtracted
# from the used memory count, so that network problems / resyncs will
# not trigger a loop where keys are evicted, and in turn the output
# buffer of slaves is full with DELs of keys evicted triggering the deletion
# of more keys, and so forth until the database is completely emptied.
#
# In short... if you have slaves attached it is suggested that you set a lower
# limit for maxmemory so that there is some free RAM on the system for slave
# output buffers (but this is not needed if the policy is 'noeviction').
#
maxmemory 1mb
# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select among five behaviors:
#
# volatile-lru -> remove the key with an expire set using an LRU algorithm
# allkeys-lru -> remove any key accordingly to the LRU algorithm
# volatile-random -> remove a random key with an expire set
# allkeys-random -> remove a random key, any key
# volatile-ttl -> remove the key with the nearest expire time (minor TTL)
# noeviction -> don't expire at all, just return an error on write operations
#
# Note: with any of the above policies, Redis will return an error on write
# operations, when there are not suitable keys for eviction.
#
# At the date of writing this commands are: set setnx setex append
# incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
# sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
# zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
# getset mset msetnx exec sort
#
# The default is:
#
# maxmemory-policy volatile-lru
# LRU and minimal TTL algorithms are not precise algorithms but approximated
# algorithms (in order to save memory), so you can select as well the sample
# size to check. For instance for default Redis will check three keys and
# pick the one that was used less recently, you can change the sample size
# using the following configuration directive.
#
# maxmemory-samples 3
############################## APPEND ONLY MODE ###############################
# By default Redis asynchronously dumps the dataset on disk. This mode is
# good enough in many applications, but an issue with the Redis process or
# a power outage may result into a few minutes of writes lost (depending on
# the configured save points).
#
# The Append Only File is an alternative persistence mode that provides
# much better durability. For instance using the default data fsync policy
# (see later in the config file) Redis can lose just one second of writes in a
# dramatic event like a server power outage, or a single write if something
# wrong with the Redis process itself happens, but the operating system is
# still running correctly.
#
# AOF and RDB persistence can be enabled at the same time without problems.
# If the AOF is enabled on startup Redis will load the AOF, that is the file
# with the better durability guarantees.
#
# Please check http://redis.io/topics/persistence for more information.
appendonly no
# The name of the append only file (default: "appendonly.aof")
# appendfilename appendonly.aof
# The fsync() call tells the Operating System to actually write data on disk
# instead to wait for more data in the output buffer. Some OS will really flush
# data on disk, some other OS will just try to do it ASAP.
#
# Redis supports three different modes:
#
# no: don't fsync, just let the OS flush the data when it wants. Faster.
# always: fsync after every write to the append only log . Slow, Safest.
# everysec: fsync only one time every second. Compromise.
#
# The default is "everysec", as that's usually the right compromise between
# speed and data safety. It's up to you to understand if you can relax this to
# "no" that will let the operating system flush the output buffer when
# it wants, for better performances (but if you can live with the idea of
# some data loss consider the default persistence mode that's snapshotting),
# or on the contrary, use "always" that's very slow but a bit safer than
# everysec.
#
# More details please check the following article:
# http://antirez.com/post/redis-persistence-demystified.html
#
# If unsure, use "everysec".
# appendfsync always
appendfsync everysec
# appendfsync no
# When the AOF fsync policy is set to always or everysec, and a background
# saving process (a background save or AOF log background rewriting) is
# performing a lot of I/O against the disk, in some Linux configurations
# Redis may block too long on the fsync() call. Note that there is no fix for
# this currently, as even performing fsync in a different thread will block
# our synchronous write(2) call.
#
# In order to mitigate this problem it's possible to use the following option
# that will prevent fsync() from being called in the main process while a
# BGSAVE or BGREWRITEAOF is in progress.
#
# This means that while another child is saving, the durability of Redis is
# the same as "appendfsync none". In practical terms, this means that it is
# possible to lose up to 30 seconds of log in the worst scenario (with the
# default Linux settings).
#
# If you have latency problems turn this to "yes". Otherwise leave it as
# "no" that is the safest pick from the point of view of durability.
no-appendfsync-on-rewrite no
# Automatic rewrite of the append only file.
# Redis is able to automatically rewrite the log file implicitly calling
# BGREWRITEAOF when the AOF log size grows by the specified percentage.
#
# This is how it works: Redis remembers the size of the AOF file after the
# latest rewrite (if no rewrite has happened since the restart, the size of
# the AOF at startup is used).
#
# This base size is compared to the current size. If the current size is
# bigger than the specified percentage, the rewrite is triggered. Also
# you need to specify a minimal size for the AOF file to be rewritten, this
# is useful to avoid rewriting the AOF file even if the percentage increase
# is reached but it is still pretty small.
#
# Specify a percentage of zero in order to disable the automatic AOF
# rewrite feature.
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
################################ LUA SCRIPTING ###############################
# Max execution time of a Lua script in milliseconds.
#
# If the maximum execution time is reached Redis will log that a script is
# still in execution after the maximum allowed time and will start to
# reply to queries with an error.
#
# When a long running script exceed the maximum execution time only the
# SCRIPT KILL and SHUTDOWN NOSAVE commands are available. The first can be
# used to stop a script that did not yet called write commands. The second
# is the only way to shut down the server in the case a write commands was
# already issue by the script but the user don't want to wait for the natural
# termination of the script.
#
# Set it to 0 or a negative value for unlimited execution without warnings.
lua-time-limit 5000
################################## SLOW LOG ###################################
# The Redis Slow Log is a system to log queries that exceeded a specified
# execution time. The execution time does not include the I/O operations
# like talking with the client, sending the reply and so forth,
# but just the time needed to actually execute the command (this is the only
# stage of command execution where the thread is blocked and can not serve
# other requests in the meantime).
#
# You can configure the slow log with two parameters: one tells Redis
# what is the execution time, in microseconds, to exceed in order for the
# command to get logged, and the other parameter is the length of the
# slow log. When a new command is logged the oldest one is removed from the
# queue of logged commands.
# The following time is expressed in microseconds, so 1000000 is equivalent
# to one second. Note that a negative number disables the slow log, while
# a value of zero forces the logging of every command.
slowlog-log-slower-than 10000
# There is no limit to this length. Just be aware that it will consume memory.
# You can reclaim memory used by the slow log with SLOWLOG RESET.
slowlog-max-len 128
############################### ADVANCED CONFIG ###############################
# Hashes are encoded using a memory efficient data structure when they have a
# small number of entries, and the biggest entry does not exceed a given
# threshold. These thresholds can be configured using the following directives.
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
# Similarly to hashes, small lists are also encoded in a special way in order
# to save a lot of space. The special representation is only used when
# you are under the following limits:
list-max-ziplist-entries 512
list-max-ziplist-value 64
# Sets have a special encoding in just one case: when a set is composed
# of just strings that happens to be integers in radix 10 in the range
# of 64 bit signed integers.
# The following configuration setting sets the limit in the size of the
# set in order to use this special memory saving encoding.
set-max-intset-entries 512
# Similarly to hashes and lists, sorted sets are also specially encoded in
# order to save a lot of space. This encoding is only used when the length and
# elements of a sorted set are below the following limits:
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
# Active rehashing uses 1 millisecond every 100 milliseconds of CPU time in
# order to help rehashing the main Redis hash table (the one mapping top-level
# keys to values). The hash table implementation Redis uses (see dict.c)
# performs a lazy rehashing: the more operation you run into an hash table
# that is rehashing, the more rehashing "steps" are performed, so if the
# server is idle the rehashing is never complete and some more memory is used
# by the hash table.
#
# The default is to use this millisecond 10 times every second in order to
# active rehashing the main dictionaries, freeing memory when possible.
#
# If unsure:
# use "activerehashing no" if you have hard latency requirements and it is
# not a good thing in your environment that Redis can reply form time to time
# to queries with 2 milliseconds delay.
#
# use "activerehashing yes" if you don't have such hard requirements but
# want to free memory asap when possible.
activerehashing yes
# The client output buffer limits can be used to force disconnection of clients
# that are not reading data from the server fast enough for some reason (a
# common reason is that a Pub/Sub client can't consume messages as fast as the
# publisher can produce them).
#
# The limit can be set differently for the three different classes of clients:
#
# normal -> normal clients
# slave -> slave clients and MONITOR clients
# pubsub -> clients subcribed to at least one pubsub channel or pattern
#
# The syntax of every client-output-buffer-limit directive is the following:
#
# client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
#
# A client is immediately disconnected once the hard limit is reached, or if
# the soft limit is reached and remains reached for the specified number of
# seconds (continuously).
# So for instance if the hard limit is 32 megabytes and the soft limit is
# 16 megabytes / 10 seconds, the client will get disconnected immediately
# if the size of the output buffers reach 32 megabytes, but will also get
# disconnected if the client reaches 16 megabytes and continuously overcomes
# the limit for 10 seconds.
#
# By default normal clients are not limited because they don't receive data
# without asking (in a push way), but just after a request, so only
# asynchronous clients may create a scenario where data is requested faster
# than it can read.
#
# Instead there is a default limit for pubsub and slave clients, since
# subscribers and slaves receive data in a push fashion.
#
# Both the hard or the soft limit can be disabled by setting them to zero.
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit slave 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60
# Redis calls an internal function to perform many background tasks, like
# closing connections of clients in timeot, purging expired keys that are
# never requested, and so forth.
#
# Not all tasks are perforemd with the same frequency, but Redis checks for
# tasks to perform accordingly to the specified "hz" value.
#
# By default "hz" is set to 10. Raising the value will use more CPU when
# Redis is idle, but at the same time will make Redis more responsive when
# there are many keys expiring at the same time, and timeouts may be
# handled with more precision.
#
# The range is between 1 and 500, however a value over 100 is usually not
# a good idea. Most users should use the default of 10 and raise this up to
# 100 only in environments where very low latency is required.
hz 10
# When a child rewrites the AOF file, if the following option is enabled
# the file will be fsync-ed every 32 MB of data generated. This is useful
# in order to commit the file to the disk more incrementally and avoid
# big latency spikes.
#aof-rewrite-incremental-fsync yes
################################## INCLUDES ###################################
# Include one or more other config files here. This is useful if you
# have a standard template that goes to all Redis server but also need
# to customize a few per-server settings. Include files can include
# other files, so use this wisely.
#
# include /path/to/local.conf
# include /path/to/other.conf
# Generated by CONFIG REWRITE
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6399
#notify-keyspace-events "xE"
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6389
#slaveof 127.0.0.1 6389
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6399
#slaveof 127.0.0.1 6389
#slaveof 127.0.0.1 6389
#slaveof 127.0.0.1 6399
notify-keyspace-events "xE"
Loading…
Cancel
Save