Merge branch 'master' into 3.0.0

# Conflicts:
#	pom.xml
#	redisson-all/pom.xml
#	redisson-tomcat/pom.xml
#	redisson-tomcat/redisson-tomcat-6/pom.xml
#	redisson-tomcat/redisson-tomcat-7/pom.xml
#	redisson-tomcat/redisson-tomcat-8/pom.xml
#	redisson/pom.xml
pull/802/merge
Nikita 8 years ago
commit b45343b0d7

@ -224,4 +224,4 @@ before_script:
- export REDIS_VERSION="$(redis-cli INFO SERVER | sed -n 2p)" - export REDIS_VERSION="$(redis-cli INFO SERVER | sed -n 2p)"
- echo $REDIS_VERSION - echo $REDIS_VERSION
- redis-cli SHUTDOWN NOSAVE - redis-cli SHUTDOWN NOSAVE
script: mvn -Dtest=$REDISSON_TEST -Dsurefire.rerunFailingTestsCount=5 -DargLine="-Xmx2g -DredisBinary=$REDIS_BIN/redis-server -DtravisEnv=true" -Punit-test clean test -e -X script: mvn -Dtest=$REDISSON_TEST -Dsurefire.rerunFailingTestsCount=5 -DargLine="-Xmx2g -DredisBinary=$REDIS_BIN/redis-server -DtravisEnv=true -XX:SoftRefLRUPolicyMSPerMB=0" -Punit-test clean test -e -X

