Merge branch 'mrniko/master' into live-object

# Conflicts:
#	src/main/java/org/redisson/Redisson.java
pull/527/head
jackygurui 9 years ago
commit aa754a9799

@ -2,6 +2,13 @@ Redisson Releases History
================================
####Please Note: trunk is current development branch.
####08-Jun-2016 - version 2.2.15 released
Improvement - Performance boost up to 30% for `RSortedSet.add` method
Fixed - auth during reconnection (thanks to fransiskusx)
Fixed - Infinity loop with iterator
Fixed - NPE in `RSortedSet`
Fixed - `RSortedSet.remove` and `iterator.remove` methods can break elements ordering
####27-May-2016 - version 2.2.14 released
Redisson Team is pleased to announce [Redisson PRO](http://redisson.pro) edition. This version is based on open-source edition and has 24x7 support and some features.

@ -83,12 +83,12 @@ Include the following to your dependency list:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.2.14</version>
<version>2.2.15</version>
</dependency>
### Gradle
compile 'org.redisson:redisson:2.2.14'
compile 'org.redisson:redisson:2.2.15'
### Supported by

@ -3,7 +3,7 @@
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.2.15-SNAPSHOT</version>
<version>2.2.16-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>Redisson</name>
@ -100,33 +100,33 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.0.36.Final</version>
<version>4.0.37.Final</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.0.36.Final</version>
<version>4.0.37.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>4.0.36.Final</version>
<version>4.0.37.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.0.36.Final</version>
<version>4.0.37.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.0.36.Final</version>
<version>4.0.37.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.0.36.Final</version>
<version>4.0.37.Final</version>
</dependency>
<dependency>
@ -138,7 +138,7 @@
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.3.0</version>
<version>3.4.1</version>
<scope>test</scope>
</dependency>
<dependency>
@ -150,7 +150,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
@ -175,19 +175,19 @@
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>jackson-dataformat-msgpack</artifactId>
<version>0.8.2</version>
<version>0.8.7</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.2.1</version>
<version>1.1.2.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.43</version>
<version>2.47</version>
<scope>provided</scope>
</dependency>
<dependency>
@ -199,7 +199,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.14</version>
<version>1.7.21</version>
</dependency>
<dependency>

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
@ -79,7 +80,6 @@ import org.redisson.liveobject.RAttachedLiveObjectService;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.PlatformDependent;
import java.util.concurrent.TimeUnit;
/**
* Main infrastructure class allows to get access
@ -409,15 +409,25 @@ public class Redisson implements RedissonClient {
public RRemoteService getRemoteSerivce(String name) {
return new RedissonRemoteService(this, name);
}
@Override
public RRemoteService getRemoteSerivce(Codec codec) {
return new RedissonRemoteService(codec, this);
}
@Override
public RRemoteService getRemoteSerivce(String name, Codec codec) {
return new RedissonRemoteService(codec, this, name);
}
@Override
public <V> RSortedSet<V> getSortedSet(String name) {
return new RedissonSortedSet<V>(commandExecutor, name);
return new RedissonSortedSet<V>(commandExecutor, name, this);
}
@Override
public <V> RSortedSet<V> getSortedSet(String name, Codec codec) {
return new RedissonSortedSet<V>(codec, commandExecutor, name);
return new RedissonSortedSet<V>(codec, commandExecutor, name, this);
}
@Override

@ -73,12 +73,18 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty() && tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
continue;
if (firstValues.isEmpty()) {
if (tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
continue;
}
if (res.getPos() == 0) {
finished = true;
return false;
}
}
} else if (lastValues.removeAll(firstValues)) {
currentElementRemoved = false;

@ -82,12 +82,18 @@ abstract class RedissonBaseMapIterator<K, V, M> implements Iterator<M> {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty() && tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
continue;
if (firstValues.isEmpty()) {
if (tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
continue;
}
if (res.getPos() == 0) {
finished = true;
return false;
}
}
} else if (lastValues.keySet().removeAll(firstValues.keySet())) {
free(firstValues);

@ -27,20 +27,21 @@ import org.redisson.core.RBitSetAsync;
import org.redisson.core.RBlockingDequeAsync;
import org.redisson.core.RBlockingQueueAsync;
import org.redisson.core.RBucketAsync;
import org.redisson.core.RMapCacheAsync;
import org.redisson.core.RDequeAsync;
import org.redisson.core.RGeoAsync;
import org.redisson.core.RHyperLogLogAsync;
import org.redisson.core.RKeysAsync;
import org.redisson.core.RLexSortedSetAsync;
import org.redisson.core.RListAsync;
import org.redisson.core.RListMultimap;
import org.redisson.core.RMapAsync;
import org.redisson.core.RMapCacheAsync;
import org.redisson.core.RMultimapAsync;
import org.redisson.core.RMultimapCacheAsync;
import org.redisson.core.RQueueAsync;
import org.redisson.core.RScoredSortedSetAsync;
import org.redisson.core.RScriptAsync;
import org.redisson.core.RSetAsync;
import org.redisson.core.RSetCacheAsync;
import org.redisson.core.RSetMultimap;
import org.redisson.core.RTopicAsync;
import io.netty.util.concurrent.Future;
@ -232,24 +233,54 @@ public class RedissonBatch implements RBatch {
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name) {
public <K, V> RMultimapAsync<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(executorService, name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec) {
public <K, V> RMultimapAsync<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimap<K, V>(codec, executorService, name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name) {
public <K, V> RMultimapAsync<K, V> getListMultimap(String name) {
return new RedissonListMultimap<K, V>(executorService, name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec) {
public <K, V> RMultimapAsync<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimap<K, V>(codec, executorService, name);
}
@Override
public <V> RGeoAsync<V> getGeo(String name) {
return new RedissonGeo<V>(executorService, name);
}
@Override
public <V> RGeoAsync<V> getGeo(String name, Codec codec) {
return new RedissonGeo<V>(codec, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getSetMultimapCache(String name) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getSetMultimapCache(String name, Codec codec) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getListMultimapCache(String name) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getListMultimapCache(String name, Codec codec) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, executorService, name);
}
}

@ -614,6 +614,14 @@ public interface RedissonClient {
* @return
*/
RRemoteService getRemoteSerivce();
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
* and uses provided codec for method arguments and result.
*
* @return
*/
RRemoteService getRemoteSerivce(Codec codec);
/**
* Returns object for remote operations prefixed with the specified name
@ -622,6 +630,15 @@ public interface RedissonClient {
* @return
*/
RRemoteService getRemoteSerivce(String name);
/**
* Returns object for remote operations prefixed with the specified name
* and uses provided codec for method arguments and result.
*
* @param name The name used as the Redis key prefix for the services
* @return
*/
RRemoteService getRemoteSerivce(String name, Codec codec);
/**
* Return batch object which executes group of

@ -328,8 +328,6 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
@Override
public V remove(int index) {
checkIndex(index);
if (index == 0) {
Future<V> f = commandExecutor.writeAsync(getName(), codec, LPOP, getName());
return get(f);
@ -337,18 +335,26 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
Future<V> f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"local tail = redis.call('lrange', KEYS[1], ARGV[1] + 1, -1);" +
"redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" +
"if #tail > 0 then " +
"for i=1, #tail, 5000 do "
+ "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); "
+ "end "
+ "end;" +
"return v",
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" +
"return v",
Collections.<Object>singletonList(getName()), index);
return get(f);
}
@Override
public void fastRemove(int index) {
get(fastRemoveAsync(index));
}
@Override
public Future<Void> fastRemoveAsync(int index) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');",
Collections.<Object>singletonList(getName()), index);
}
@Override
public int indexOf(Object o) {
return get(indexOfAsync(o));

@ -433,8 +433,6 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
@Override
public V remove(int index) {
checkIndex(index);
if (index == 0) {
Future<V> f = commandExecutor.writeAsync(getName(), codec, LPOP, getName());
return get(f);
@ -442,18 +440,26 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
Future<V> f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"local tail = redis.call('lrange', KEYS[1], ARGV[1] + 1, -1);" +
"redis.call('ltrim', KEYS[1], 0, ARGV[1] - 1);" +
"if #tail > 0 then " +
"for i=1, #tail, 5000 do "
+ "redis.call('rpush', KEYS[1], unpack(tail, i, math.min(i+4999, #tail))); "
+ "end "
+ "end;" +
"return v",
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" +
"return v",
Collections.<Object>singletonList(getName()), index);
return get(f);
}
@Override
public void fastRemove(int index) {
get(fastRemoveAsync(index));
}
@Override
public Future<Void> fastRemoveAsync(int index) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');",
Collections.<Object>singletonList(getName()), index);
}
@Override
public int indexOf(Object o) {
return get(indexOfAsync(o));

@ -38,13 +38,9 @@ import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.core.RMultimap;
import org.redisson.misc.Hash;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import net.openhft.hashing.LongHashFunction;
/**
* @author Nikita Koksharov
@ -63,16 +59,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
}
protected String hash(byte[] objectState) {
long h1 = LongHashFunction.farmUo().hashBytes(objectState);
long h2 = LongHashFunction.xx_r39().hashBytes(objectState);
ByteBuf buf = Unpooled.buffer((2 * Long.SIZE) / Byte.SIZE).writeLong(h1).writeLong(h2);
ByteBuf b = Base64.encode(buf);
String s = b.toString(CharsetUtil.UTF_8);
b.release();
buf.release();
return s.substring(0, s.length() - 2);
return Hash.hashToBase64(objectState);
}
@Override

@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueue;
@ -64,6 +65,7 @@ public class RedissonRemoteService implements RRemoteService {
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Codec codec;
private final Redisson redisson;
private final String name;
@ -72,6 +74,15 @@ public class RedissonRemoteService implements RRemoteService {
}
public RedissonRemoteService(Redisson redisson, String name) {
this(null, redisson, name);
}
public RedissonRemoteService(Codec codec, Redisson redisson) {
this(codec, redisson, "redisson_remote_service");
}
public RedissonRemoteService(Codec codec, Redisson redisson, String name) {
this.codec = codec;
this.redisson = redisson;
this.name = name;
}
@ -96,10 +107,17 @@ public class RedissonRemoteService implements RRemoteService {
for (int i = 0; i < executorsAmount; i++) {
String requestQueueName = name + ":{" + remoteInterface.getName() + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
subscribe(remoteInterface, requestQueue);
}
}
private Codec getCodec() {
if (codec != null) {
return codec;
}
return redisson.getConfig().getCodec();
}
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue) {
Future<RemoteServiceRequest> take = requestQueue.takeAsync();
@ -141,7 +159,7 @@ public class RedissonRemoteService implements RRemoteService {
+ "return 1;"
+ "end;"
+ "return 0;", RScript.ReturnType.BOOLEAN, Arrays.<Object>asList(ackName, responseName),
redisson.getConfig().getCodec().getValueEncoder().encode(new RemoteServiceAck()), request.getOptions().getAckTimeoutInMillis());
getCodec().getValueEncoder().encode(new RemoteServiceAck()), request.getOptions().getAckTimeoutInMillis());
// Future<List<?>> ackClientsFuture = send(request.getOptions().getAckTimeoutInMillis(), responseName, new RemoteServiceAck());
// ackClientsFuture.addListener(new FutureListener<List<?>>() {
ackClientsFuture.addListener(new FutureListener<Boolean>() {
@ -271,7 +289,7 @@ public class RedissonRemoteService implements RRemoteService {
final Promise<Object> result = ImmediateEventExecutor.INSTANCE.newPromise();
String requestQueueName = name + ":{" + interfaceName + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
final RemoteServiceRequest request = new RemoteServiceRequest(requestId,
method.getName(), args, optionsCopy, System.currentTimeMillis());
Future<Boolean> addFuture = requestQueue.addAsync(request);
@ -287,7 +305,7 @@ public class RedissonRemoteService implements RRemoteService {
final RBlockingQueue<? extends RRemoteServiceResponse> responseQueue;
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
String responseName = name + ":{" + interfaceName + "}:" + requestId;
responseQueue = redisson.getBlockingQueue(responseName);
responseQueue = redisson.getBlockingQueue(responseName, getCodec());
} else {
responseQueue = null;
}
@ -408,7 +426,7 @@ public class RedissonRemoteService implements RRemoteService {
String requestId = generateRequestId();
String requestQueueName = name + ":{" + interfaceName + "}";
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, getCodec());
RemoteServiceRequest request = new RemoteServiceRequest(requestId,
method.getName(), args, optionsCopy, System.currentTimeMillis());
requestQueue.add(request);
@ -416,7 +434,7 @@ public class RedissonRemoteService implements RRemoteService {
RBlockingQueue<RRemoteServiceResponse> responseQueue = null;
if (optionsCopy.isAckExpected() || optionsCopy.isResultExpected()) {
String responseName = name + ":{" + interfaceName + "}:" + requestId;
responseQueue = redisson.getBlockingQueue(responseName);
responseQueue = redisson.getBlockingQueue(responseName, getCodec());
}
// poll for the ack only if expected

@ -19,28 +19,25 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedSet;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.core.RBucket;
import org.redisson.core.RLock;
import org.redisson.core.RSortedSet;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
/**
@ -95,34 +92,36 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
private Comparator<? super V> comparator = NaturalComparator.NATURAL_ORDER;
CommandExecutor commandExecutor;
private RLock lock;
private RedissonList<V> list;
private RBucket<String> comparatorHolder;
protected RedissonSortedSet(CommandExecutor commandExecutor, String name) {
protected RedissonSortedSet(CommandExecutor commandExecutor, String name, Redisson redisson) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE);
lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}");
list = (RedissonList<V>) redisson.getList(getName());
loadComparator();
}
public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name) {
public RedissonSortedSet(Codec codec, CommandExecutor commandExecutor, String name, Redisson redisson) {
super(codec, commandExecutor, name);
this.commandExecutor = commandExecutor;
comparatorHolder = redisson.getBucket(getComparatorKeyName(), StringCodec.INSTANCE);
lock = redisson.getLock("redisson_sortedset_lock:{" + getName() + "}");
list = (RedissonList<V>) redisson.getList(getName());
loadComparator();
}
private void loadComparator() {
commandExecutor.read(getName(), codec, new SyncOperation<Void>() {
@Override
public Void execute(Codec codec, RedisConnection conn) {
loadComparator(conn);
return null;
}
});
}
private void loadComparator(RedisConnection connection) {
try {
String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName());
String comparatorSign = comparatorHolder.get();
if (comparatorSign != null) {
String[] parts = comparatorSign.split(":");
String className = parts[0];
@ -136,6 +135,8 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
Class<?> clazz = Class.forName(className);
comparator = (Comparator<V>) clazz.newInstance();
}
} catch (IllegalStateException e) {
throw e;
} catch (Exception e) {
throw new IllegalStateException(e);
}
@ -163,26 +164,17 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public int size() {
return commandExecutor.read(getName(), codec, RedisCommands.LLEN_INT, getName());
}
private int size(RedisConnection connection) {
return connection.sync(RedisCommands.LLEN_INT, getName()).intValue();
return list.size();
}
@Override
public boolean isEmpty() {
return size() == 0;
return list.isEmpty();
}
@Override
public boolean contains(final Object o) {
return commandExecutor.read(getName(), codec, new SyncOperation<Boolean>() {
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return binarySearch((V)o, codec, conn).getIndex() >= 0;
}
});
return binarySearch((V)o, codec).getIndex() >= 0;
}
public Iterator<V> iterator() {
@ -206,7 +198,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
currentIndex++;
removeExecuted = false;
currentElement = RedissonSortedSet.this.get(currentIndex);
currentElement = RedissonSortedSet.this.list.getValue(currentIndex);
return currentElement;
}
@ -223,57 +215,24 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
};
}
private V get(int index) {
return commandExecutor.read(getName(), codec, RedisCommands.LINDEX, getName(), index);
}
@Override
public Object[] toArray() {
List<V> res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1);
return res.toArray();
return list.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
List<V> res = commandExecutor.read(getName(), codec, RedisCommands.LRANGE, getName(), 0, -1);
return res.toArray(a);
return list.toArray(a);
}
@Override
public boolean add(final V value) {
return commandExecutor.write(getName(), codec, new SyncOperation<Boolean>() {
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return add(value, codec, conn);
}
});
}
public Future<Boolean> addAsync(final V value) {
final Promise<Boolean> promise = commandExecutor.getConnectionManager().newPromise();
commandExecutor.getConnectionManager().getGroup().execute(new Runnable() {
public void run() {
try {
boolean res = add(value);
promise.setSuccess(res);
} catch (Exception e) {
promise.setFailure(e);
}
}
});
return promise;
}
boolean add(V value, Codec codec, RedisConnection connection) {
while (true) {
connection.sync(RedisCommands.WATCH, getName(), getComparatorKeyName());
checkComparator(connection);
BinarySearchResult<V> res = binarySearch(value, codec, connection);
if (res.getIndex() == null) {
continue;
}
public boolean add(V value) {
lock.lock();
try {
checkComparator();
BinarySearchResult<V> res = binarySearch(value, codec);
if (res.getIndex() < 0) {
int index = -(res.getIndex() + 1);
@ -284,45 +243,47 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
throw new IllegalArgumentException(e);
}
connection.sync(RedisCommands.MULTI);
connection.sync(RedisCommands.EVAL_VOID,
"local len = redis.call('llen', KEYS[1]);"
+ "if tonumber(ARGV[1]) < len then "
+ "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);"
+ "redis.call('linsert', KEYS[1], 'before', pivot, ARGV[2]);"
+ "return;"
+ "end;"
+ "redis.call('rpush', KEYS[1], ARGV[2]);", 1, getName(), index, encodedValue);
List<Object> re = connection.sync(codec, RedisCommands.EXEC);
if (re.size() == 1) {
return true;
}
commandExecutor.evalWrite(getName(), RedisCommands.EVAL_VOID,
"local len = redis.call('llen', KEYS[1]);"
+ "if tonumber(ARGV[1]) < len then "
+ "local pivot = redis.call('lindex', KEYS[1], ARGV[1]);"
+ "redis.call('linsert', KEYS[1], 'before', pivot, ARGV[2]);"
+ "return;"
+ "end;"
+ "redis.call('rpush', KEYS[1], ARGV[2]);", Arrays.<Object>asList(getName()), index, encodedValue);
return true;
} else {
connection.sync(RedisCommands.UNWATCH);
return false;
}
} finally {
lock.unlock();
}
}
private void checkComparator(RedisConnection connection) {
String comparatorSign = connection.sync(StringCodec.INSTANCE, RedisCommands.GET, getComparatorKeyName());
private void checkComparator() {
String comparatorSign = comparatorHolder.get();
if (comparatorSign != null) {
String[] vals = comparatorSign.split(":");
String className = vals[0];
if (!comparator.getClass().getName().equals(className)) {
loadComparator(connection);
loadComparator();
}
}
}
public static double calcIncrement(double value) {
BigDecimal b = BigDecimal.valueOf(value);
BigDecimal r = b.remainder(BigDecimal.ONE);
if (r.compareTo(BigDecimal.ZERO) == 0) {
return 1;
}
double res = 1/Math.pow(10, r.scale());
return res;
public Future<Boolean> addAsync(final V value) {
final Promise<Boolean> promise = newPromise();
commandExecutor.getConnectionManager().getGroup().execute(new Runnable() {
public void run() {
try {
boolean res = add(value);
promise.setSuccess(res);
} catch (Exception e) {
promise.setFailure(e);
}
}
});
return promise;
}
@Override
@ -346,44 +307,21 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
@Override
public boolean remove(final Object value) {
return commandExecutor.write(getName(), codec, new SyncOperation<Boolean>() {
@Override
public Boolean execute(Codec codec, RedisConnection conn) {
return remove(value, codec, conn);
}
});
}
public boolean remove(Object value) {
lock.lock();
boolean remove(Object value, Codec codec, RedisConnection conn) {
while (true) {
conn.sync(RedisCommands.WATCH, getName());
BinarySearchResult<V> res = binarySearch((V) value, codec, conn);
if (res.getIndex() == null) {
conn.sync(RedisCommands.UNWATCH);
continue;
}
try {
checkComparator();
BinarySearchResult<V> res = binarySearch((V) value, codec);
if (res.getIndex() < 0) {
conn.sync(RedisCommands.UNWATCH);
return false;
}
conn.sync(RedisCommands.MULTI);
if (res.getIndex() == 0) {
conn.sync(codec, RedisCommands.LPOP, getName());
} else {
conn.sync(RedisCommands.EVAL_VOID,
"local len = redis.call('llen', KEYS[1]);"
+ "local tail = redis.call('lrange', KEYS[1], tonumber(ARGV[1]) + 1, len);"
+ "redis.call('ltrim', KEYS[1], 0, tonumber(ARGV[1]) - 1);"
+ "if #tail > 0 then "
+ "redis.call('rpush', KEYS[1], unpack(tail)); "
+ "end;", 1, getName(), res.getIndex());
}
if (((List<Object>)conn.sync(codec, RedisCommands.EXEC)).size() == 1) {
return true;
}
list.remove((int)res.getIndex());
return true;
} finally {
lock.unlock();
}
}
@ -460,7 +398,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public V first() {
V res = commandExecutor.read(getName(), codec, RedisCommands.LINDEX, getName(), 0);
V res = list.getValue(0);
if (res == null) {
throw new NoSuchElementException();
}
@ -469,7 +407,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
@Override
public V last() {
V res = commandExecutor.read(getName(), codec, RedisCommands.LINDEX, getName(), -1);
V res = list.getValue(-1);
if (res == null) {
throw new NoSuchElementException();
}
@ -477,7 +415,7 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
private String getComparatorKeyName() {
return "redisson__sortedset__comparator__{" + getName() + "}";
return "redisson_sortedset_comparator:{" + getName() + "}";
}
@Override
@ -498,15 +436,15 @@ public class RedissonSortedSet<V> extends RedissonObject implements RSortedSet<V
}
return res;
}
public BinarySearchResult<V> binarySearch(V value, Codec codec, RedisConnection connection) {
int size = size(connection);
public BinarySearchResult<V> binarySearch(V value, Codec codec) {
int size = list.size();
int upperIndex = size - 1;
int lowerIndex = 0;
while (lowerIndex <= upperIndex) {
int index = lowerIndex + (upperIndex - lowerIndex) / 2;
V res = connection.sync(codec, RedisCommands.LINDEX, getName(), index);
V res = list.getValue(index);
if (res == null) {
return new BinarySearchResult<V>();
}

@ -1,25 +0,0 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
public interface SyncOperation<R> {
R execute(Codec codec, RedisConnection conn);
}

@ -226,9 +226,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
e = new MasterSlaveEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config);
List<Future<Void>> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses());
futures.addAll(fs);
if (!partition.getSlaveAddresses().isEmpty()) {
List<Future<Void>> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses());
futures.addAll(fs);
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
if (!partition.getFailedSlaveAddresses().isEmpty()) {
log.warn("slaves: {} is down for slot ranges: {}", partition.getFailedSlaveAddresses(), partition.getSlotRanges());
@ -403,7 +403,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
currentPart.addSlaveAddress(uri);
entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
}
});
}

@ -18,7 +18,6 @@ package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.List;
import org.redisson.SyncOperation;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
@ -42,10 +41,6 @@ public interface CommandSyncExecutor {
<T, R> R read(String key, RedisCommand<T> command, Object ... params);
<R> R read(String key, Codec codec, SyncOperation<R> operation);
<R> R write(String key, Codec codec, SyncOperation<R> operation);
<T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

@ -18,23 +18,13 @@ package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.List;
import org.redisson.SyncOperation;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
/**
*
@ -100,70 +90,6 @@ public class CommandSyncService extends CommandAsyncService implements CommandEx
return get(res);
}
@Override
public <R> R write(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return sync(false, codec, new NodeSource(slot), operation, 0);
}
@Override
public <R> R read(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return sync(true, codec, new NodeSource(slot), operation, 0);
}
<R> R sync(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation<R> operation, int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
throw new IllegalStateException("Redisson is shutdown");
}
try {
Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(source, null);
} else {
connectionFuture = connectionManager.connectionWriteOp(source, null);
}
connectionFuture.syncUninterruptibly();
RedisConnection connection = connectionFuture.getNow();
try {
return operation.execute(codec, connection);
} catch (RedisMovedException e) {
return sync(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.MOVED), operation, attempt);
} catch (RedisAskException e) {
return sync(readOnlyMode, codec, new NodeSource(e.getSlot(), e.getAddr(), Redirect.ASK), operation, attempt);
} catch (RedisLoadingException e) {
return sync(readOnlyMode, codec, source, operation, attempt);
} catch (RedisTimeoutException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
attempt++;
return sync(readOnlyMode, codec, source, operation, attempt);
} finally {
connectionManager.getShutdownLatch().release();
if (readOnlyMode) {
connectionManager.releaseRead(source, connection);
} else {
connectionManager.releaseWrite(source, connection);
}
}
} catch (RedisException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
try {
Thread.sleep(connectionManager.getConfig().getRetryInterval());
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
attempt++;
return sync(readOnlyMode, codec, source, operation, attempt);
}
}
@Override
public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> res = writeAsync(key, codec, command, params);

@ -19,6 +19,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.ReconnectListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
@ -186,6 +187,10 @@ public class ClientConnectionsEntry {
});
}
public MasterSlaveServersConfig getConfig() {
return connectionManager.getConfig();
}
public Future<RedisPubSubConnection> connectPubSub() {
final Promise<RedisPubSubConnection> connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise();
Future<RedisPubSubConnection> future = client.connectPubSubAsync();

@ -93,7 +93,7 @@ public interface ConnectionManager {
Future<PubSubConnectionEntry> psubscribe(String pattern, Codec codec);
Codec unsubscribe(String channelName);
Future<Codec> unsubscribe(String channelName);
Codec punsubscribe(String channelName);

@ -464,13 +464,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public Codec unsubscribe(final String channelName) {
public Future<Codec> unsubscribe(final String channelName) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
Codec entryCodec = entry.getConnection().getChannels().get(channelName);
final Promise<Codec> result = newPromise();
final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
@Override
@ -481,13 +482,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
releaseSubscribeConnection(0, entry);
}
}
result.setSuccess(entryCodec);
return true;
}
return false;
}
});
return entryCodec;
return result;
}
@Override

@ -180,26 +180,34 @@ public class MasterSlaveEntry {
}
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener> listeners) {
Codec subscribeCodec = connectionManager.unsubscribe(channelName);
if (!listeners.isEmpty()) {
Future<PubSubConnectionEntry> future = connectionManager.subscribe(subscribeCodec, channelName, null);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
Future<Codec> unsubscribeFuture = connectionManager.unsubscribe(channelName);
unsubscribeFuture.addListener(new FutureListener<Codec>() {
@Override
public void operationComplete(Future<Codec> future) throws Exception {
if (listeners.isEmpty()) {
return;
}
});
}
Codec subscribeCodec = future.getNow();
Future<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null);
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
}
});
}
});
}
private void reattachPatternPubSubListeners(final String channelName,

@ -326,13 +326,12 @@ abstract class ConnectionPool<T extends RedisConnection> {
return;
}
Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING);
f.addListener(new FutureListener<String>() {
final FutureListener<String> pingListener = new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) throws Exception {
try {
if (entry.getFreezeReason() != FreezeReason.RECONNECT
|| !entry.isFreezed()) {
|| !entry.isFreezed()) {
return;
}
@ -342,7 +341,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
promise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future)
throws Exception {
throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
log.info("slave {} successfully reconnected", entry.getClient().getAddr());
@ -365,13 +364,32 @@ abstract class ConnectionPool<T extends RedisConnection> {
c.closeAsync();
}
}
});
};
if (entry.getConfig().getPassword() != null) {
Future<Void> temp = c.asyncWithTimeout(null, RedisCommands.AUTH, config.getPassword());
FutureListener<Void> listener = new FutureListener<Void> () {
@Override public void operationComplete (Future < Void > future)throws Exception {
ping(c, pingListener);
}
};
temp.addListener(listener);
} else {
ping(c, pingListener);
}
}
});
}
}, config.getReconnectionTimeout(), TimeUnit.MILLISECONDS);
}
private void ping(RedisConnection c, final FutureListener<String> pingListener) {
Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING);
f.addListener(pingListener);
}
public void returnConnection(ClientConnectionsEntry entry, T connection) {
if (entry.isFreezed()) {
connection.closeAsync();

@ -37,13 +37,31 @@ import io.netty.util.concurrent.Future;
*/
public interface RBatch {
/**
* Returns geospatial items holder instance by <code>name</code>.
*
* @param name
* @return
*/
<V> RGeoAsync<V> getGeo(String name);
/**
* Returns geospatial items holder instance by <code>name</code>
* using provided codec for geospatial members.
*
* @param name
* @param geospatial member codec
* @return
*/
<V> RGeoAsync<V> getGeo(String name, Codec codec);
/**
* Returns Set based MultiMap instance by name.
*
* @param name
* @return
*/
<K, V> RSetMultimap<K, V> getSetMultimap(String name);
<K, V> RMultimapAsync<K, V> getSetMultimap(String name);
/**
* Returns Set based MultiMap instance by name
@ -53,7 +71,30 @@ public interface RBatch {
* @param codec
* @return
*/
<K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec);
<K, V> RMultimapAsync<K, V> getSetMultimap(String name, Codec codec);
/**
* Returns Set based Multimap instance by name.
* Supports key-entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSetMultimap(String)}.</p>
*
* @param name
* @return
*/
<K, V> RMultimapCacheAsync<K, V> getSetMultimapCache(String name);
/**
* Returns Set based Multimap instance by name
* using provided codec for both map keys and values.
* Supports key-entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSetMultimap(String, Codec)}.</p>
*
* @param name
* @return
*/
<K, V> RMultimapCacheAsync<K, V> getSetMultimapCache(String name, Codec codec);
/**
* Returns set-based cache instance by <code>name</code>.
@ -142,7 +183,7 @@ public interface RBatch {
* @param name
* @return
*/
<K, V> RListMultimap<K, V> getListMultimap(String name);
<K, V> RMultimapAsync<K, V> getListMultimap(String name);
/**
* Returns List based MultiMap instance by name
@ -152,7 +193,30 @@ public interface RBatch {
* @param codec
* @return
*/
<K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec);
<K, V> RMultimapAsync<K, V> getListMultimap(String name, Codec codec);
/**
* Returns List based Multimap instance by name.
* Supports key-entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSetMultimap(String)}.</p>
*
* @param name
* @return
*/
<K, V> RMultimapAsync<K, V> getListMultimapCache(String name);
/**
* Returns List based Multimap instance by name
* using provided codec for both map keys and values.
* Supports key-entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getSetMultimap(String, Codec)}.</p>
*
* @param name
* @return
*/
<K, V> RMultimapAsync<K, V> getListMultimapCache(String name, Codec codec);
/**
* Returns map instance by name.

@ -74,4 +74,6 @@ public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RandomAcce
*/
void trim(int fromIndex, int toIndex);
void fastRemove(int index);
}

