Merge branch 'master' into 3.0.0

pull/802/merge
Nikita 9 years ago
commit 101a14a470

@ -2,6 +2,41 @@ Redisson Releases History
================================
####Please Note: trunk is current development branch.
####17-Oct-2016 - version 3.0.0 released
Fully compatible with JDK 8. Includes all code changes from __2.5.0__ version
Feature - `RFeature` extends `CompletionStage`
####17-Oct-2016 - version 2.5.0 released
This version brings greatly improved version of `RLiveObjectService` and adds cascade handling, cyclic dependency resolving, simplified object creation. Read more in this [article](https://dzone.com/articles/java-distributed-in-memory-data-model-powered-by-r)
Includes all code changes from __2.2.26__ version
Feautre - COUNT and ASC/DESC support for `RGeo` radius methods
Feature - `RGeo` extends `RScoredSortedSet`
Feature - `RCascade` annotation support LiveObjectService
Improvement - `RId` generator should be empty by default
Improvement - support setter/getter with protected visibility scope for LiveObject
Fixed - `RMapCache` doesn't keep entries insertion order during iteration
Fixed - `@RId` is returned/overwritten by similarly named methods (thanks to Rui Gu)
Fixed - typo `getRemoteSerivce` -> `getRemoteService` (thanks to Slava Rosin)
Fixed - `RPermitExpirableSemaphore.availablePermits` doesn't return actual permits account under certain conditions
Fixed - `readAllValues` and `readAllEntrySet` methods of `RLocalCacheMap` return wrong values
Fixed - setter for collection field of LiveObject entity should rewrite collection content
Fixed - `RSetCache` TTL not updated if element already present
Fixed - `RLiveObjectService` swallow exceptions during `merge` or `persist` operation
Fixed - `RLiveObjectService` doesn't support protected constructors
Fixed - object with cyclic dependencies lead to stackoverflow during `RLiveObjectService.detach` process
Fixed - not persisted `REntity` object allowed to store automatically
Fixed - `RLexSortedSet.addAll` doesn't work
Fixed - `RLiveObjectService` can't detach content of List object
Fixed - `RLiveObjectService` doesn't create objects mapped to Redisson objects in runtime during getter accesss
Fixed - `RLiveObjectService` can't recognize id field of object without setter
####17-Oct-2016 - version 2.2.26 released
Fixed - NPE in CommandDecoder
Fixed - PubSub connection re-subscription doesn't work in case when there is only one slave available
####27-Sep-2016 - version 2.4.0 released
Includes all code changes from __2.2.25__ version

@ -4,16 +4,21 @@ Redis based In-Memory Data Grid for Java. Redisson.
[![Maven Central](https://img.shields.io/maven-central/v/org.redisson/redisson.svg?style=flat-square)](https://maven-badges.herokuapp.com/maven-central/org.redisson/redisson/)
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.
Redis 2.8+ and JDK 1.6+ compatible.
Redis 2.8+ compatible.
| Stable Release Version | JDK Version compatibility | Release Date |
| ------------- | ------------- | ------------|
| 3.0.0 | 1.8+ | 17.10.2016 |
| 2.5.0 | 1.6+ | 17.10.2016 |
Please read [documentation](https://github.com/mrniko/redisson/wiki) for more details.
Redisson [releases history](https://github.com/mrniko/redisson/blob/master/CHANGELOG.md).
Licensed under the Apache License 2.0.
Welcome to support chat - [![Join the chat at https://gitter.im/mrniko/redisson](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mrniko/redisson?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Welcome to support chat [![Join the chat at https://gitter.im/mrniko/redisson](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mrniko/redisson?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
####Try [Redisson PRO](http://redisson.pro) version
Features
================================
@ -61,6 +66,7 @@ Articles
================================
[Java data structures powered by Redis. Introduction to Redisson (pdf)](http://redisson.org/Redisson.pdf)
[A Look at the Java Distributed In-Memory Data Model (Powered by Redis)](https://dzone.com/articles/java-distributed-in-memory-data-model-powered-by-r)
[Distributed tasks Execution and Scheduling in Java, powered by Redis](https://dzone.com/articles/distributed-tasks-execution-and-scheduling-in-java)
[Introducing Redisson Live Objects (Object Hash Mapping)](https://dzone.com/articles/introducing-redisson-live-object-object-hash-mappi)
[Java Remote Method Invocation with Redisson](https://dzone.com/articles/java-remote-method-invocation-with-redisson)
@ -71,17 +77,28 @@ Quick start
===============================
#### Maven
<!-- JDK 1.8+ compatible -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.0.0</version>
</dependency>
<!-- JDK 1.6+ compatible -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.4.0</version>
<version>2.5.0</version>
</dependency>
#### Gradle
// JDK 1.8+ compatible
compile 'org.redisson:redisson:3.0.0'
// JDK 1.6+ compatible
compile 'org.redisson:redisson:2.5.0'
compile 'org.redisson:redisson:2.4.0'
#### Java
```java
@ -105,8 +122,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService");
Downloads
===============================
[Redisson 2.4.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.4.0&e=jar)
[Redisson node 2.4.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.4.0&e=jar)
[Redisson 3.0.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.0.0&e=jar),
[Redisson node 3.0.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.0.0&e=jar)
[Redisson 2.5.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.5.0&e=jar),
[Redisson node 2.5.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.5.0&e=jar)
### Supported by

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.AbstractCollection;
@ -86,6 +85,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Integer> valueSizeAsync(K key) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
return commandExecutor.readAsync(getName(), codec, RedisCommands.HSTRLEN, getName(key), key);
}
@ -101,6 +104,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Boolean> containsKeyAsync(Object key) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
return commandExecutor.readAsync(getName(key), codec, RedisCommands.HEXISTS, getName(key), key);
}
@ -111,6 +118,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Boolean> containsValueAsync(Object value) {
if (value == null) {
throw new NullPointerException("map value can't be null");
}
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local s = redis.call('hvals', KEYS[1]);" +
"for i = 1, #s, 1 do "
@ -232,6 +243,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<V> putIfAbsentAsync(K key, V value) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
if (value == null) {
throw new NullPointerException("map value can't be null");
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT,
"if redis.call('hsetnx', KEYS[1], ARGV[1], ARGV[2]) == 1 then "
+ "return nil "
@ -248,6 +266,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
if (value == null) {
throw new NullPointerException("map value can't be null");
}
return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSETNX, getName(key), key, value);
}
@ -258,6 +283,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Boolean> removeAsync(Object key, Object value) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
if (value == null) {
throw new NullPointerException("map value can't be null");
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "return redis.call('hdel', KEYS[1], ARGV[1]) "
@ -274,6 +306,17 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Boolean> replaceAsync(K key, V oldValue, V newValue) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
if (oldValue == null) {
throw new NullPointerException("map oldValue can't be null");
}
if (newValue == null) {
throw new NullPointerException("map newValue can't be null");
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE_VALUE,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
@ -291,6 +334,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<V> replaceAsync(K key, V value) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
if (value == null) {
throw new NullPointerException("map value can't be null");
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REPLACE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
@ -304,6 +354,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<V> getAsync(K key) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
return commandExecutor.readAsync(getName(key), codec, RedisCommands.HGET, getName(key), key);
}
@ -313,6 +367,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<V> putAsync(K key, V value) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
if (value == null) {
throw new NullPointerException("map value can't be null");
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_PUT,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
@ -323,6 +384,10 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<V> removeAsync(K key) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
return commandExecutor.evalWriteAsync(getName(key), codec, EVAL_REMOVE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hdel', KEYS[1], ARGV[1]); "
@ -332,6 +397,13 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<Boolean> fastPutAsync(K key, V value) {
if (key == null) {
throw new NullPointerException("map key can't be null");
}
if (value == null) {
throw new NullPointerException("map value can't be null");
}
return commandExecutor.writeAsync(getName(key), codec, RedisCommands.HSET, getName(key), key, value);
}
@ -370,14 +442,17 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
@Override
public RFuture<V> addAndGetAsync(K key, Number value) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
return commandExecutor.writeAsync(getName(key), StringCodec.INSTANCE,
new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
getName(key), keyState, new BigDecimal(value.toString()).toPlainString());
} catch (IOException e) {
throw new IllegalArgumentException(e);
if (key == null) {
throw new NullPointerException("map key can't be null");
}
if (value == null) {
throw new NullPointerException("map value can't be null");
}
byte[] keyState = encodeMapKey(key);
return commandExecutor.writeAsync(getName(key), StringCodec.INSTANCE,
new RedisCommand<Object>("HINCRBYFLOAT", new NumberConvertor(value.getClass())),
getName(key), keyState, new BigDecimal(value.toString()).toPlainString());
}
@Override

@ -70,6 +70,7 @@ import io.netty.util.concurrent.FutureListener;
*/
public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCache<K, V> {
static final RedisCommand<Boolean> EVAL_PUT_IF_ABSENT = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP);
static final RedisCommand<Boolean> EVAL_HSET = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP);
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
@ -617,10 +618,40 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public RFuture<Boolean> fastPutIfAbsentAsync(K key, V value) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HSET,
"local val = struct.pack('dLc0', 0, string.len(ARGV[2]), ARGV[2]); "
+ "return redis.call('hsetnx', KEYS[1], ARGV[1], val); ",
Collections.<Object>singletonList(getName()), key, value);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_IF_ABSENT,
"local value = redis.call('hget', KEYS[1], ARGV[2]); "
+ "if value == false then "
+ "local val = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[2], val); "
+ "return 1; "
+ "end; "
+ "local t, val = struct.unpack('dLc0', value); "
+ "local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], ARGV[2]); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], ARGV[2], value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), ARGV[2]); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then "
+ "return 0; "
+ "end; "
+ "redis.call('zrem', KEYS[2], ARGV[2]); "
+ "redis.call('zrem', KEYS[3], ARGV[2]); "
+ "local val = struct.pack('dLc0', 0, string.len(ARGV[3]), ARGV[3]); "
+ "redis.call('hset', KEYS[1], ARGV[2], val); "
+ "return 1; ",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), key, value);
}
@Override

@ -86,7 +86,7 @@ public class LocalCachedMapOptions {
}
/**
* Sets cache size. If size is <code>0</code> then cache is unbounded.
* Sets cache size. If size is <code>0</code> then local cache is unbounded.
*
* @param cacheSize - size of cache
* @return LocalCachedMapOptions instance

@ -23,7 +23,7 @@ package org.redisson.api;
public interface RDestroyable {
/**
* Allows to destroy object then it's not necessary anymore.
* Destroys object when it's not necessary anymore.
*/
void destroy();

@ -97,7 +97,11 @@ public class RedisClient {
public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port,
int connectTimeout, int commandTimeout) {
if (timer == null) {
throw new NullPointerException("timer param can't be null");
}
this.executor = executor;
this.timer = timer;
addr = new InetSocketAddress(host, port);
bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
bootstrap.handler(new ChannelInitializer<Channel>() {

@ -74,6 +74,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
connectListener = new ClusterConnectionListener(cfg.getReadMode() != ReadMode.MASTER);
this.config = create(cfg);
initTimer(this.config);
init(this.config);
Throwable lastException = null;

@ -27,12 +27,12 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RPromise;
import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
public class ClientConnectionsEntry {
@ -40,10 +40,10 @@ public class ClientConnectionsEntry {
private final Queue<RedisPubSubConnection> allSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AtomicInteger freeSubscribeConnectionsCounter = new AtomicInteger();
private final AsyncSemaphore freeSubscribeConnectionsCounter;
private final Queue<RedisConnection> freeConnections = new ConcurrentLinkedQueue<RedisConnection>();
private final AtomicInteger freeConnectionsCounter = new AtomicInteger();
private final AsyncSemaphore freeConnectionsCounter;
public enum FreezeReason {MANAGER, RECONNECT, SYSTEM}
@ -59,10 +59,10 @@ public class ClientConnectionsEntry {
public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize,
ConnectionManager connectionManager, NodeType serverMode) {
this.client = client;
this.freeConnectionsCounter.set(poolMaxSize);
this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize);
this.connectionManager = connectionManager;
this.nodeType = serverMode;
this.freeSubscribeConnectionsCounter.set(subscribePoolMaxSize);
this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize);
if (subscribePoolMaxSize > 0) {
connectionManager.getConnectionWatcher().add(subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter);
@ -107,27 +107,15 @@ public class ClientConnectionsEntry {
}
public int getFreeAmount() {
return freeConnectionsCounter.get();
return freeConnectionsCounter.getCounter();
}
private boolean tryAcquire(AtomicInteger counter) {
while (true) {
int value = counter.get();
if (value == 0) {
return false;
}
if (counter.compareAndSet(value, value - 1)) {
return true;
}
}
}
public boolean tryAcquireConnection() {
return tryAcquire(freeConnectionsCounter);
public void acquireConnection(Runnable runnable) {
freeConnectionsCounter.acquire(runnable);
}
public void releaseConnection() {
freeConnectionsCounter.incrementAndGet();
freeConnectionsCounter.release();
}
public RedisConnection pollConnection() {
@ -228,12 +216,12 @@ public class ClientConnectionsEntry {
freeSubscribeConnections.add(connection);
}
public boolean tryAcquireSubscribeConnection() {
return tryAcquire(freeSubscribeConnectionsCounter);
public void acquireSubscribeConnection(Runnable runnable) {
freeSubscribeConnectionsCounter.acquire(runnable);
}
public void releaseSubscribeConnection() {
freeSubscribeConnectionsCounter.incrementAndGet();
freeSubscribeConnectionsCounter.release();
}
public boolean freezeMaster(FreezeReason reason) {

@ -65,6 +65,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
super(config);
this.config = create(cfg);
initTimer(this.config);
for (URI addr : cfg.getNodeAddresses()) {
RedisConnection connection = connect(cfg, addr);

@ -19,10 +19,10 @@ import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.client.RedisConnection;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,10 +38,10 @@ public class IdleConnectionWatcher {
private final int minimumAmount;
private final int maximumAmount;
private final AtomicInteger freeConnectionsCounter;
private final AsyncSemaphore freeConnectionsCounter;
private final Collection<? extends RedisConnection> connections;
public Entry(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AtomicInteger freeConnectionsCounter) {
public Entry(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AsyncSemaphore freeConnectionsCounter) {
super();
this.minimumAmount = minimumAmount;
this.maximumAmount = maximumAmount;
@ -84,10 +84,10 @@ public class IdleConnectionWatcher {
}
private boolean validateAmount(Entry entry) {
return entry.maximumAmount - entry.freeConnectionsCounter.get() + entry.connections.size() > entry.minimumAmount;
return entry.maximumAmount - entry.freeConnectionsCounter.getCounter() + entry.connections.size() > entry.minimumAmount;
}
public void add(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AtomicInteger freeConnectionsCounter) {
public void add(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AsyncSemaphore freeConnectionsCounter) {
entries.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter));
}

@ -155,6 +155,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
this(config);
initTimer(cfg);
init(cfg);
}
@ -224,6 +225,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void init(MasterSlaveServersConfig config) {
this.config = config;
connectionWatcher = new IdleConnectionWatcher(this, config);
try {
initEntry(config);
} catch (RuntimeException e) {
stopThreads();
throw e;
}
}
protected void initTimer(MasterSlaveServersConfig config) {
int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};
Arrays.sort(timeouts);
int minTimeout = timeouts[0];
@ -235,15 +247,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
minTimeout = 100;
}
timer = new HashedWheelTimer(minTimeout, TimeUnit.MILLISECONDS);
connectionWatcher = new IdleConnectionWatcher(this, config);
try {
initEntry(config);
} catch (RuntimeException e) {
stopThreads();
throw e;
}
}
public ConnectionInitializer getConnectListener() {

@ -63,6 +63,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
super(config);
final MasterSlaveServersConfig c = create(cfg);
initTimer(c);
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());

@ -104,39 +104,52 @@ abstract class ConnectionPool<T extends RedisConnection> {
initPromise.tryFailure(cause);
return;
}
RFuture<T> promise = createConnection(entry);
promise.addListener(new FutureListener<T>() {
acquireConnection(entry, new Runnable() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (future.isSuccess()) {
T conn = future.getNow();
releaseConnection(entry, conn);
}
public void run() {
RPromise<T> promise = connectionManager.newPromise();
createConnection(entry, promise);
promise.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (future.isSuccess()) {
T conn = future.getNow();
releaseConnection(entry);
releaseConnection(entry, conn);
}
if (!future.isSuccess()) {
Throwable cause = new RedisConnectionException(
"Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: "
+ entry.getClient().getAddr(), future.cause());
initPromise.tryFailure(cause);
return;
}
releaseConnection(entry);
int value = initializedConnections.decrementAndGet();
if (value == 0) {
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
if (!initPromise.trySuccess(null)) {
throw new IllegalStateException();
}
} else if (value > 0 && !initPromise.isDone()) {
if (requests.incrementAndGet() <= minimumIdleSize) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
if (!future.isSuccess()) {
Throwable cause = new RedisConnectionException(
"Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: "
+ entry.getClient().getAddr(), future.cause());
initPromise.tryFailure(cause);
return;
}
int value = initializedConnections.decrementAndGet();
if (value == 0) {
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
if (!initPromise.trySuccess(null)) {
throw new IllegalStateException();
}
} else if (value > 0 && !initPromise.isDone()) {
if (requests.incrementAndGet() <= minimumIdleSize) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
}
}
}
}
});
}
});
}
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) {
entry.acquireConnection(runnable);
}
protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry);
@ -147,28 +160,36 @@ abstract class ConnectionPool<T extends RedisConnection> {
public RFuture<T> get() {
for (int j = entries.size() - 1; j >= 0; j--) {
ClientConnectionsEntry entry = getEntry();
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
return connectTo(entry);
final ClientConnectionsEntry entry = getEntry();
if (!entry.isFreezed()
&& tryAcquireConnection(entry)) {
final RPromise<T> result = connectionManager.newPromise();
acquireConnection(entry, new Runnable() {
@Override
public void run() {
connectTo(entry, result);
}
});
return result;
}
}
List<InetSocketAddress> zeroConnectionsAmount = new LinkedList<InetSocketAddress>();
List<InetSocketAddress> failedAttempts = new LinkedList<InetSocketAddress>();
List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();
for (ClientConnectionsEntry entry : entries) {
if (entry.isFreezed()) {
freezed.add(entry.getClient().getAddr());
} else {
zeroConnectionsAmount.add(entry.getClient().getAddr());
failedAttempts.add(entry.getClient().getAddr());
}
}
StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " exhausted! ");
StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
if (!freezed.isEmpty()) {
errorMsg.append(" Disconnected hosts: " + freezed);
}
if (!zeroConnectionsAmount.isEmpty()) {
errorMsg.append(" Hosts with fully busy connections: " + zeroConnectionsAmount);
if (!failedAttempts.isEmpty()) {
errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts);
}
RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());
@ -178,7 +199,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
public RFuture<T> get(ClientConnectionsEntry entry) {
if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed())
&& tryAcquireConnection(entry)) {
return connectTo(entry);
RPromise<T> result = connectionManager.newPromise();
connectTo(entry, result);
return result;
}
RedisConnectionException exception = new RedisConnectionException(
@ -187,7 +210,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
return entry.getFailedAttempts() < config.getFailedAttempts() && entry.tryAcquireConnection();
return entry.getFailedAttempts() < config.getFailedAttempts();
}
protected T poll(ClientConnectionsEntry entry) {
@ -198,21 +221,26 @@ abstract class ConnectionPool<T extends RedisConnection> {
return (RFuture<T>) entry.connect();
}
private RFuture<T> connectTo(ClientConnectionsEntry entry) {
private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise) {
if (promise.isDone()) {
releaseConnection(entry);
return;
}
T conn = poll(entry);
if (conn != null) {
if (!conn.isActive()) {
return promiseFailure(entry, conn);
promiseFailure(entry, promise, conn);
return;
}
return promiseSuccessful(entry, conn);
connectedSuccessful(entry, promise, conn);
return;
}
return createConnection(entry);
createConnection(entry, promise);
}
private RFuture<T> createConnection(final ClientConnectionsEntry entry) {
final RPromise<T> promise = connectionManager.newPromise();
private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {
RFuture<T> connFuture = connect(entry);
connFuture.addListener(new FutureListener<T>() {
@Override
@ -231,7 +259,6 @@ abstract class ConnectionPool<T extends RedisConnection> {
connectedSuccessful(entry, promise, conn);
}
});
return promise;
}
private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
@ -242,11 +269,6 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
}
private RFuture<T> promiseSuccessful(ClientConnectionsEntry entry, T conn) {
entry.resetFailedAttempts();
return (RFuture<T>) conn.getAcquireFuture();
}
private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getFailedAttempts()) {
checkForReconnect(entry);
@ -274,23 +296,6 @@ abstract class ConnectionPool<T extends RedisConnection> {
promise.tryFailure(cause);
}
private RFuture<T> promiseFailure(ClientConnectionsEntry entry, T conn) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
conn.closeAsync();
checkForReconnect(entry);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
} else {
conn.closeAsync();
}
releaseConnection(entry);
RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
return connectionManager.newFailedFuture(cause);
}
private void checkForReconnect(ClientConnectionsEntry entry) {
if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(),

@ -50,10 +50,10 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
}
@Override
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
return entry.tryAcquireSubscribeConnection();
protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) {
entry.acquireSubscribeConnection(runnable);
}
@Override
protected void releaseConnection(ClientConnectionsEntry entry) {
entry.releaseSubscribeConnection();

@ -74,6 +74,10 @@ public class AsyncSemaphore {
}
}
public int getCounter() {
return counter;
}
public void release() {
Runnable runnable = null;

@ -655,6 +655,15 @@ public class RedissonMapCacheTest extends BaseTest {
SimpleValue value1 = new SimpleValue("4");
assertThat(map.fastPutIfAbsent(key1, value1)).isTrue();
assertThat(map.get(key1)).isEqualTo(value1);
SimpleKey key2 = new SimpleKey("3");
map.put(key2, new SimpleValue("31"), 500, TimeUnit.MILLISECONDS);
assertThat(map.fastPutIfAbsent(key2, new SimpleValue("32"))).isFalse();
Thread.sleep(500);
assertThat(map.fastPutIfAbsent(key2, new SimpleValue("32"))).isTrue();
assertThat(map.get(key2)).isEqualTo(new SimpleValue("32"));
}
@Test

@ -292,21 +292,16 @@ public class RedissonMapTest extends BaseTest {
assertThat(counter).isEqualTo(size);
}
@Test
public void testNull() {
@Test(expected = NullPointerException.class)
public void testNullValue() {
Map<Integer, String> map = redisson.getMap("simple12");
map.put(1, null);
map.put(2, null);
map.put(3, "43");
assertThat(map.size()).isEqualTo(3);
String val = map.get(2);
assertThat(val).isNull();
String val2 = map.get(1);
assertThat(val2).isNull();
String val3 = map.get(3);
assertThat(val3).isEqualTo("43");
}
@Test(expected = NullPointerException.class)
public void testNullKey() {
Map<Integer, String> map = redisson.getMap("simple12");
map.put(null, "1");
}
@Test

Loading…
Cancel
Save