@ -4,6 +4,21 @@ Redisson Releases History
Try __ULTRA-FAST__ [Redisson PRO](https://redisson.pro) edition. Try __ULTRA-FAST__ [Redisson PRO](https://redisson.pro) edition.
####19-Feb-2017 - versions 2.8.0 and 3.3.0 released
Feature - __`RClusteredLocalCachedMap` object added__ More details [here](https://github.com/redisson/redisson/wiki/7.-distributed-collections#713-map-data-partitioning)
Feature - __`RClusteredMapCache` object added__ More details [here](https://github.com/redisson/redisson/wiki/7.-distributed-collections#713-map-data-partitioning)
Feature - __`RClusteredSetCache` object added__ More details [here](https://github.com/redisson/redisson/wiki/7.-distributed-collections/#732-set-data-partitioning)
Feature - __`RPriorityQueue` object added__ More details [here](https://github.com/redisson/redisson/wiki/7.-distributed-collections/#715-priority-queue)
Feature - __`RPriorityDeque` object added__ More details [here](https://github.com/redisson/redisson/wiki/7.-distributed-collections/#716-priority-deque)
Feature - `removeAllListeners` and `removeListener` by instance methods added for `RTopic` and `RPatternTopic`
Feature - `RLockAsync` interface added
Improvement - `RRemoteService` is now able to support method overload
Fixed - `RLocalCachedMap` is not Redis cluster compatible
Fixed - cascade slaves are not supported in cluster mode
Fixed - shutdown checking during master change state check added
Fixed - master isn't checked during new slave discovery in Sentinel mode
####02-Feb-2017 - versions 2.7.4 and 3.2.4 released ####02-Feb-2017 - versions 2.7.4 and 3.2.4 released
Feature - Allow to specify Redisson instance/config during JCache cache creation Feature - Allow to specify Redisson instance/config during JCache cache creation

@ -1,24 +1,16 @@
Redis based In-Memory Data Grid for Java. Redisson. Redis based In-Memory Data Grid for Java. Redisson.
==== ====
[Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.3.0) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [Support chat](https://gitter.im/mrniko/redisson)
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework. Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.
Redis 2.8+ compatible. Redis 2.8+ compatible.
| Stable Release Version | JDK Version compatibility | Release Date | | Stable Release Version | JDK Version compatibility | Release Date |
| ------------- | ------------- | ------------| | ------------- | ------------- | ------------|
| 3.2.3 | 1.8+ | 19.01.2017 | | 3.3.0 | 1.8+ | 19.02.2017 |
| 2.7.3 | 1.6, 1.7, 1.8 and Android | 19.01.2017 | | 2.8.0 | 1.6, 1.7, 1.8 and Android | 19.02.2017 |
__NOTE__: Both version lines have same features except `CompletionStage` interface supported by 3.x.x line __NOTE__: Both version lines have same features except `CompletionStage` interface added in 3.x.x
Please read [documentation](https://github.com/redisson/redisson/wiki) for more details.
Redisson [releases history](https://github.com/redisson/redisson/blob/master/CHANGELOG.md)
Checkout more [code examples](https://github.com/redisson/redisson-examples)
Browse [javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.2.2)
Licensed under the Apache License 2.0.
Welcome to support chat [![Join the chat at https://gitter.im/mrniko/redisson](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/mrniko/redisson?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
Features Features
@ -42,7 +34,7 @@ Features
Object holder, Binary stream holder, Geospatial holder, BitSet, AtomicLong, AtomicDouble, PublishSubscribe, Object holder, Binary stream holder, Geospatial holder, BitSet, AtomicLong, AtomicDouble, PublishSubscribe,
Bloom filter, HyperLogLog Bloom filter, HyperLogLog
* [Distributed collections](https://github.com/redisson/redisson/wiki/7.-Distributed-collections) * [Distributed collections](https://github.com/redisson/redisson/wiki/7.-Distributed-collections)
Map, Multimap, Set, List, SortedSet, ScoredSortedSet, LexSortedSet, Queue, Deque, Blocking Queue, Bounded Blocking Queue, Blocking Deque, Delayed Queue Map, Multimap, Set, List, SortedSet, ScoredSortedSet, LexSortedSet, Queue, Deque, Blocking Queue, Bounded Blocking Queue, Blocking Deque, Delayed Queue, Priority Queue, Priority Deque
* [Distributed locks and synchronizers](https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers) * [Distributed locks and synchronizers](https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers)
Lock, FairLock, MultiLock, RedLock, ReadWriteLock, Semaphore, PermitExpirableSemaphore, CountDownLatch Lock, FairLock, MultiLock, RedLock, ReadWriteLock, Semaphore, PermitExpirableSemaphore, CountDownLatch
* [Distributed services](https://github.com/redisson/redisson/wiki/9.-distributed-services) * [Distributed services](https://github.com/redisson/redisson/wiki/9.-distributed-services)
@ -63,7 +55,7 @@ Features
Who uses Redisson Who uses Redisson
================================ ================================
[Electronic Arts](http://ea.com), [Baidu](http://baidu.com), [New Relic Synthetics](https://newrelic.com/synthetics), [National Australia Bank](https://www.nab.com.au/), [Brookhaven National Laboratory](http://bnl.gov/), [Singtel](http://singtel.com), [Infor](http://www.infor.com/), [Setronica](http://setronica.com/), [Monits](http://monits.com/), [Netflix Dyno client] (https://github.com/Netflix/dyno), [武林Q传](http://www.nbrpg.com/), [Ocous](http://www.ocous.com/), [Invaluable](http://www.invaluable.com/), [Clover](https://www.clover.com/) , [Apache Karaf Decanter](https://karaf.apache.org/projects.html#decanter), [Atmosphere Framework](http://async-io.org/), [BrandsEye](http://brandseye.com), [Datorama](http://datorama.com/), [BrightCloud](http://brightcloud.com/), [Azar](http://azarlive.com/), [Snapfish](http://snapfish.com), [Crimson Hexagon](http://www.crimsonhexagon.com) [Electronic Arts](http://ea.com), [Baidu](http://baidu.com), [New Relic Synthetics](https://newrelic.com/synthetics), [Brookhaven National Laboratory](http://bnl.gov/), [Singtel](http://singtel.com), [Infor](http://www.infor.com/), [Setronica](http://setronica.com/), [Monits](http://monits.com/), [Netflix Dyno client] (https://github.com/Netflix/dyno), [武林Q传](http://www.nbrpg.com/), [Ocous](http://www.ocous.com/), [Invaluable](http://www.invaluable.com/), [Clover](https://www.clover.com/) , [Apache Karaf Decanter](https://karaf.apache.org/projects.html#decanter), [Atmosphere Framework](http://async-io.org/), [BrandsEye](http://brandseye.com), [Datorama](http://datorama.com/), [BrightCloud](http://brightcloud.com/), [Azar](http://azarlive.com/), [Snapfish](http://snapfish.com), [Crimson Hexagon](http://www.crimsonhexagon.com), [Quby](http://quby.com/), [Base CRM](http://getbase.com)
Articles Articles
================================ ================================
@ -90,23 +82,23 @@ Quick start
<dependency> <dependency>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson</artifactId> <artifactId>redisson</artifactId>
<version>3.2.4</version> <version>3.3.0</version>
</dependency> </dependency>
<!-- JDK 1.6+ compatible --> <!-- JDK 1.6+ compatible -->
<dependency> <dependency>
<groupId>org.redisson</groupId> <groupId>org.redisson</groupId>
<artifactId>redisson</artifactId> <artifactId>redisson</artifactId>
<version>2.7.4</version> <version>2.8.0</version>
</dependency> </dependency>
#### Gradle #### Gradle
// JDK 1.8+ compatible // JDK 1.8+ compatible
compile 'org.redisson:redisson:3.2.4' compile 'org.redisson:redisson:3.3.0'
// JDK 1.6+ compatible // JDK 1.6+ compatible
compile 'org.redisson:redisson:2.7.4' compile 'org.redisson:redisson:2.8.0'
#### Java #### Java
@ -131,11 +123,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService");
Downloads Downloads
=============================== ===============================
[Redisson 3.2.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.2.4&e=jar), [Redisson 3.3.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.3.0&e=jar),
[Redisson node 3.2.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.2.4&e=jar) [Redisson node 3.3.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.3.0&e=jar)
[Redisson 2.7.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.7.4&e=jar), [Redisson 2.8.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.8.0&e=jar),
[Redisson node 2.7.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.7.4&e=jar) [Redisson node 2.8.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.8.0&e=jar)
### Supported by ### Supported by

@ -1,13 +1,14 @@
Redis based Tomcat Session Manager Redis based Tomcat Session Manager
=== ===
Implements non-sticky session management backed by Redis. Stores session of Apache Tomcat in Redis and allows to distribute requests across a cluster of Tomcat servers. Implements non-sticky session management backed by Redis.
Supports Tomcat 6.x, 7.x, 8.x
Supports Apache Tomcat 6.x, 7.x, 8.x
Advantages Advantages
=== ===
Current implementation differs from any other Tomcat Session Manager in terms of efficient storage and optimized writes. Each session attribute is written into Redis during each `HttpSession.setAttribute` invocation. While other solutions serialize whole session each time. Current implementation differs from any other Redis based Tomcat Session Manager in terms of efficient storage and optimized writes. Each session attribute is written into Redis during each `HttpSession.setAttribute` invocation. While other solutions serialize whole session each time.
Usage Usage
=== ===
@ -21,22 +22,22 @@ Usage
2. Copy two jars into `TOMCAT_BASE/lib` directory: 2. Copy two jars into `TOMCAT_BASE/lib` directory:
1. __For JDK 1.8+__ 1. __For JDK 1.8+__
[redisson-all-3.2.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.2.4&e=jar) [redisson-all-3.3.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.3.0&e=jar)
for Tomcat 6.x for Tomcat 6.x
[redisson-tomcat-6-3.2.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.2.4&e=jar) [redisson-tomcat-6-3.3.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.3.0&e=jar)
for Tomcat 7.x for Tomcat 7.x
[redisson-tomcat-7-3.2.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.2.4&e=jar) [redisson-tomcat-7-3.3.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.3.0&e=jar)
for Tomcat 8.x for Tomcat 8.x
[redisson-tomcat-8-3.2.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.2.4&e=jar) [redisson-tomcat-8-3.3.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.3.0&e=jar)
1. __For JDK 1.6+__ 1. __For JDK 1.6+__
[redisson-all-2.7.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.7.4&e=jar) [redisson-all-2.8.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.8.0&e=jar)
for Tomcat 6.x for Tomcat 6.x
[redisson-tomcat-6-2.7.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.7.4&e=jar) [redisson-tomcat-6-2.8.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.8.0&e=jar)
for Tomcat 7.x for Tomcat 7.x
[redisson-tomcat-7-2.7.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.7.4&e=jar) [redisson-tomcat-7-2.8.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.8.0&e=jar)
for Tomcat 8.x for Tomcat 8.x
[redisson-tomcat-8-2.7.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.7.4&e=jar) [redisson-tomcat-8-2.8.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.8.0&e=jar)

@ -143,11 +143,11 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
public void start() throws LifecycleException { public void start() throws LifecycleException {
Config config = null; Config config = null;
try { try {
config = Config.fromJSON(new File(configPath)); config = Config.fromJSON(new File(configPath), getClass().getClassLoader());
} catch (IOException e) { } catch (IOException e) {
// trying next format // trying next format
try { try {
config = Config.fromYAML(new File(configPath)); config = Config.fromYAML(new File(configPath), getClass().getClassLoader());
} catch (IOException e1) { } catch (IOException e1) {
log.error("Can't parse json config " + configPath, e); log.error("Can't parse json config " + configPath, e);
throw new LifecycleException("Can't parse yaml config " + configPath, e1); throw new LifecycleException("Can't parse yaml config " + configPath, e1);

@ -146,11 +146,11 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
super.startInternal(); super.startInternal();
Config config = null; Config config = null;
try { try {
config = Config.fromJSON(new File(configPath)); config = Config.fromJSON(new File(configPath), getClass().getClassLoader());
} catch (IOException e) { } catch (IOException e) {
// trying next format // trying next format
try { try {
config = Config.fromYAML(new File(configPath)); config = Config.fromYAML(new File(configPath), getClass().getClassLoader());
} catch (IOException e1) { } catch (IOException e1) {
log.error("Can't parse json config " + configPath, e); log.error("Can't parse json config " + configPath, e);
throw new LifecycleException("Can't parse yaml config " + configPath, e1); throw new LifecycleException("Can't parse yaml config " + configPath, e1);

@ -146,11 +146,11 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
super.startInternal(); super.startInternal();
Config config = null; Config config = null;
try { try {
config = Config.fromJSON(new File(configPath)); config = Config.fromJSON(new File(configPath), getClass().getClassLoader());
} catch (IOException e) { } catch (IOException e) {
// trying next format // trying next format
try { try {
config = Config.fromYAML(new File(configPath)); config = Config.fromYAML(new File(configPath), getClass().getClassLoader());
} catch (IOException e1) { } catch (IOException e1) {
log.error("Can't parse json config " + configPath, e); log.error("Can't parse json config " + configPath, e);
throw new LifecycleException("Can't parse yaml config " + configPath, e1); throw new LifecycleException("Can't parse yaml config " + configPath, e1);

@ -151,7 +151,7 @@
<dependency> <dependency>
<groupId>org.msgpack</groupId> <groupId>org.msgpack</groupId>
<artifactId>jackson-dataformat-msgpack</artifactId> <artifactId>jackson-dataformat-msgpack</artifactId>
<version>0.8.7</version> <version>0.8.11</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
@ -181,45 +181,45 @@
<dependency> <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId> <groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId> <artifactId>jackson-dataformat-yaml</artifactId>
<version>2.7.6</version> <version>2.8.7</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId> <artifactId>jackson-core</artifactId>
<version>2.7.6</version> <version>2.8.7</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
<version>2.7.6</version> <version>2.8.7</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId> <groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId> <artifactId>jackson-dataformat-cbor</artifactId>
<version>2.7.6</version> <version>2.8.7</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId> <groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId> <artifactId>jackson-dataformat-smile</artifactId>
<version>2.7.6</version> <version>2.8.7</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId> <groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-avro</artifactId> <artifactId>jackson-dataformat-avro</artifactId>
<version>2.7.6</version> <version>2.8.7</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>net.openhft</groupId> <groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId> <artifactId>zero-allocation-hashing</artifactId>
<version>0.5</version> <version>0.7</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>net.bytebuddy</groupId> <groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId> <artifactId>byte-buddy</artifactId>
<version>1.4.26</version> <version>1.6.8</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.jodd</groupId> <groupId>org.jodd</groupId>

@ -629,12 +629,18 @@ public class RedissonLiveObjectService implements RLiveObjectService {
.and(ElementMatchers.isGetter().or(ElementMatchers.isSetter()) .and(ElementMatchers.isGetter().or(ElementMatchers.isSetter())
.or(ElementMatchers.named("isPhantom")) .or(ElementMatchers.named("isPhantom"))
.or(ElementMatchers.named("delete")))) .or(ElementMatchers.named("delete"))))
.intercept(MethodDelegation.to( .intercept(MethodDelegation.withDefaultConfiguration()
new LiveObjectInterceptor(redisson, codecProvider, entityClass, .withBinders(FieldProxy.Binder
getRIdFieldName(entityClass)))
.appendParameterBinder(FieldProxy.Binder
.install(LiveObjectInterceptor.Getter.class, .install(LiveObjectInterceptor.Getter.class,
LiveObjectInterceptor.Setter.class))) LiveObjectInterceptor.Setter.class))
.to(new LiveObjectInterceptor(redisson, codecProvider, entityClass,
getRIdFieldName(entityClass))))
// .intercept(MethodDelegation.to(
// new LiveObjectInterceptor(redisson, codecProvider, entityClass,
// getRIdFieldName(entityClass)))
// .appendParameterBinder(FieldProxy.Binder
// .install(LiveObjectInterceptor.Getter.class,
// LiveObjectInterceptor.Setter.class)))
.implement(RLiveObject.class) .implement(RLiveObject.class)
.method(ElementMatchers.isAnnotatedWith(RFieldAccessor.class) .method(ElementMatchers.isAnnotatedWith(RFieldAccessor.class)
.and(ElementMatchers.named("get") .and(ElementMatchers.named("get")

@ -38,6 +38,11 @@ import org.redisson.api.RFuture;
import org.redisson.api.RLocalCachedMap; import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.cache.Cache;
import org.redisson.cache.LFUCacheMap;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.NoneCacheMap;
import org.redisson.cache.SoftCacheMap;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
@ -48,11 +53,7 @@ import org.redisson.client.protocol.convertor.NumberConvertor;
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder; import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.Cache;
import org.redisson.misc.Hash; import org.redisson.misc.Hash;
import org.redisson.misc.LFUCacheMap;
import org.redisson.misc.LRUCacheMap;
import org.redisson.misc.NoneCacheMap;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
@ -73,14 +74,14 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
public static class LocalCachedMapInvalidate implements Serializable { public static class LocalCachedMapInvalidate implements Serializable {
private byte[] excludedId; private byte[] excludedId;
private byte[] keyHash; private List<byte[]> keyHashes;
public LocalCachedMapInvalidate() { public LocalCachedMapInvalidate() {
} }
public LocalCachedMapInvalidate(byte[] excludedId, byte[] keyHash) { public LocalCachedMapInvalidate(byte[] excludedId, byte[]... keyHash) {
super(); super();
this.keyHash = keyHash; this.keyHashes = Arrays.asList(keyHash);
this.excludedId = excludedId; this.excludedId = excludedId;
} }
@ -88,8 +89,8 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return excludedId; return excludedId;
} }
public byte[] getKeyHash() { public Collection<byte[]> getKeyHashes() {
return keyHash; return keyHashes;
} }
} }
@ -215,6 +216,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (options.getEvictionPolicy() == EvictionPolicy.LFU) { if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
cache = new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis()); cache = new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
} }
if (options.getEvictionPolicy() == EvictionPolicy.SOFT) {
cache = new SoftCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
invalidationTopic = new RedissonTopic<Object>(commandExecutor, suffixName(name, "topic")); invalidationTopic = new RedissonTopic<Object>(commandExecutor, suffixName(name, "topic"));
if (options.isInvalidateEntryOnChange()) { if (options.isInvalidateEntryOnChange()) {
@ -227,8 +231,10 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
if (msg instanceof LocalCachedMapInvalidate) { if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg; LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) { if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
CacheKey key = new CacheKey(invalidateMsg.getKeyHash()); for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
cache.remove(key); CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
} }
} }
} }
@ -722,27 +728,30 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
} }
List<Object> params = new ArrayList<Object>(map.size()*3); List<Object> params = new ArrayList<Object>(map.size()*3);
List<Object> msgs = new ArrayList<Object>(map.size());
params.add(invalidateEntryOnChange); params.add(invalidateEntryOnChange);
params.add(map.size()*2); params.add(map.size()*2);
byte[][] hashes = new byte[map.size()][];
int i = 0;
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) { for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
byte[] mapKey = encodeMapKey(t.getKey()); byte[] mapKey = encodeMapKey(t.getKey());
byte[] mapValue = encodeMapValue(t.getValue()); byte[] mapValue = encodeMapValue(t.getValue());
params.add(mapKey); params.add(mapKey);
params.add(mapValue); params.add(mapValue);
CacheKey cacheKey = toCacheKey(mapKey); CacheKey cacheKey = toCacheKey(mapKey);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash())); hashes[i] = cacheKey.getKeyHash();
msgs.add(msgEncoded); i++;
} }
params.addAll(msgs);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, hashes));
params.add(msgEncoded);
final RPromise<Void> result = newPromise(); final RPromise<Void> result = newPromise();
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));" "redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));"
+ "if ARGV[1] == '1' then " + "if ARGV[1] == '1' then "
+ "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do " // + "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do "
+ "redis.call('publish', KEYS[2], ARGV[i]); " + "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
+ "end; " // + "end; "
+ "end;", + "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)), params.toArray()); Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)), params.toArray());

@ -65,11 +65,15 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"if (mode == false) then " + "if (mode == false) then " +
"redis.call('hset', KEYS[1], 'mode', 'read'); " + "redis.call('hset', KEYS[1], 'mode', 'read'); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[1] .. ':timeout:1', 1); " +
"redis.call('pexpire', KEYS[1] .. ':timeout:1', ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " + "return nil; " +
"end; " + "end; " +
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " + "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('set', KEYS[1] .. ':timeout:' .. ind, 1); " +
"redis.call('pexpire', KEYS[1] .. ':timeout:' .. ind, ARGV[1]); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " + "return nil; " +
"end;" + "end;" +
@ -85,27 +89,39 @@ public class RedissonReadLock extends RedissonLock implements RLock {
"redis.call('publish', KEYS[2], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " + "return 1; " +
"end; " + "end; " +
// "if (mode == 'read') then " + "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " + "if (lockExists == 0) then " +
"if (lockExists == 0) then " + "return nil;" +
"return nil;" + "else " +
"else " + "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "redis.call('del', KEYS[1] .. ':timeout:' .. (counter+1)); " +
"if (counter > 0) then " + "if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " + "local maxRemainTime = -3; " +
"return 0; " + "for i=counter, 1, -1 do " +
"local remainTime = redis.call('pttl', KEYS[1] .. ':timeout:' .. i); " +
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"if maxRemainTime > 0 then " +
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"else " + "else " +
"redis.call('hdel', KEYS[1], ARGV[3]); " + "redis.call('hdel', KEYS[1], ARGV[2]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " + "if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " + "redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " +
"end; " + "end; " +
"return 1; "+ "end;" +
"return 0; " +
"else " +
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"if (redis.call('hlen', KEYS[1]) == 1) then " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"end; " + "end; " +
"return 1; "+
"end; " + "end; " +
// "end; " + "end; " +
"return nil; ", "return nil; ",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, getLockName(Thread.currentThread().getId()));
if (opStatus == null) { if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: " throw new IllegalMonitorStateException("attempt to unlock read lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId()); + id + " thread-id: " + Thread.currentThread().getId());

@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
*/ */
public class LocalCachedMapOptions { public class LocalCachedMapOptions {
public enum EvictionPolicy {NONE, LRU, LFU}; public enum EvictionPolicy {NONE, LRU, LFU, SOFT};
private boolean invalidateEntryOnChange; private boolean invalidateEntryOnChange;
private EvictionPolicy evictionPolicy; private EvictionPolicy evictionPolicy;
@ -115,6 +115,7 @@ public class LocalCachedMapOptions {
* @param evictionPolicy * @param evictionPolicy
* <p><code>LRU</code> - uses cache with LRU (least recently used) eviction policy. * <p><code>LRU</code> - uses cache with LRU (least recently used) eviction policy.
* <p><code>LFU</code> - uses cache with LFU (least frequently used) eviction policy. * <p><code>LFU</code> - uses cache with LFU (least frequently used) eviction policy.
* <p><code>SOFT</code> - uses cache with soft references. The garbage collector will evict items from the cache when the JVM is running out of memory.
* <p><code>NONE</code> - doesn't use eviction policy, but timeToLive and maxIdleTime params are still working. * <p><code>NONE</code> - doesn't use eviction policy, but timeToLive and maxIdleTime params are still working.
* @return LocalCachedMapOptions instance * @return LocalCachedMapOptions instance
*/ */

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.misc; package org.redisson.cache;
import java.util.AbstractCollection; import java.util.AbstractCollection;
import java.util.AbstractMap.SimpleEntry; import java.util.AbstractMap.SimpleEntry;
@ -37,56 +37,8 @@ import io.netty.util.internal.PlatformDependent;
*/ */
public abstract class AbstractCacheMap<K, V> implements Cache<K, V> { public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
public static class CachedValue {
private final Object key;
private final Object value;
long ttl;
long maxIdleTime;
long creationTime;
long lastAccess;
public CachedValue(Object key, Object value, long ttl, long maxIdleTime) {
this.value = value;
this.ttl = ttl;
this.key = key;
this.maxIdleTime = maxIdleTime;
creationTime = System.currentTimeMillis();
lastAccess = creationTime;
}
public boolean isExpired() {
boolean result = false;
long currentTime = System.currentTimeMillis();
if (ttl != 0 && creationTime + ttl < currentTime) {
result = true;
}
if (maxIdleTime != 0 && lastAccess + maxIdleTime < currentTime) {
result = true;
}
return result;
}
public Object getKey() {
return key;
}
public Object getValue() {
lastAccess = System.currentTimeMillis();
return value;
}
@Override
public String toString() {
return "CachedValue [key=" + key + ", value=" + value + "]";
}
}
final int size; final int size;
final ConcurrentMap<K, CachedValue> map = PlatformDependent.newConcurrentHashMap(); final ConcurrentMap<K, CachedValue<K, V>> map = PlatformDependent.newConcurrentHashMap();
private final long timeToLiveInMillis; private final long timeToLiveInMillis;
private final long maxIdleInMillis; private final long maxIdleInMillis;
@ -100,11 +52,11 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
this.timeToLiveInMillis = timeToLiveInMillis; this.timeToLiveInMillis = timeToLiveInMillis;
} }
protected void onValueRead(CachedValue value) { protected void onValueRead(CachedValue<K, V> value) {
} }
protected void onValueRemove(CachedValue value) { protected void onValueRemove(CachedValue<K, V> value) {
} }
@ -137,7 +89,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
throw new NullPointerException(); throw new NullPointerException();
} }
CachedValue entry = map.get(key); CachedValue<K, V> entry = map.get(key);
if (entry == null) { if (entry == null) {
return false; return false;
} }
@ -161,8 +113,8 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
throw new NullPointerException(); throw new NullPointerException();
} }
for (Map.Entry<K, CachedValue> entry : map.entrySet()) { for (Map.Entry<K, CachedValue<K, V>> entry : map.entrySet()) {
CachedValue cachedValue = entry.getValue(); CachedValue<K, V> cachedValue = entry.getValue();
if (cachedValue.getValue().equals(value)) { if (cachedValue.getValue().equals(value)) {
if (cachedValue.isExpired()) { if (cachedValue.isExpired()) {
if (map.remove(cachedValue.getKey(), cachedValue)) { if (map.remove(cachedValue.getKey(), cachedValue)) {
@ -187,7 +139,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
throw new NullPointerException(); throw new NullPointerException();
} }
CachedValue entry = map.get(key); CachedValue<K, V> entry = map.get(key);
if (entry == null) { if (entry == null) {
return null; return null;
} }
@ -201,8 +153,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
return readValue(entry); return readValue(entry);
} }
@SuppressWarnings("unchecked") protected V readValue(CachedValue<K, V> entry) {
protected V readValue(CachedValue entry) {
onValueRead(entry); onValueRead(entry);
return (V) entry.getValue(); return (V) entry.getValue();
} }
@ -216,17 +167,16 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
return put(key, value, timeToLiveInMillis, TimeUnit.MILLISECONDS, maxIdleInMillis, TimeUnit.MILLISECONDS); return put(key, value, timeToLiveInMillis, TimeUnit.MILLISECONDS, maxIdleInMillis, TimeUnit.MILLISECONDS);
} }
@SuppressWarnings("unchecked")
@Override @Override
public V put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) { public V put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdleTime, TimeUnit maxIdleUnit) {
CachedValue entry = create(key, value, ttlUnit.toMillis(ttl), maxIdleUnit.toMillis(maxIdleTime)); CachedValue<K, V> entry = create(key, value, ttlUnit.toMillis(ttl), maxIdleUnit.toMillis(maxIdleTime));
if (isFull(key)) { if (isFull(key)) {
if (!removeExpiredEntries()) { if (!removeExpiredEntries()) {
onMapFull(); onMapFull();
} }
} }
onValueCreate(entry); onValueCreate(entry);
CachedValue prevCachedValue = map.put(key, entry); CachedValue<K, V> prevCachedValue = map.put(key, entry);
if (prevCachedValue != null) { if (prevCachedValue != null) {
onValueRemove(prevCachedValue); onValueRemove(prevCachedValue);
if (!prevCachedValue.isExpired()) { if (!prevCachedValue.isExpired()) {
@ -236,17 +186,17 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
return null; return null;
} }
protected CachedValue create(K key, V value, long ttl, long maxIdleTime) { protected CachedValue<K, V> create(K key, V value, long ttl, long maxIdleTime) {
return new CachedValue(key, value, ttl, maxIdleTime); return new StdCachedValue<K, V>(key, value, ttl, maxIdleTime);
} }
protected void onValueCreate(CachedValue entry) { protected void onValueCreate(CachedValue<K, V> entry) {
} }
private boolean removeExpiredEntries() { protected boolean removeExpiredEntries() {
boolean removed = false; boolean removed = false;
// TODO optimize // TODO optimize
for (CachedValue value : map.values()) { for (CachedValue<K, V> value : map.values()) {
if (value.isExpired()) { if (value.isExpired()) {
if (map.remove(value.getKey(), value)) { if (map.remove(value.getKey(), value)) {
onValueRemove(value); onValueRemove(value);
@ -280,10 +230,9 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
* (non-Javadoc) * (non-Javadoc)
* @see java.util.Map#remove(java.lang.Object) * @see java.util.Map#remove(java.lang.Object)
*/ */
@SuppressWarnings("unchecked")
@Override @Override
public V remove(Object key) { public V remove(Object key) {
CachedValue entry = map.remove(key); CachedValue<K, V> entry = map.remove(key);
if (entry != null) { if (entry != null) {
onValueRemove(entry); onValueRemove(entry);
if (!entry.isExpired()) { if (!entry.isExpired()) {
@ -346,9 +295,9 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
abstract class MapIterator<M> implements Iterator<M> { abstract class MapIterator<M> implements Iterator<M> {
final Iterator<Map.Entry<K, CachedValue>> keyIterator = map.entrySet().iterator(); final Iterator<Map.Entry<K, CachedValue<K, V>>> keyIterator = map.entrySet().iterator();
Map.Entry<K, CachedValue> mapEntry; Map.Entry<K, CachedValue<K, V>> mapEntry;
@Override @Override
public boolean hasNext() { public boolean hasNext() {
@ -357,7 +306,7 @@ public abstract class AbstractCacheMap<K, V> implements Cache<K, V> {
} }
mapEntry = null; mapEntry = null;
while (keyIterator.hasNext()) { while (keyIterator.hasNext()) {
Map.Entry<K, CachedValue> entry = keyIterator.next(); Map.Entry<K, CachedValue<K, V>> entry = keyIterator.next();
if (entry.getValue().isExpired()) { if (entry.getValue().isExpired()) {
continue; continue;
} }

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.misc; package org.redisson.cache;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;

@ -0,0 +1,27 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
/**
* Created by jribble on 2/20/17.
*/
public interface CachedValue<K, V> {
boolean isExpired();
K getKey();
V getValue();
}

@ -0,0 +1,34 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
public class CachedValueReference<V> extends SoftReference<V> {
private final CachedValue<?, ?> owner;
public CachedValueReference(CachedValue<?, ?> owner, V referent, ReferenceQueue<? super V> q) {
super(referent, q);
this.owner = owner;
}
public CachedValue<?, ?> getOwner() {
return owner;
}
}

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.misc; package org.redisson.cache;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentNavigableMap;
@ -57,7 +57,7 @@ public class LFUCacheMap<K, V> extends AbstractCacheMap<K, V> {
} }
public static class LFUCachedValue extends CachedValue { public static class LFUCachedValue extends StdCachedValue {
Long id; Long id;
long accessCount; long accessCount;

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.misc; package org.redisson.cache;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.misc; package org.redisson.cache;
/** /**
* *

@ -0,0 +1,55 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
import java.lang.ref.ReferenceQueue;
/**
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class SoftCacheMap<K, V> extends AbstractCacheMap<K, V> {
private final ReferenceQueue<V> queue = new ReferenceQueue<V>();
public SoftCacheMap(long timeToLiveInMillis, long maxIdleInMillis) {
super(0, timeToLiveInMillis, maxIdleInMillis);
}
protected CachedValue<K, V> create(K key, V value, long ttl, long maxIdleTime) {
return new SoftCachedValue<K, V>(key, value, ttl, maxIdleTime, queue);
}
@Override
protected boolean removeExpiredEntries() {
while (true) {
CachedValueReference<V> value = (CachedValueReference<V>) queue.poll();
if (value == null) {
break;
}
map.remove(value.getOwner().getKey(), value.getOwner());
}
return super.removeExpiredEntries();
}
@Override
protected void onMapFull() {
}
}

@ -0,0 +1,39 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
import java.lang.ref.ReferenceQueue;
/**
* Created by jribble on 2/20/17.
*/
public class SoftCachedValue<K, V> extends StdCachedValue<K, V> implements CachedValue<K, V> {
private final CachedValueReference<V> ref;
public SoftCachedValue(K key, V value, long ttl, long maxIdleTime, ReferenceQueue<V> queue) {
super(key, null, ttl, maxIdleTime);
this.ref = new CachedValueReference<V>(this, value, queue);
}
@Override
public V getValue() {
super.getValue();
return ref.get();
}
}

@ -0,0 +1,71 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.cache;
/**
* Created by jribble on 2/20/17.
*/
public class StdCachedValue<K, V> implements CachedValue<K, V> {
protected final K key;
protected final V value;
long ttl;
long maxIdleTime;
long creationTime;
long lastAccess;
public StdCachedValue(K key, V value, long ttl, long maxIdleTime) {
this.value = value;
this.ttl = ttl;
this.key = key;
this.maxIdleTime = maxIdleTime;
creationTime = System.currentTimeMillis();
lastAccess = creationTime;
}
@Override
public boolean isExpired() {
boolean result = false;
long currentTime = System.currentTimeMillis();
if (ttl != 0 && creationTime + ttl < currentTime) {
result = true;
}
if (maxIdleTime != 0 && lastAccess + maxIdleTime < currentTime) {
result = true;
}
return result;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
lastAccess = System.currentTimeMillis();
return value;
}
@Override
public String toString() {
return "CachedValue [key=" + key + ", value=" + value + "]";
}
}

@ -38,6 +38,11 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.concurrent.ScheduledFuture;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisConnection implements RedisCommands { public class RedisConnection implements RedisCommands {
private static final AttributeKey<RedisConnection> CONNECTION = AttributeKey.valueOf("connection"); private static final AttributeKey<RedisConnection> CONNECTION = AttributeKey.valueOf("connection");

@ -40,6 +40,11 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
/**
*
* @author Nikita Koksharov
*
*/
public class RedisPubSubConnection extends RedisConnection { public class RedisPubSubConnection extends RedisConnection {
final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>(); final Queue<RedisPubSubListener<Object>> listeners = new ConcurrentLinkedQueue<RedisPubSubListener<Object>>();

@ -184,7 +184,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
CommandsData commandBatch) { CommandsData commandBatch) {
int i = state().getBatchIndex(); int i = state().getBatchIndex();
RedisException error = null; Throwable error = null;
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> cmd = null; CommandData<Object, Object> cmd = null;
try { try {
@ -192,12 +192,12 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state().setBatchIndex(i); state().setBatchIndex(i);
cmd = (CommandData<Object, Object>) commandBatch.getCommands().get(i); cmd = (CommandData<Object, Object>) commandBatch.getCommands().get(i);
decode(in, cmd, null, ctx.channel()); decode(in, cmd, null, ctx.channel());
i++; } catch (Exception e) {
} catch (IOException e) {
cmd.tryFailure(e); cmd.tryFailure(e);
} }
i++;
if (!cmd.isSuccess()) { if (!cmd.isSuccess()) {
error = (RedisException) cmd.cause(); error = cmd.cause();
} }
} }

@ -671,7 +671,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private Collection<ClusterPartition> parsePartitions(List<ClusterNodeInfo> nodes) { private Collection<ClusterPartition> parsePartitions(List<ClusterNodeInfo> nodes) {
Map<String, ClusterPartition> partitions = new HashMap<String, ClusterPartition>(); Map<String, ClusterPartition> partitions = new HashMap<String, ClusterPartition>();
for (ClusterNodeInfo clusterNodeInfo : nodes) { for (ClusterNodeInfo clusterNodeInfo : nodes) {
if (clusterNodeInfo.containsFlag(Flag.NOADDR)) { if (clusterNodeInfo.containsFlag(Flag.NOADDR) || clusterNodeInfo.containsFlag(Flag.HANDSHAKE)) {
// skip it // skip it
continue; continue;
} }

@ -31,16 +31,6 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
*/ */
private LoadBalancer loadBalancer = new RoundRobinLoadBalancer(); private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
/**
* Redis 'slave' node minimum idle subscription (pub/sub) connection amount for <b>each</b> slave node
*/
private int slaveSubscriptionConnectionMinimumIdleSize = 1;
/**
* Redis 'slave' node maximum subscription (pub/sub) connection pool size for <b>each</b> slave node
*/
private int slaveSubscriptionConnectionPoolSize = 50;
/** /**
* Redis 'slave' node minimum idle connection amount for <b>each</b> slave node * Redis 'slave' node minimum idle connection amount for <b>each</b> slave node
*/ */
@ -62,6 +52,18 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
private int masterConnectionPoolSize = 64; private int masterConnectionPoolSize = 64;
private ReadMode readMode = ReadMode.SLAVE; private ReadMode readMode = ReadMode.SLAVE;
private SubscriptionMode subscriptionMode = SubscriptionMode.SLAVE;
/**
* Redis 'slave' node minimum idle subscription (pub/sub) connection amount for <b>each</b> slave node
*/
private int subscriptionConnectionMinimumIdleSize = 1;
/**
* Redis 'slave' node maximum subscription (pub/sub) connection pool size for <b>each</b> slave node
*/
private int subscriptionConnectionPoolSize = 50;
public BaseMasterSlaveServersConfig() { public BaseMasterSlaveServersConfig() {
} }
@ -71,11 +73,12 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
setLoadBalancer(config.getLoadBalancer()); setLoadBalancer(config.getLoadBalancer());
setMasterConnectionPoolSize(config.getMasterConnectionPoolSize()); setMasterConnectionPoolSize(config.getMasterConnectionPoolSize());
setSlaveConnectionPoolSize(config.getSlaveConnectionPoolSize()); setSlaveConnectionPoolSize(config.getSlaveConnectionPoolSize());
setSlaveSubscriptionConnectionPoolSize(config.getSlaveSubscriptionConnectionPoolSize()); setSubscriptionConnectionPoolSize(config.getSubscriptionConnectionPoolSize());
setMasterConnectionMinimumIdleSize(config.getMasterConnectionMinimumIdleSize()); setMasterConnectionMinimumIdleSize(config.getMasterConnectionMinimumIdleSize());
setSlaveConnectionMinimumIdleSize(config.getSlaveConnectionMinimumIdleSize()); setSlaveConnectionMinimumIdleSize(config.getSlaveConnectionMinimumIdleSize());
setSlaveSubscriptionConnectionMinimumIdleSize(config.getSlaveSubscriptionConnectionMinimumIdleSize()); setSubscriptionConnectionMinimumIdleSize(config.getSubscriptionConnectionMinimumIdleSize());
setReadMode(config.getReadMode()); setReadMode(config.getReadMode());
setSubscriptionMode(config.getSubscriptionMode());
} }
/** /**
@ -134,24 +137,40 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return loadBalancer; return loadBalancer;
} }
/**
* @deprecated use {@link #setSubscriptionConnectionPoolSize(int)}
*
* @param slaveSubscriptionConnectionPoolSize - pool size
* @return config
*/
@Deprecated
public T setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) {
return setSubscriptionConnectionPoolSize(slaveSubscriptionConnectionPoolSize);
}
@Deprecated
public int getSlaveSubscriptionConnectionPoolSize() {
return getSubscriptionConnectionPoolSize();
}
/** /**
* Redis 'slave' node maximum subscription (pub/sub) connection pool size for <b>each</b> slave node * Redis 'slave' node maximum subscription (pub/sub) connection pool size for <b>each</b> slave node
* <p> * <p>
* Default is <code>50</code> * Default is <code>50</code>
* <p> * <p>
* @see #setSlaveSubscriptionConnectionMinimumIdleSize(int) * @see #setSubscriptionConnectionMinimumIdleSize(int)
* *
* @param slaveSubscriptionConnectionPoolSize - pool size * @param subscriptionConnectionPoolSize - pool size
* @return config * @return config
*/ */
public T setSlaveSubscriptionConnectionPoolSize(int slaveSubscriptionConnectionPoolSize) { public T setSubscriptionConnectionPoolSize(int subscriptionConnectionPoolSize) {
this.slaveSubscriptionConnectionPoolSize = slaveSubscriptionConnectionPoolSize; this.subscriptionConnectionPoolSize = subscriptionConnectionPoolSize;
return (T)this; return (T)this;
} }
public int getSlaveSubscriptionConnectionPoolSize() { public int getSubscriptionConnectionPoolSize() {
return slaveSubscriptionConnectionPoolSize; return subscriptionConnectionPoolSize;
} }
/** /**
* Redis 'slave' node minimum idle connection amount for <b>each</b> slave node * Redis 'slave' node minimum idle connection amount for <b>each</b> slave node
* <p> * <p>
@ -188,24 +207,40 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return masterConnectionMinimumIdleSize; return masterConnectionMinimumIdleSize;
} }
/**
* @deprecated use {@link #setSubscriptionConnectionMinimumIdleSize(int)}
*
* @param slaveSubscriptionConnectionMinimumIdleSize - pool size
* @return config
*/
@Deprecated
public T setSlaveSubscriptionConnectionMinimumIdleSize(int slaveSubscriptionConnectionMinimumIdleSize) {
return setSubscriptionConnectionMinimumIdleSize(slaveSubscriptionConnectionMinimumIdleSize);
}
@Deprecated
public int getSlaveSubscriptionConnectionMinimumIdleSize() {
return getSubscriptionConnectionMinimumIdleSize();
}
/** /**
* Redis 'slave' node minimum idle subscription (pub/sub) connection amount for <b>each</b> slave node. * Redis 'slave' node minimum idle subscription (pub/sub) connection amount for <b>each</b> slave node.
* <p> * <p>
* Default is <code>1</code> * Default is <code>1</code>
* <p> * <p>
* @see #setSlaveSubscriptionConnectionPoolSize(int) * @see #setSubscriptionConnectionPoolSize(int)
* *
* @param slaveSubscriptionConnectionMinimumIdleSize - pool size * @param subscriptionConnectionMinimumIdleSize - pool size
* @return config * @return config
*/ */
public T setSlaveSubscriptionConnectionMinimumIdleSize(int slaveSubscriptionConnectionMinimumIdleSize) { public T setSubscriptionConnectionMinimumIdleSize(int subscriptionConnectionMinimumIdleSize) {
this.slaveSubscriptionConnectionMinimumIdleSize = slaveSubscriptionConnectionMinimumIdleSize; this.subscriptionConnectionMinimumIdleSize = subscriptionConnectionMinimumIdleSize;
return (T) this; return (T) this;
} }
public int getSlaveSubscriptionConnectionMinimumIdleSize() { public int getSubscriptionConnectionMinimumIdleSize() {
return slaveSubscriptionConnectionMinimumIdleSize; return subscriptionConnectionMinimumIdleSize;
} }
/** /**
* Set node type used for read operation. * Set node type used for read operation.
* <p> * <p>
@ -222,4 +257,21 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
return readMode; return readMode;
} }
/**
* Set node type used for subscription operation.
* <p>
* Default is <code>SLAVE</code>
*
* @param subscriptionMode param
* @return config
*/
public T setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
return (T) this;
}
public SubscriptionMode getSubscriptionMode() {
return subscriptionMode;
}
} }

@ -556,12 +556,24 @@ public class Config {
* Read config object stored in JSON format from <code>File</code> * Read config object stored in JSON format from <code>File</code>
* *
* @param file object * @param file object
* @param classLoader class loader
* @return config * @return config
* @throws IOException error * @throws IOException error
*/ */
public static Config fromJSON(File file) throws IOException { public static Config fromJSON(File file, ClassLoader classLoader) throws IOException {
ConfigSupport support = new ConfigSupport(); ConfigSupport support = new ConfigSupport();
return support.fromJSON(file, Config.class); return support.fromJSON(file, Config.class, classLoader);
}
/**
* Read config object stored in JSON format from <code>File</code>
*
* @param file object
* @return config
* @throws IOException error
*/
public static Config fromJSON(File file) throws IOException {
return fromJSON(file);
} }
/** /**
@ -634,6 +646,11 @@ public class Config {
ConfigSupport support = new ConfigSupport(); ConfigSupport support = new ConfigSupport();
return support.fromYAML(file, Config.class); return support.fromYAML(file, Config.class);
} }
public static Config fromYAML(File file, ClassLoader classLoader) throws IOException {
ConfigSupport support = new ConfigSupport();
return support.fromYAML(file, Config.class, classLoader);
}
/** /**
* Read config object stored in YAML format from <code>URL</code> * Read config object stored in YAML format from <code>URL</code>

@ -26,13 +26,16 @@ import java.util.List;
import org.redisson.api.RedissonNodeInitializer; import org.redisson.api.RedissonNodeInitializer;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.cluster.ClusterConnectionManager; import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.codec.CodecProvider;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.ElasticacheConnectionManager; import org.redisson.connection.ElasticacheConnectionManager;
import org.redisson.connection.ReplicatedConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.ReplicatedConnectionManager;
import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager; import org.redisson.connection.SingleConnectionManager;
import org.redisson.connection.balancer.LoadBalancer; import org.redisson.connection.balancer.LoadBalancer;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.misc.URLBuilder;
import com.fasterxml.jackson.annotation.JsonFilter; import com.fasterxml.jackson.annotation.JsonFilter;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
@ -45,10 +48,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ser.FilterProvider; import com.fasterxml.jackson.databind.ser.FilterProvider;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.misc.URLBuilder;
/** /**
* *
@ -118,8 +119,8 @@ public class ConfigSupport {
} }
private final ObjectMapper jsonMapper = createMapper(null); private ObjectMapper jsonMapper = createMapper(null, null);
private final ObjectMapper yamlMapper = createMapper(new YAMLFactory()); private ObjectMapper yamlMapper = createMapper(new YAMLFactory(), null);
public <T> T fromJSON(String content, Class<T> configType) throws IOException { public <T> T fromJSON(String content, Class<T> configType) throws IOException {
URLBuilder.replaceURLFactory(); URLBuilder.replaceURLFactory();
@ -131,8 +132,13 @@ public class ConfigSupport {
} }
public <T> T fromJSON(File file, Class<T> configType) throws IOException { public <T> T fromJSON(File file, Class<T> configType) throws IOException {
return fromJSON(file, configType, null);
}
public <T> T fromJSON(File file, Class<T> configType, ClassLoader classLoader) throws IOException {
URLBuilder.replaceURLFactory(); URLBuilder.replaceURLFactory();
try { try {
jsonMapper = createMapper(null, classLoader);
return jsonMapper.readValue(file, configType); return jsonMapper.readValue(file, configType);
} finally { } finally {
URLBuilder.restoreURLFactory(); URLBuilder.restoreURLFactory();
@ -192,6 +198,17 @@ public class ConfigSupport {
URLBuilder.restoreURLFactory(); URLBuilder.restoreURLFactory();
} }
} }
public <T> T fromYAML(File file, Class<T> configType, ClassLoader classLoader) throws IOException {
URLBuilder.replaceURLFactory();
try {
yamlMapper = createMapper(new YAMLFactory(), classLoader);
return yamlMapper.readValue(file, configType);
} finally {
URLBuilder.restoreURLFactory();
}
}
public <T> T fromYAML(URL url, Class<T> configType) throws IOException { public <T> T fromYAML(URL url, Class<T> configType) throws IOException {
URLBuilder.replaceURLFactory(); URLBuilder.replaceURLFactory();
@ -266,12 +283,12 @@ public class ConfigSupport {
if (config.getMasterConnectionPoolSize() < config.getMasterConnectionMinimumIdleSize()) { if (config.getMasterConnectionPoolSize() < config.getMasterConnectionMinimumIdleSize()) {
throw new IllegalArgumentException("masterConnectionPoolSize can't be lower than masterConnectionMinimumIdleSize"); throw new IllegalArgumentException("masterConnectionPoolSize can't be lower than masterConnectionMinimumIdleSize");
} }
if (config.getSlaveSubscriptionConnectionPoolSize() < config.getSlaveSubscriptionConnectionMinimumIdleSize()) { if (config.getSubscriptionConnectionPoolSize() < config.getSubscriptionConnectionMinimumIdleSize()) {
throw new IllegalArgumentException("slaveSubscriptionConnectionMinimumIdleSize can't be lower than slaveSubscriptionConnectionPoolSize"); throw new IllegalArgumentException("slaveSubscriptionConnectionMinimumIdleSize can't be lower than slaveSubscriptionConnectionPoolSize");
} }
} }
private ObjectMapper createMapper(JsonFactory mapping) { private ObjectMapper createMapper(JsonFactory mapping, ClassLoader classLoader) {
ObjectMapper mapper = new ObjectMapper(mapping); ObjectMapper mapper = new ObjectMapper(mapping);
mapper.addMixIn(MasterSlaveServersConfig.class, MasterSlaveServersConfigMixIn.class); mapper.addMixIn(MasterSlaveServersConfig.class, MasterSlaveServersConfigMixIn.class);
mapper.addMixIn(SingleServerConfig.class, SingleSeverConfigMixIn.class); mapper.addMixIn(SingleServerConfig.class, SingleSeverConfigMixIn.class);
@ -285,6 +302,13 @@ public class ConfigSupport {
.addFilter("classFilter", SimpleBeanPropertyFilter.filterOutAllExcept()); .addFilter("classFilter", SimpleBeanPropertyFilter.filterOutAllExcept());
mapper.setFilterProvider(filterProvider); mapper.setFilterProvider(filterProvider);
mapper.setSerializationInclusion(Include.NON_NULL); mapper.setSerializationInclusion(Include.NON_NULL);
if (classLoader != null) {
TypeFactory tf = TypeFactory.defaultInstance()
.withClassLoader(classLoader);
mapper.setTypeFactory(tf);
}
return mapper; return mapper;
} }

@ -15,6 +15,11 @@
*/ */
package org.redisson.config; package org.redisson.config;
/**
*
* @author Nikita Koksharov
*
*/
public enum ReadMode { public enum ReadMode {
/** /**

@ -0,0 +1,35 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.config;
/**
*
* @author Nikita Koksharov
*
*/
public enum SubscriptionMode {
/**
* Subscribe to slave nodes
*/
SLAVE,
/**
* Subscribe to master node
*/
MASTER
}

@ -0,0 +1,64 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class CountListener implements FutureListener<Void> {
private final RPromise<Void> res;
private final AtomicInteger counter;
public static RPromise<Void> create(RFuture<Void>... futures) {
RPromise<Void> result = new RedissonPromise<Void>();
FutureListener<Void> listener = new CountListener(result, futures.length);
for (RFuture<Void> future : futures) {
future.addListener(listener);
}
return result;
}
public CountListener(RPromise<Void> res, int amount) {
super();
this.res = res;
this.counter = new AtomicInteger(amount);
}
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
res.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
res.trySuccess(null);
}
}
}

@ -320,7 +320,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
c.setClientName(cfg.getClientName()); c.setClientName(cfg.getClientName());
c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize()); c.setMasterConnectionPoolSize(cfg.getMasterConnectionPoolSize());
c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize()); c.setSlaveConnectionPoolSize(cfg.getSlaveConnectionPoolSize());
c.setSlaveSubscriptionConnectionPoolSize(cfg.getSlaveSubscriptionConnectionPoolSize()); c.setSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); c.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
c.setConnectTimeout(cfg.getConnectTimeout()); c.setConnectTimeout(cfg.getConnectTimeout());
c.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout()); c.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
@ -329,8 +329,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
c.setReconnectionTimeout(cfg.getReconnectionTimeout()); c.setReconnectionTimeout(cfg.getReconnectionTimeout());
c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize()); c.setMasterConnectionMinimumIdleSize(cfg.getMasterConnectionMinimumIdleSize());
c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize()); c.setSlaveConnectionMinimumIdleSize(cfg.getSlaveConnectionMinimumIdleSize());
c.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSlaveSubscriptionConnectionMinimumIdleSize()); c.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
c.setReadMode(cfg.getReadMode()); c.setReadMode(cfg.getReadMode());
c.setSubscriptionMode(cfg.getSubscriptionMode());
return c; return c;
} }

@ -37,9 +37,11 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.connection.pool.MasterPubSubConnectionPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -65,6 +67,8 @@ public class MasterSlaveEntry {
final MasterConnectionPool writeConnectionHolder; final MasterConnectionPool writeConnectionHolder;
final Set<Integer> slots = new HashSet<Integer>(); final Set<Integer> slots = new HashSet<Integer>();
final MasterPubSubConnectionPool pubSubConnectionHolder;
final AtomicBoolean active = new AtomicBoolean(true); final AtomicBoolean active = new AtomicBoolean(true);
@ -79,6 +83,7 @@ public class MasterSlaveEntry {
slaveBalancer = new LoadBalancerManager(config, connectionManager, this); slaveBalancer = new LoadBalancerManager(config, connectionManager, this);
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this); writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this);
} }
public List<RFuture<Void>> initSlaveBalancer(Collection<URL> disconnectedNodes) { public List<RFuture<Void>> initSlaveBalancer(Collection<URL> disconnectedNodes) {
@ -98,8 +103,20 @@ public class MasterSlaveEntry {
public RFuture<Void> setupMasterEntry(String host, int port) { public RFuture<Void> setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, host, port); RedisClient client = connectionManager.createClient(NodeType.MASTER, host, port);
masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(), masterEntry = new ClientConnectionsEntry(
0, 0, connectionManager, NodeType.MASTER); client,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(),
connectionManager,
NodeType.MASTER);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> f = writeConnectionHolder.add(masterEntry);
RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);
return CountListener.create(s, f);
}
return writeConnectionHolder.add(masterEntry); return writeConnectionHolder.add(masterEntry);
} }
@ -307,8 +324,8 @@ public class MasterSlaveEntry {
ClientConnectionsEntry entry = new ClientConnectionsEntry(client, ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(), this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(), this.config.getSlaveConnectionPoolSize(),
this.config.getSlaveSubscriptionConnectionMinimumIdleSize(), this.config.getSubscriptionConnectionMinimumIdleSize(),
this.config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, mode); this.config.getSubscriptionConnectionPoolSize(), connectionManager, mode);
if (freezed) { if (freezed) {
synchronized (entry) { synchronized (entry) {
entry.setFreezed(freezed); entry.setFreezed(freezed);
@ -352,6 +369,7 @@ public class MasterSlaveEntry {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
writeConnectionHolder.remove(oldMaster); writeConnectionHolder.remove(oldMaster);
pubSubConnectionHolder.remove(oldMaster);
slaveDown(oldMaster, FreezeReason.MANAGER); slaveDown(oldMaster, FreezeReason.MANAGER);
// more than one slave available, so master can be removed from slaves // more than one slave available, so master can be removed from slaves
@ -406,10 +424,18 @@ public class MasterSlaveEntry {
} }
RFuture<RedisPubSubConnection> nextPubSubConnection() { RFuture<RedisPubSubConnection> nextPubSubConnection() {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
return pubSubConnectionHolder.get();
}
return slaveBalancer.nextPubSubConnection(); return slaveBalancer.nextPubSubConnection();
} }
public void returnPubSubConnection(PubSubConnectionEntry entry) { public void returnPubSubConnection(PubSubConnectionEntry entry) {
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection());
return;
}
slaveBalancer.returnPubSubConnection(entry.getConnection()); slaveBalancer.returnPubSubConnection(entry.getConnection());
} }

@ -25,6 +25,7 @@ import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
import org.redisson.config.SingleServerConfig; import org.redisson.config.SingleServerConfig;
import org.redisson.config.SubscriptionMode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -71,15 +72,16 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig.setMasterAddress(addr); newconfig.setMasterAddress(addr);
newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize()); newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize());
newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection()); newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize()); newconfig.setSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());
newconfig.setConnectTimeout(cfg.getConnectTimeout()); newconfig.setConnectTimeout(cfg.getConnectTimeout());
newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout()); newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout());
newconfig.setFailedAttempts(cfg.getFailedAttempts()); newconfig.setFailedAttempts(cfg.getFailedAttempts());
newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout()); newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout());
newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize()); newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize());
newconfig.setSlaveSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); newconfig.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize());
newconfig.setReadMode(ReadMode.MASTER); newconfig.setReadMode(ReadMode.MASTER);
newconfig.setSubscriptionMode(SubscriptionMode.MASTER);
return newconfig; return newconfig;
} }

@ -17,69 +17,22 @@ package org.redisson.connection;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SinglePubSubConnectionPool;
import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
/**
*
* @author Nikita Koksharov
*
*/
public class SingleEntry extends MasterSlaveEntry { public class SingleEntry extends MasterSlaveEntry {
final PubSubConnectionPool pubSubConnectionHolder;
public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(slotRanges, connectionManager, config); super(slotRanges, connectionManager, config);
pubSubConnectionHolder = new SinglePubSubConnectionPool(config, connectionManager, this);
}
@Override
public RFuture<Void> setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(NodeType.MASTER, host, port);
masterEntry = new ClientConnectionsEntry(masterClient,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
config.getSlaveConnectionMinimumIdleSize(),
config.getSlaveSubscriptionConnectionPoolSize(), connectionManager, NodeType.MASTER);
final RPromise<Void> res = connectionManager.newPromise();
RFuture<Void> f = writeConnectionHolder.add(masterEntry);
RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);
FutureListener<Void> listener = new FutureListener<Void>() {
AtomicInteger counter = new AtomicInteger(2);
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
res.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
res.trySuccess(null);
}
}
};
f.addListener(listener);
s.addListener(listener);
return res;
}
@Override
RFuture<RedisPubSubConnection> nextPubSubConnection() {
return pubSubConnectionHolder.get();
}
@Override
public void returnPubSubConnection(PubSubConnectionEntry entry) {
pubSubConnectionHolder.returnConnection(masterEntry, entry.getConnection());
} }
@Override @Override

@ -26,15 +26,20 @@ import org.redisson.connection.MasterSlaveEntry;
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public class SinglePubSubConnectionPool extends PubSubConnectionPool { public class MasterPubSubConnectionPool extends PubSubConnectionPool {
public SinglePubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, public MasterPubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager,
MasterSlaveEntry masterSlaveEntry) { MasterSlaveEntry masterSlaveEntry) {
super(config, connectionManager, masterSlaveEntry); super(config, connectionManager, masterSlaveEntry);
} }
@Override
protected ClientConnectionsEntry getEntry() { protected ClientConnectionsEntry getEntry() {
return entries.get(0); return entries.get(0);
} }
public void remove(ClientConnectionsEntry entry) {
entries.remove(entry);
}
} }

@ -47,7 +47,7 @@ public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection>
@Override @Override
protected int getMinimumIdleSize(ClientConnectionsEntry entry) { protected int getMinimumIdleSize(ClientConnectionsEntry entry) {
return config.getSlaveSubscriptionConnectionMinimumIdleSize(); return config.getSubscriptionConnectionMinimumIdleSize();
} }
@Override @Override

@ -33,6 +33,35 @@ public class URLBuilder {
private static URLStreamHandlerFactory currentFactory; private static URLStreamHandlerFactory currentFactory;
private final static URLStreamHandlerFactory newFactory = new URLStreamHandlerFactory() {
@Override
public URLStreamHandler createURLStreamHandler(String protocol) {
if ("redis".equals(protocol)) {
return new URLStreamHandler() {
@Override
protected URLConnection openConnection(URL u) throws IOException {
throw new UnsupportedOperationException();
};
@Override
protected boolean equals(URL u1, URL u2) {
return u1.toString().equals(u2.toString());
}
@Override
protected int hashCode(URL u) {
return u.toString().hashCode();
}
};
}
if (currentFactory != null) {
return currentFactory.createURLStreamHandler(protocol);
}
return null;
}
};
public static void restoreURLFactory() { public static void restoreURLFactory() {
try { try {
Field field = URL.class.getDeclaredField("factory"); Field field = URL.class.getDeclaredField("factory");
@ -48,38 +77,15 @@ public class URLBuilder {
Field field = URL.class.getDeclaredField("factory"); Field field = URL.class.getDeclaredField("factory");
field.setAccessible(true); field.setAccessible(true);
currentFactory = (URLStreamHandlerFactory) field.get(null); currentFactory = (URLStreamHandlerFactory) field.get(null);
if (currentFactory != null) { if (currentFactory != null && currentFactory != newFactory) {
field.set(null, null); field.set(null, null);
} }
URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() { if (currentFactory != newFactory) {
@Override URL.setURLStreamHandlerFactory(newFactory);
public URLStreamHandler createURLStreamHandler(String protocol) { } else {
if ("redis".equals(protocol)) { currentFactory = null;
return new URLStreamHandler() { }
@Override
protected URLConnection openConnection(URL u) throws IOException {
throw new UnsupportedOperationException();
};
@Override
protected boolean equals(URL u1, URL u2) {
return u1.toString().equals(u2.toString());
}
@Override
protected int hashCode(URL u) {
return u.toString().hashCode();
}
};
}
if (currentFactory != null) {
return currentFactory.createURLStreamHandler(protocol);
}
return null;
}
});
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }

@ -34,7 +34,6 @@ import io.netty.util.internal.PlatformDependent;
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *
* @param <E>
*/ */
abstract class PublishSubscribe<E extends PubSubEntry<E>> { abstract class PublishSubscribe<E extends PubSubEntry<E>> {

@ -18,10 +18,10 @@ import org.redisson.RedissonMapTest.SimpleKey;
import org.redisson.RedissonMapTest.SimpleValue; import org.redisson.RedissonMapTest.SimpleValue;
import org.redisson.api.LocalCachedMapOptions; import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy; import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.cache.Cache;
import org.redisson.api.RLocalCachedMap; import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.misc.Cache;
import mockit.Deencapsulation; import mockit.Deencapsulation;

@ -16,6 +16,22 @@ import org.redisson.api.RReadWriteLock;
public class RedissonReadWriteLockTest extends BaseConcurrentTest { public class RedissonReadWriteLockTest extends BaseConcurrentTest {
@Test
public void testReadLockLeaseTimeout() throws InterruptedException {
RLock readLock = redisson.getReadWriteLock("my_read_write_lock").readLock();
Assert.assertTrue(readLock.tryLock(1, 4, TimeUnit.SECONDS));
Thread.sleep(3000);
RLock readLock2 = redisson.getReadWriteLock("my_read_write_lock").readLock();
Assert.assertTrue(readLock2.tryLock(1, 4, TimeUnit.SECONDS));
readLock2.unlock();
Thread.sleep(2000);
RLock writeLock = redisson.getReadWriteLock("my_read_write_lock").writeLock();
Assert.assertTrue(writeLock.tryLock());
}
@Test @Test
public void testWriteReadReentrancy() throws InterruptedException { public void testWriteReadReentrancy() throws InterruptedException {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("TEST"); RReadWriteLock readWriteLock = redisson.getReadWriteLock("TEST");

@ -5,6 +5,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.Test; import org.junit.Test;
import org.redisson.cache.Cache;
import org.redisson.cache.LFUCacheMap;
public class LFUCacheMapTest { public class LFUCacheMapTest {

@ -5,6 +5,8 @@ import static org.assertj.core.api.Assertions.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.Test; import org.junit.Test;
import org.redisson.cache.Cache;
import org.redisson.cache.LRUCacheMap;
public class LRUCacheMapTest { public class LRUCacheMapTest {

@ -1,10 +1,15 @@
package org.redisson.misc; package org.redisson.misc;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.Test; import org.junit.Test;
import org.redisson.cache.Cache;
import org.redisson.cache.NoneCacheMap;
public class NoneCacheMapTest { public class NoneCacheMapTest {

@ -0,0 +1,68 @@
package org.redisson.misc;
import org.junit.Test;
import org.redisson.cache.Cache;
import org.redisson.cache.SoftCacheMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
public class SoftCacheMapTest {
@Test
public void testMaxIdleTimeEviction() throws InterruptedException {
Cache<Integer, Integer> map = new SoftCacheMap<Integer, Integer>(0, 0);
map.put(1, 0, 0, TimeUnit.MILLISECONDS, 400, TimeUnit.MILLISECONDS);
assertThat(map.get(1)).isEqualTo(0);
Thread.sleep(200);
assertThat(map.get(1)).isEqualTo(0);
Thread.sleep(200);
assertThat(map.get(1)).isEqualTo(0);
Thread.sleep(200);
assertThat(map.get(1)).isEqualTo(0);
Thread.sleep(410);
assertThat(map.keySet()).isEmpty();
}
@Test
public void testTTLEviction() throws InterruptedException {
Cache<Integer, Integer> map = new SoftCacheMap<Integer, Integer>(0, 0);
map.put(1, 0, 500, TimeUnit.MILLISECONDS, 0, TimeUnit.MILLISECONDS);
assertThat(map.get(1)).isEqualTo(0);
Thread.sleep(100);
assertThat(map.get(1)).isEqualTo(0);
assertThat(map.keySet()).containsOnly(1);
Thread.sleep(500);
assertThat(map.keySet()).isEmpty();
}
@Test
public void testSizeEviction() {
Cache<Integer, Integer> map = new SoftCacheMap<Integer, Integer>(0, 0);
map.put(1, 0);
map.put(2, 0);
assertThat(map.keySet()).containsOnly(1, 2);
map.put(3, 0);
map.put(4, 0);
assertThat(map.keySet()).containsOnly(1, 2, 3, 4);
}
// This test requires using -XX:SoftRefLRUPolicyMSPerMB=0 to pass
@Test
public void testSoftReferences() {
Cache<Integer, Integer> map = new SoftCacheMap<Integer, Integer>(0, 0);
for(int i=0;i<100000;i++) {
map.put(i, new Integer(i));
}
assertThat(map.values().stream().filter(Objects::nonNull).count()).isEqualTo(100000);
System.gc();
assertThat(map.values().stream().filter(Objects::nonNull).count()).isZero();
assertThat(map.values().size()).isZero();
}
}
Loading…
Cancel
Save