@ -85,4 +85,6 @@ public interface RListAsync<V> extends RCollectionAsync<V>, RandomAccess {
*/
Future<Void> trimAsync(int fromIndex, int toIndex);
Future<Void> fastRemoveAsync(int index);
}

@ -0,0 +1,42 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;
import io.netty.util.CharsetUtil;
import net.openhft.hashing.LongHashFunction;
public class Hash {
private Hash() {
}
public static String hashToBase64(byte[] objectState) {
long h1 = LongHashFunction.farmUo().hashBytes(objectState);
long h2 = LongHashFunction.xx_r39().hashBytes(objectState);
ByteBuf buf = Unpooled.buffer((2 * Long.SIZE) / Byte.SIZE).writeLong(h1).writeLong(h2);
ByteBuf b = Base64.encode(buf);
String s = b.toString(CharsetUtil.UTF_8);
b.release();
buf.release();
return s.substring(0, s.length() - 2);
}
}

@ -38,7 +38,7 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
// just an assertion
boolean removed = entries.remove(entryName) == entry;
if (removed) {
connectionManager.unsubscribe(channelName);
connectionManager.unsubscribe(channelName).syncUninterruptibly();
}
}
}

@ -15,10 +15,19 @@
*/
package org.redisson.spring.cache;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.redisson.RedissonClient;
import org.redisson.core.RLock;
import org.redisson.core.RMap;
import org.redisson.core.RMapCache;
import org.redisson.misc.Hash;
import org.springframework.cache.Cache;
import org.springframework.cache.support.SimpleValueWrapper;
@ -34,15 +43,19 @@ public class RedissonCache implements Cache {
private final RMap<Object, Object> map;
private CacheConfig config;
private final RedissonClient redisson;
public RedissonCache(RMapCache<Object, Object> mapCache, CacheConfig config) {
public RedissonCache(RedissonClient redisson, RMapCache<Object, Object> mapCache, CacheConfig config) {
this.mapCache = mapCache;
this.map = mapCache;
this.config = config;
this.redisson = redisson;
}
public RedissonCache(RMap<Object, Object> map) {
public RedissonCache(RedissonClient redisson, RMap<Object, Object> map) {
this.map = map;
this.redisson = redisson;
}
@Override
@ -113,4 +126,65 @@ public class RedissonCache implements Cache {
return new SimpleValueWrapper(value);
}
final Map<Object, Lock> valueLoaderLocks = new ConcurrentHashMap<Object, Lock>();
public Lock getLock(Object key) {
Lock lock = valueLoaderLocks.get(key);
if (lock == null) {
Lock newlock = new ReentrantLock();
lock = valueLoaderLocks.putIfAbsent(key, newlock);
if (lock == null) {
lock = newlock;
}
}
return lock;
}
public <T> T get(Object key, Callable<T> valueLoader) {
Object value = map.get(key);
if (value == null) {
String lockName = getLockName(key);
RLock lock = redisson.getLock(lockName);
lock.lock();
try {
value = map.get(key);
if (value == null) {
try {
value = toStoreValue(valueLoader.call());
} catch (Exception ex) {
throw new ValueRetrievalException(key, valueLoader, ex.getCause());
}
map.put(key, value);
}
} finally {
lock.unlock();
}
}
return (T) fromStoreValue(value);
}
private String getLockName(Object key) {
try {
byte[] keyState = redisson.getConfig().getCodec().getMapKeyEncoder().encode(key);
return "{" + map.getName() + "}:" + Hash.hashToBase64(keyState) + ":key";
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
protected Object fromStoreValue(Object storeValue) {
if (storeValue == NullValue.INSTANCE) {
return null;
}
return storeValue;
}
protected Object toStoreValue(Object userValue) {
if (userValue == null) {
return NullValue.INSTANCE;
}
return userValue;
}
}

@ -158,14 +158,14 @@ public class RedissonSpringCacheManager implements CacheManager, ResourceLoaderA
configMap.put(name, config);
RMap<Object, Object> map = createMap(name);
return new RedissonCache(map);
return new RedissonCache(redisson, map);
}
if (config.getMaxIdleTime() == 0 && config.getTTL() == 0) {
RMap<Object, Object> map = createMap(name);
return new RedissonCache(map);
return new RedissonCache(redisson, map);
}
RMapCache<Object, Object> map = createMapCache(name);
return new RedissonCache(map, config);
return new RedissonCache(redisson, map, config);
}
private RMap<Object, Object> createMap(String name) {

@ -3,7 +3,6 @@ package org.redisson;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

@ -12,6 +12,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
@ -20,6 +21,8 @@ import org.redisson.RedisRunner.RedisProcess;
import org.redisson.core.RBlockingQueue;
import io.netty.util.concurrent.Future;
import static com.jayway.awaitility.Awaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonBlockingQueueTest extends BaseTest {
@ -38,10 +41,12 @@ public class RedissonBlockingQueueTest extends BaseTest {
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollTimeout");
Future<Integer> f = queue1.pollAsync(5, TimeUnit.SECONDS);
f.await(1, TimeUnit.SECONDS);
Assert.assertFalse(f.await(1, TimeUnit.SECONDS));
runner.stop();
long start = System.currentTimeMillis();
assertThat(f.get()).isNull();
assertThat(System.currentTimeMillis() - start).isGreaterThan(3800);
}
@Test
@ -56,6 +61,58 @@ public class RedissonBlockingQueueTest extends BaseTest {
config.useSingleServer().setAddress("127.0.0.1:6319");
RedissonClient redisson = Redisson.create(config);
final AtomicBoolean executed = new AtomicBoolean();
Thread t = new Thread() {
public void run() {
try {
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
long start = System.currentTimeMillis();
Integer res = queue1.poll(10, TimeUnit.SECONDS);
assertThat(System.currentTimeMillis() - start).isGreaterThan(2000);
assertThat(res).isEqualTo(123);
executed.set(true);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
};
};
t.start();
t.join(1000);
runner.stop();
runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
Thread.sleep(1000);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
queue1.put(123);
t.join();
await().atMost(5, TimeUnit.SECONDS).until(() -> assertThat(executed.get()).isTrue());
runner.stop();
}
@Test
public void testPollAsyncReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
RedisProcess runner = new RedisRunner()
.port(6319)
.nosave()
.randomDir()
.run();
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6319");
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Future<Integer> f = queue1.pollAsync(10, TimeUnit.SECONDS);
f.await(1, TimeUnit.SECONDS);

@ -0,0 +1,73 @@
package org.redisson;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.redisson.core.RBucket;
import org.redisson.core.RLock;
import org.redisson.core.RSemaphore;
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(Parameterized.class)
public class RedissonLockHeavyTest extends BaseTest {
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { 2, 5000 }, { 2, 50000 }, { 5, 50000 }, { 10, 50000 }, { 20, 50000 }, });
}
private ExecutorService executor;
private int threads;
private int loops;
public RedissonLockHeavyTest(int threads, int loops) {
this.threads = threads;
executor = Executors.newFixedThreadPool(threads);
this.loops = loops;
}
@Test
public void lockUnlockRLock() throws Exception {
for (int i = 0; i < threads; i++) {
Runnable worker = new Runnable() {
@Override
public void run() {
for (int j = 0; j < loops; j++) {
RLock lock = redisson.getLock("RLOCK_" + j);
lock.lock();
try {
RBucket<String> bucket = redisson.getBucket("RBUCKET_" + j);
bucket.set("TEST", 30, TimeUnit.SECONDS);
RSemaphore semaphore = redisson.getSemaphore("SEMAPHORE_" + j);
semaphore.release();
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.expire(30, TimeUnit.SECONDS);
} finally {
lock.unlock();
}
}
}
};
executor.execute(worker);
}
executor.shutdown();
executor.awaitTermination(threads * loops, TimeUnit.SECONDS);
}
}

@ -2,6 +2,7 @@ package org.redisson;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -19,6 +20,7 @@ import org.redisson.RedisRunner.RedisProcess;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisOutOfMemoryException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.codec.SerializationCodec;
import org.redisson.connection.ConnectionListener;
import org.redisson.core.ClusterNode;
@ -33,6 +35,32 @@ public class RedissonTest {
protected RedissonClient redisson;
protected static RedissonClient defaultRedisson;
@Test
public void testIterator() {
RedissonBaseIterator iter = new RedissonBaseIterator() {
int i;
@Override
ListScanResult iterator(InetSocketAddress client, long nextIterPos) {
i++;
if (i == 1) {
return new ListScanResult(13L, Collections.emptyList());
}
if (i == 2) {
return new ListScanResult(0L, Collections.emptyList());
}
Assert.fail();
return null;
}
@Override
void remove(Object value) {
}
};
Assert.assertFalse(iter.hasNext());
}
@BeforeClass
public static void beforeClass() throws IOException, InterruptedException {
if (!RedissonRuntimeEnvironment.isTravis) {

Loading…
Cancel
Save