diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5860dc4d4..9300b6675 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,24 @@ Redisson Releases History
================================
####Please Note: trunk is current development branch.
+####24-Aug-2016 - version 2.3.0 released
+Starting from this version Redisson could be run as standalone node to execute distributed tasks. More features will be added to it in future. Read docs about it [here](https://github.com/mrniko/redisson/wiki/12.-Standalone-node)
+
+Feature - __new service added__ `RExecutorService`. More info about it [here](https://github.com/mrniko/redisson/wiki/9.-distributed-services/#93-distributed-executor-service)
+Feature - __new service added__ `RScheduledExecutorService`. More info about it
+[here](https://github.com/mrniko/redisson/wiki/9.-distributed-services#94-distributed-scheduled-executor-service)
+Feature - __new service added__ `RLiveObjectService`. More info about it
+[here](https://github.com/mrniko/redisson/wiki/9.-distributed-services/#92-live-object-service) (big thanks to Rui Gu for this amazing feature)
+Feature - __new object added__ `RBoundedBlockingQueue`. More info about it [here](https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#711-bounded-blocking-queue)
+Feature - __Redis deployment tool__. More info about it
+[here](https://github.com/mrniko/redisson/wiki/13.-Tools#131-redis-deployment-tool)
+Feature - __Cluster management tool__. More info about it [here](https://github.com/mrniko/redisson/wiki/13.-Tools#132-cluster-management-tool)
+Feature - Avro and Smile codecs added
+__Breaking api change__ - all config classes moved to `org.redisson.config` package
+__Breaking api change__ - all classes moved from `org.redisson.core` to `org.redisson.api` package
+__Breaking api change__ - switched from `io.netty.util.concurrent.Future` to `org.redisson.api.RFuture` interface
+Fixed - division by zero in WeightedRoundRobinBalancer (thanks to Shailender R Bathula)
+
####08-Aug-2016 - version 2.2.24 released
Fixed - PubSub connection in cluster mode should be connected to node according slot derived from channel name
Fixed - `RLock.tryLock` could block forever under some conditions
diff --git a/README.md b/README.md
index e393e6243..ca9347815 100644
--- a/README.md
+++ b/README.md
@@ -3,12 +3,9 @@ Redis based In-Memory Data Grid for Java. Redisson.
[![Maven Central](https://img.shields.io/maven-central/v/org.redisson/redisson.svg?style=flat-square)](https://maven-badges.herokuapp.com/maven-central/org.redisson/redisson/)
-Use familiar Java data structures with power of [Redis](http://redis.io).
-
-Based on high-performance async and lock-free Java Redis client and [Netty 4](http://netty.io) framework.
+Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.
Redis 2.8+ and JDK 1.6+ compatible.
-
Please read [documentation](https://github.com/mrniko/redisson/wiki) for more details.
Redisson [releases history](https://github.com/mrniko/redisson/blob/master/CHANGELOG.md).
@@ -35,45 +32,39 @@ Features
3. automatic new slave servers discovery
4. automatic slave servers offline/online discovery
5. automatic sentinel servers discovery
-* Master with Slave servers mode
-* Single server mode
-* Distributed implementation of `java.util.BitSet`
-* Distributed implementation of `java.util.List`
-* Distributed implementation of `java.util.Set` with TTL support for each entry
-* Distributed implementation of `java.util.SortedSet`
-* Distributed implementation of `java.util.Queue`
-* Distributed implementation of `java.util.concurrent.BlockingQueue`
-* Distributed implementation of `java.util.Deque`
-* Distributed implementation of `java.util.concurrent.BlockingDeque`
-* Distributed implementation of `java.util.Map` with TTL support for each entry
-* Distributed implementation of `java.util.concurrent.ConcurrentMap` with TTL support for each entry
-* Distributed implementation of reentrant `java.util.concurrent.locks.Lock` with TTL support
-* Distributed implementation of reentrant `java.util.concurrent.locks.ReadWriteLock` with TTL support
-* Distributed alternative to the `java.util.concurrent.atomic.AtomicLong`
-* Distributed alternative to the `java.util.concurrent.CountDownLatch`
-* Distributed alternative to the `java.util.concurrent.Semaphore`
-* Distributed publish/subscribe messaging via `org.redisson.core.RTopic`
-* Distributed HyperLogLog via `org.redisson.core.RHyperLogLog`
+* Master with Slave servers mode
+* Single server mode
* Asynchronous interface for each object
* Asynchronous connection pool
* Thread-safe implementation
-* All commands executes in an atomic way
* Lua scripting
-* [Spring cache](https://github.com/mrniko/redisson/wiki/10.-additional-features/#104-spring-cache-integration) integration
-* Supports [Reactive Streams](http://www.reactive-streams.org)
-* Supports [Redis pipelining](http://redis.io/topics/pipelining) (command batches)
-* Supports [Remote services](https://github.com/mrniko/redisson/wiki/9.-distributed-services/#91-remote-service)
+* [Distributed objects](https://github.com/mrniko/redisson/wiki/6.-Distributed-objects)
+* [Distributed collections](https://github.com/mrniko/redisson/wiki/7.-Distributed-collections)
+* [Distributed locks and synchronizers](https://github.com/mrniko/redisson/wiki/8.-Distributed-locks-and-synchronizers)
+* [Distributed services](https://github.com/mrniko/redisson/wiki/9.-distributed-services)
+* [Scheduler service](https://github.com/mrniko/redisson/wiki/9.-distributed-services/#94-scheduled-executor-service)
+* [Spring cache](https://github.com/mrniko/redisson/wiki/14.-Integration%20with%20frameworks/#141-spring-cache) integration
+* [Hibernate](https://github.com/mrniko/redisson/wiki/14.-Integration%20with%20frameworks/#142-hibernate) integration
+* [Reactive Streams](https://github.com/mrniko/redisson/wiki/3.-operations-execution#32-reactive-way)
+* [Redis pipelining](https://github.com/mrniko/redisson/wiki/10.-additional-features#102-execution-batches-of-commands) (command batches)
* Supports Android platform
* Supports auto-reconnect
* Supports failed to send command auto-retry
* Supports OSGi
* Supports many popular codecs ([Jackson JSON](https://github.com/FasterXML/jackson), [Avro](http://avro.apache.org/), [Smile](http://wiki.fasterxml.com/SmileFormatSpec), [CBOR](http://cbor.io/), [MsgPack](http://msgpack.org/), [Kryo](https://github.com/EsotericSoftware/kryo), [FST](https://github.com/RuedigerMoeller/fast-serialization), [LZ4](https://github.com/jpountz/lz4-java), [Snappy](https://github.com/xerial/snappy-java) and JDK Serialization)
-* With over 500 unit tests
+* With over 900 unit tests
Projects using Redisson
================================
[Setronica](http://setronica.com/), [Monits](http://monits.com/), [Brookhaven National Laboratory](http://bnl.gov/), [Netflix Dyno client] (https://github.com/Netflix/dyno), [武林Q传](http://www.nbrpg.com/), [Ocous](http://www.ocous.com/), [Invaluable](http://www.invaluable.com/), [Clover](https://www.clover.com/) , [Apache Karaf Decanter](https://karaf.apache.org/projects.html#decanter), [Atmosphere Framework](http://async-io.org/)
+### Articles
+
+[Java data structures powered by Redis. Introduction to Redisson. PDF](http://redisson.org/Redisson.pdf)
+[Introducing Redisson Live Objects (Object Hash Mapping)](https://dzone.com/articles/introducing-redisson-live-object-object-hash-mappi)
+[Java Remote Method Invocation with Redisson](https://dzone.com/articles/java-remote-method-invocation-with-redisson)
+[Java Multimaps With Redis](https://dzone.com/articles/multimaps-with-redis)
+
### Maven
Include the following to your dependency list:
@@ -81,12 +72,17 @@ Include the following to your dependency list:
org.redisson
redisson
- 2.2.24
+ 2.3.0
### Gradle
- compile 'org.redisson:redisson:2.2.24'
+ compile 'org.redisson:redisson:2.3.0'
+
+### Downloads
+
+[Redisson 2.3.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.3.0&e=jar)
+[Redisson node 2.3.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.3.0&e=jar)
### Supported by
diff --git a/pom.xml b/pom.xml
index 379061757..e1f0daf9a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
org.redisson
redisson-parent
- 0.9.1-SNAPSHOT
+ 2.3.1-SNAPSHOT
pom
Redisson
diff --git a/redisson-all/README.md b/redisson-all/README.md
new file mode 100644
index 000000000..3d734b00c
--- /dev/null
+++ b/redisson-all/README.md
@@ -0,0 +1,5 @@
+## Redisson standalone node
+
+Redisson offers ability to run as standalone node and participate in distributed computing. Such standalone nodes could be used to run [ExecutorService](./9.-distributed-services#93-executor-service), [ScheduledExecutorService](https://github.com/mrniko/redisson/wiki/9.-distributed-services#94-scheduled-executor-service) tasks or [RemoteService](./9.-distributed-services#91-remote-service) services. It's just a single jar and could be downloaded from [here](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.3.0&e=jar)
+
+[Documentation](https://github.com/mrniko/redisson/wiki/12.-Standalone-node) about Redisson standalone node.
diff --git a/redisson-all/pom.xml b/redisson-all/pom.xml
index 5abfda8c5..1259ecce2 100644
--- a/redisson-all/pom.xml
+++ b/redisson-all/pom.xml
@@ -4,7 +4,7 @@
org.redisson
redisson-parent
- 0.9.1-SNAPSHOT
+ 2.3.1-SNAPSHOT
../
diff --git a/redisson/pom.xml b/redisson/pom.xml
index 1ae7b84dd..ca9da989f 100644
--- a/redisson/pom.xml
+++ b/redisson/pom.xml
@@ -4,7 +4,7 @@
org.redisson
redisson-parent
- 2.3.0
+ 2.3.1-SNAPSHOT
../
@@ -41,33 +41,33 @@
io.netty
netty-transport-native-epoll
- 4.0.40.Final
+ 4.0.41.Final
provided
io.netty
netty-common
- 4.0.40.Final
+ 4.0.41.Final
io.netty
netty-codec
- 4.0.40.Final
+ 4.0.41.Final
io.netty
netty-buffer
- 4.0.40.Final
+ 4.0.41.Final
io.netty
netty-transport
- 4.0.40.Final
+ 4.0.41.Final
io.netty
netty-handler
- 4.0.40.Final
+ 4.0.41.Final
@@ -106,6 +106,12 @@
1.7.12
test
+
+ org.jmockit
+ jmockit
+ 1.27
+ test
+
net.jpountz.lz4
diff --git a/redisson/src/main/java/org/redisson/BaseRemoteService.java b/redisson/src/main/java/org/redisson/BaseRemoteService.java
index a0fcf2d0a..60eae501d 100644
--- a/redisson/src/main/java/org/redisson/BaseRemoteService.java
+++ b/redisson/src/main/java/org/redisson/BaseRemoteService.java
@@ -33,6 +33,7 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.executor.RemotePromise;
+import org.redisson.misc.RPromise;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
import org.redisson.remote.RemoteServiceAckTimeoutException;
@@ -47,7 +48,6 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
import io.netty.util.internal.ThreadLocalRandom;
/**
@@ -200,7 +200,7 @@ public abstract class BaseRemoteService {
}
if (optionsCopy.isAckExpected()) {
- Future future = commandExecutor.evalWriteAsync(responseName, LongCodec.INSTANCE,
+ RFuture future = commandExecutor.evalWriteAsync(responseName, LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
@@ -250,7 +250,7 @@ public abstract class BaseRemoteService {
result.setRequestId(requestId);
- Future addFuture = addAsync(requestQueue, request, result);
+ RFuture addFuture = addAsync(requestQueue, request, result);
addFuture.addListener(new FutureListener() {
@Override
@@ -262,7 +262,7 @@ public abstract class BaseRemoteService {
if (optionsCopy.isAckExpected()) {
final RBlockingQueue responseQueue = redisson.getBlockingQueue(responseName, getCodec());
- Future ackFuture = responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
+ RFuture ackFuture = responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
ackFuture.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
@@ -273,7 +273,7 @@ public abstract class BaseRemoteService {
RemoteServiceAck ack = future.getNow();
if (ack == null) {
- Future ackFutureAttempt =
+ RFuture ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, responseQueue, ackName);
ackFutureAttempt.addListener(new FutureListener() {
@@ -318,7 +318,7 @@ public abstract class BaseRemoteService {
private void awaitResultAsync(final RemoteInvocationOptions optionsCopy, final RemotePromise result,
final RemoteServiceRequest request, final String responseName, final String ackName) {
- Future deleteFuture = redisson.getBucket(ackName).deleteAsync();
+ RFuture deleteFuture = redisson.getBucket(ackName).deleteAsync();
deleteFuture.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
@@ -340,7 +340,7 @@ public abstract class BaseRemoteService {
}
RBlockingQueue responseQueue = redisson.getBlockingQueue(responseName, getCodec());
- Future responseFuture = responseQueue
+ RFuture responseFuture = responseQueue
.pollAsync(optionsCopy.getExecutionTimeoutInMillis(), TimeUnit.MILLISECONDS);
responseFuture.addListener(new FutureListener() {
@@ -448,7 +448,7 @@ public abstract class BaseRemoteService {
private RemoteServiceAck tryPollAckAgain(RemoteInvocationOptions optionsCopy,
RBlockingQueue extends RRemoteServiceResponse> responseQueue, String ackName)
throws InterruptedException {
- Future ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
+ RFuture ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
+ "return 0;"
@@ -464,11 +464,11 @@ public abstract class BaseRemoteService {
return null;
}
- private Future tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy,
+ private RFuture tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy,
final RBlockingQueue responseQueue, String ackName)
throws InterruptedException {
- final Promise promise = commandExecutor.getConnectionManager().newPromise();
- Future ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
+ final RPromise promise = commandExecutor.getConnectionManager().newPromise();
+ RFuture ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[1]);"
+ "return 0;"
@@ -485,7 +485,7 @@ public abstract class BaseRemoteService {
}
if (future.getNow()) {
- Future pollFuture = responseQueue.pollAsync();
+ RFuture pollFuture = responseQueue.pollAsync();
pollFuture.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
@@ -512,9 +512,9 @@ public abstract class BaseRemoteService {
return ByteBufUtil.hexDump(id);
}
- protected Future addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request,
+ protected RFuture addAsync(RBlockingQueue requestQueue, RemoteServiceRequest request,
RemotePromise result) {
- Future future = requestQueue.addAsync(request);
+ RFuture future = requestQueue.addAsync(request);
result.setAddFuture(future);
return future;
}
diff --git a/redisson/src/main/java/org/redisson/EvictionScheduler.java b/redisson/src/main/java/org/redisson/EvictionScheduler.java
index f72d72cb3..dee26b056 100644
--- a/redisson/src/main/java/org/redisson/EvictionScheduler.java
+++ b/redisson/src/main/java/org/redisson/EvictionScheduler.java
@@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import org.redisson.api.RFuture;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
@@ -70,7 +71,7 @@ public class EvictionScheduler {
@Override
public void run() {
- Future future = cleanupExpiredEntires(name, timeoutSetName, maxIdleSetName, keysLimit, multimap);
+ RFuture future = cleanupExpiredEntires(name, timeoutSetName, maxIdleSetName, keysLimit, multimap);
future.addListener(new FutureListener() {
@Override
@@ -169,7 +170,7 @@ public class EvictionScheduler {
return;
}
- Future future = cleanupExpiredEntires(name, timeoutSetName, null, valuesAmountToClean, false);
+ RFuture future = cleanupExpiredEntires(name, timeoutSetName, null, valuesAmountToClean, false);
future.addListener(new FutureListener() {
@Override
@@ -189,7 +190,7 @@ public class EvictionScheduler {
});
}
- private Future cleanupExpiredEntires(String name, String timeoutSetName, String maxIdleSetName, int keysLimit, boolean multimap) {
+ private RFuture cleanupExpiredEntires(String name, String timeoutSetName, String maxIdleSetName, int keysLimit, boolean multimap) {
if (multimap) {
return executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
diff --git a/redisson/src/main/java/org/redisson/PubSubEntry.java b/redisson/src/main/java/org/redisson/PubSubEntry.java
index 5685d2a3d..a11915eb1 100644
--- a/redisson/src/main/java/org/redisson/PubSubEntry.java
+++ b/redisson/src/main/java/org/redisson/PubSubEntry.java
@@ -15,7 +15,7 @@
*/
package org.redisson;
-import io.netty.util.concurrent.Promise;
+import org.redisson.misc.RPromise;
public interface PubSubEntry {
@@ -23,6 +23,6 @@ public interface PubSubEntry {
int release();
- Promise getPromise();
+ RPromise getPromise();
}
diff --git a/redisson/src/main/java/org/redisson/RedisNodes.java b/redisson/src/main/java/org/redisson/RedisNodes.java
index 057790008..c953b66c6 100644
--- a/redisson/src/main/java/org/redisson/RedisNodes.java
+++ b/redisson/src/main/java/org/redisson/RedisNodes.java
@@ -26,15 +26,16 @@ import java.util.concurrent.CountDownLatch;
import org.redisson.api.Node;
import org.redisson.api.NodeType;
import org.redisson.api.NodesGroup;
+import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.RedisClientEntry;
+import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
public class RedisNodes implements NodesGroup {
@@ -65,21 +66,21 @@ public class RedisNodes implements NodesGroup {
@Override
public boolean pingAll() {
List clients = new ArrayList(connectionManager.getClients());
- final Map> result = new ConcurrentHashMap>(clients.size());
+ final Map> result = new ConcurrentHashMap>(clients.size());
final CountDownLatch latch = new CountDownLatch(clients.size());
for (RedisClientEntry entry : clients) {
- Future f = entry.getClient().connectAsync();
+ RFuture f = entry.getClient().connectAsync();
f.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
final RedisConnection c = future.getNow();
- Promise connectionFuture = connectionManager.newPromise();
+ RPromise connectionFuture = connectionManager.newPromise();
connectionManager.getConnectListener().onConnect(connectionFuture, c, null, connectionManager.getConfig());
connectionFuture.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
- Future r = c.async(connectionManager.getConfig().getPingTimeout(), RedisCommands.PING);
+ RFuture r = c.async(connectionManager.getConfig().getPingTimeout(), RedisCommands.PING);
result.put(c, r);
latch.countDown();
}
@@ -99,7 +100,7 @@ public class RedisNodes implements NodesGroup {
}
if (System.currentTimeMillis() - time >= connectionManager.getConfig().getConnectTimeout()) {
- for (Entry> entry : result.entrySet()) {
+ for (Entry> entry : result.entrySet()) {
entry.getKey().closeAsync();
}
return false;
@@ -107,8 +108,8 @@ public class RedisNodes implements NodesGroup {
time = System.currentTimeMillis();
boolean res = true;
- for (Entry> entry : result.entrySet()) {
- Future f = entry.getValue();
+ for (Entry> entry : result.entrySet()) {
+ RFuture f = entry.getValue();
f.awaitUninterruptibly();
if (!"PONG".equals(f.getNow())) {
res = false;
diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java
index 0e3974ac4..957fca848 100755
--- a/redisson/src/main/java/org/redisson/Redisson.java
+++ b/redisson/src/main/java/org/redisson/Redisson.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.api.ClusterNodesGroup;
+import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.Node;
import org.redisson.api.NodesGroup;
import org.redisson.api.RAtomicDouble;
@@ -42,6 +43,7 @@ import org.redisson.api.RList;
import org.redisson.api.RListMultimap;
import org.redisson.api.RListMultimapCache;
import org.redisson.api.RLiveObjectService;
+import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
@@ -222,6 +224,16 @@ public class Redisson implements RedissonClient {
return new RedissonListMultimap(codec, commandExecutor, name);
}
+ @Override
+ public RLocalCachedMap getLocalCachedMap(String name, LocalCachedMapOptions options) {
+ return new RedissonLocalCachedMap(this, commandExecutor, name, options);
+ }
+
+ @Override
+ public RLocalCachedMap getLocalCachedMap(String name, Codec codec, LocalCachedMapOptions options) {
+ return new RedissonLocalCachedMap(this, codec, commandExecutor, name, options);
+ }
+
@Override
public RMap getMap(String name) {
return new RedissonMap(commandExecutor, name);
diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java
index 554e9366b..dbc6698bf 100644
--- a/redisson/src/main/java/org/redisson/RedissonBatch.java
+++ b/redisson/src/main/java/org/redisson/RedissonBatch.java
@@ -45,8 +45,6 @@ import org.redisson.client.codec.Codec;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
-import io.netty.util.concurrent.Future;
-
/**
*
*
diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java
index 997ea8bf4..a11919f58 100644
--- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java
+++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java
@@ -93,7 +93,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock
*/
@Override
public V take() throws InterruptedException {
- Future res = takeAsync();
+ RFuture res = takeAsync();
return res.await().getNow();
}
@@ -108,7 +108,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock
*/
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
- Future res = pollAsync(timeout, unit);
+ RFuture res = pollAsync(timeout, unit);
return res.await().getNow();
}
@@ -118,7 +118,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock
*/
@Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
- Future res = pollFromAnyAsync(timeout, unit, queueNames);
+ RFuture res = pollFromAnyAsync(timeout, unit, queueNames);
return res.await().getNow();
}
@@ -144,7 +144,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock
@Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
- Future res = pollLastAndOfferFirstToAsync(queueName, timeout, unit);
+ RFuture res = pollLastAndOfferFirstToAsync(queueName, timeout, unit);
return res.await().getNow();
}
diff --git a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java
index 430889c7c..7609707e2 100644
--- a/redisson/src/main/java/org/redisson/RedissonBloomFilter.java
+++ b/redisson/src/main/java/org/redisson/RedissonBloomFilter.java
@@ -178,9 +178,9 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF
@Override
public int count() {
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
- Future> configFuture = executorService.readAsync(getConfigName(), StringCodec.INSTANCE,
+ RFuture> configFuture = executorService.readAsync(getConfigName(), StringCodec.INSTANCE,
new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), getConfigName());
- Future cardinalityFuture = executorService.readAsync(getName(), codec, RedisCommands.BITCOUNT, getName());
+ RFuture cardinalityFuture = executorService.readAsync(getName(), codec, RedisCommands.BITCOUNT, getName());
executorService.execute();
readConfig(configFuture.getNow());
@@ -194,7 +194,7 @@ public class RedissonBloomFilter extends RedissonExpirable implements RBloomF
}
private void readConfig() {
- Future> future = commandExecutor.readAsync(getConfigName(), StringCodec.INSTANCE,
+ RFuture> future = commandExecutor.readAsync(getConfigName(), StringCodec.INSTANCE,
new RedisCommand>("HGETALL", new ObjectMapReplayDecoder()), getConfigName());
Map config = commandExecutor.get(future);
diff --git a/redisson/src/main/java/org/redisson/RedissonBuckets.java b/redisson/src/main/java/org/redisson/RedissonBuckets.java
index 492dc0c31..0abe91b5b 100644
--- a/redisson/src/main/java/org/redisson/RedissonBuckets.java
+++ b/redisson/src/main/java/org/redisson/RedissonBuckets.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
import org.redisson.api.RBucket;
import org.redisson.api.RBuckets;
+import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DelegateDecoderCodec;
import org.redisson.client.protocol.RedisCommand;
@@ -34,8 +35,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
-import io.netty.util.concurrent.Future;
-
public class RedissonBuckets implements RBuckets {
private final Codec codec;
@@ -73,7 +72,7 @@ public class RedissonBuckets implements RBuckets {
}
RedisCommand> command = new RedisCommand>("MGET", new MapGetAllDecoder(Arrays.asList(keys), 0), ValueType.OBJECTS);
- Future> future = commandExecutor.readAsync(keys[0], new DelegateDecoderCodec(codec), command, keys);
+ RFuture> future = commandExecutor.readAsync(keys[0], new DelegateDecoderCodec(codec), command, keys);
return commandExecutor.get(future);
}
diff --git a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java
index 6eddbf960..b27c2dfb6 100644
--- a/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java
+++ b/redisson/src/main/java/org/redisson/RedissonCountDownLatch.java
@@ -26,8 +26,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.pubsub.CountDownLatchPubSub;
-import io.netty.util.concurrent.Future;
-
/**
* Distributed alternative to the {@link java.util.concurrent.CountDownLatch}
*
@@ -52,7 +50,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
public void await() throws InterruptedException {
- Future promise = subscribe();
+ RFuture promise = subscribe();
try {
get(promise);
@@ -70,7 +68,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
- Future promise = subscribe();
+ RFuture promise = subscribe();
try {
if (!await(promise, time, unit)) {
return false;
@@ -102,11 +100,11 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
return PUBSUB.getEntry(getEntryName());
}
- private Future subscribe() {
+ private RFuture subscribe() {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}
- private void unsubscribe(Future future) {
+ private void unsubscribe(RFuture future) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}
diff --git a/redisson/src/main/java/org/redisson/RedissonCountDownLatchEntry.java b/redisson/src/main/java/org/redisson/RedissonCountDownLatchEntry.java
index 2e16d115f..e0f5be1f8 100644
--- a/redisson/src/main/java/org/redisson/RedissonCountDownLatchEntry.java
+++ b/redisson/src/main/java/org/redisson/RedissonCountDownLatchEntry.java
@@ -15,18 +15,17 @@
*/
package org.redisson;
+import org.redisson.misc.RPromise;
import org.redisson.misc.ReclosableLatch;
-import io.netty.util.concurrent.Promise;
-
public class RedissonCountDownLatchEntry implements PubSubEntry {
private int counter;
private final ReclosableLatch latch;
- private final Promise promise;
+ private final RPromise promise;
- public RedissonCountDownLatchEntry(Promise promise) {
+ public RedissonCountDownLatchEntry(RPromise promise) {
super();
this.latch = new ReclosableLatch();
this.promise = promise;
@@ -40,7 +39,7 @@ public class RedissonCountDownLatchEntry implements PubSubEntry getPromise() {
+ public RPromise getPromise() {
return promise;
}
diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java
index c978c5695..8447e51fd 100644
--- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java
+++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java
@@ -19,6 +19,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -67,7 +68,6 @@ import org.slf4j.LoggerFactory;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
@@ -439,10 +439,14 @@ public class RedissonExecutorService implements RScheduledExecutorService {
if (task.getClass().isAnonymousClass()) {
throw new IllegalArgumentException("Task can't be created using anonymous class");
}
+ if (task.getClass().isMemberClass()
+ && !Modifier.isStatic(task.getClass().getModifiers())) {
+ throw new IllegalArgumentException("Task class is an inner class and it should be static");
+ }
}
private void execute(RemotePromise promise) {
- io.netty.util.concurrent.Future addFuture = promise.getAddFuture();
+ RFuture addFuture = promise.getAddFuture();
addFuture.syncUninterruptibly();
Boolean res = addFuture.getNow();
if (!res) {
@@ -451,9 +455,9 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
@Override
- public Future submit(Runnable task, final T result) {
- final Promise resultFuture = connectionManager.newPromise();
- io.netty.util.concurrent.Future future = (io.netty.util.concurrent.Future) submit(task);
+ public RFuture submit(Runnable task, final T result) {
+ final RPromise resultFuture = connectionManager.newPromise();
+ RFuture future = (RFuture) submit(task);
future.addListener(new FutureListener() {
@Override
public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
@@ -487,7 +491,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public ScheduledFuture> schedule(Runnable task, long delay, TimeUnit unit) {
RedissonScheduledFuture> future = (RedissonScheduledFuture>) scheduleAsync(task, delay, unit);
- execute((RemotePromise>)future.getInnerFuture());
+ execute((RemotePromise>)future.getInnerPromise());
return future;
}
@@ -505,7 +509,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public ScheduledFuture schedule(Callable task, long delay, TimeUnit unit) {
RedissonScheduledFuture future = (RedissonScheduledFuture) scheduleAsync(task, delay, unit);
- execute((RemotePromise)future.getInnerFuture());
+ execute((RemotePromise)future.getInnerPromise());
return future;
}
@@ -523,7 +527,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public ScheduledFuture> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
RedissonScheduledFuture> future = (RedissonScheduledFuture>) scheduleAtFixedRateAsync(task, initialDelay, period, unit);
- execute((RemotePromise>)future.getInnerFuture());
+ execute((RemotePromise>)future.getInnerPromise());
return future;
}
@@ -541,7 +545,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RScheduledFuture> schedule(Runnable task, CronSchedule cronSchedule) {
RedissonScheduledFuture> future = (RedissonScheduledFuture>) scheduleAsync(task, cronSchedule);
- execute((RemotePromise>)future.getInnerFuture());
+ execute((RemotePromise>)future.getInnerPromise());
return future;
}
@@ -564,7 +568,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public ScheduledFuture> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
RedissonScheduledFuture> future = (RedissonScheduledFuture>) scheduleWithFixedDelayAsync(task, initialDelay, delay, unit);
- execute((RemotePromise>)future.getInnerFuture());
+ execute((RemotePromise>)future.getInnerPromise());
return future;
}
@@ -661,7 +665,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
};
for (Future future : futures) {
- io.netty.util.concurrent.Future f = (io.netty.util.concurrent.Future) future;
+ RFuture f = (RFuture) future;
f.addListener(listener);
}
@@ -672,7 +676,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
for (Future future : futures) {
- io.netty.util.concurrent.Future f = (io.netty.util.concurrent.Future) future;
+ RFuture f = (RFuture) future;
f.removeListener(listener);
}
@@ -760,7 +764,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
if (millis <= 0) {
int remainFutures = tasks.size() - futures.size();
for (int i = 0; i < remainFutures; i++) {
- Promise cancelledFuture = connectionManager.newPromise();
+ RPromise cancelledFuture = connectionManager.newPromise();
cancelledFuture.cancel(true);
futures.add(cancelledFuture);
diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java
index 824deb5a7..14e23e2b7 100644
--- a/redisson/src/main/java/org/redisson/RedissonFairLock.java
+++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java
@@ -28,8 +28,6 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandExecutor;
import org.redisson.pubsub.LockPubSub;
-import io.netty.util.concurrent.Future;
-
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
* Implements reentrant lock.
@@ -63,13 +61,13 @@ public class RedissonFairLock extends RedissonLock implements RLock {
}
@Override
- protected Future subscribe(long threadId) {
+ protected RFuture subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
}
@Override
- protected void unsubscribe(Future future, long threadId) {
+ protected void unsubscribe(RFuture future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
}
diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java
index e726deffc..f19108910 100644
--- a/redisson/src/main/java/org/redisson/RedissonKeys.java
+++ b/redisson/src/main/java/org/redisson/RedissonKeys.java
@@ -43,7 +43,6 @@ import org.redisson.misc.RPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.Promise;
public class RedissonKeys implements RKeys {
@@ -255,7 +254,7 @@ public class RedissonKeys implements RKeys {
executorService.writeAsync(entry.getKey(), null, RedisCommands.DEL, key);
}
- Future> future = executorService.executeAsync();
+ RFuture> future = executorService.executeAsync();
future.addListener(listener);
}
@@ -303,7 +302,7 @@ public class RedissonKeys implements RKeys {
return commandExecutor.writeAllAsync(RedisCommands.FLUSHALL);
}
- private void checkExecution(final Promise result, final AtomicReference failed,
+ private void checkExecution(final RPromise result, final AtomicReference failed,
final AtomicLong count, final AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
diff --git a/redisson/src/main/java/org/redisson/RedissonList.java b/redisson/src/main/java/org/redisson/RedissonList.java
index af1f3a911..6020d77bf 100644
--- a/redisson/src/main/java/org/redisson/RedissonList.java
+++ b/redisson/src/main/java/org/redisson/RedissonList.java
@@ -339,19 +339,26 @@ public class RedissonList extends RedissonExpirable implements RList {
@Override
public V remove(int index) {
+ return remove((long) index);
+ }
+
+ public V remove(long index) {
+ return get(removeAsync(index));
+ }
+
+ public RFuture removeAsync(long index) {
if (index == 0) {
- RFuture f = commandExecutor.writeAsync(getName(), codec, LPOP, getName());
- return get(f);
+ return commandExecutor.writeAsync(getName(), codec, LPOP, getName());
}
- RFuture f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT,
+ return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" +
"return v",
Collections.singletonList(getName()), index);
- return get(f);
}
+
@Override
public void fastRemove(int index) {
@@ -359,7 +366,7 @@ public class RedissonList extends RedissonExpirable implements RList {
}
@Override
- public RFuture fastRemoveAsync(int index) {
+ public RFuture fastRemoveAsync(long index) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');",
@@ -376,7 +383,7 @@ public class RedissonList extends RedissonExpirable implements RList {
return indexOfAsync(o, new BooleanNumberReplayConvertor(-1L));
}
- private RFuture indexOfAsync(Object o, Convertor convertor) {
+ public RFuture indexOfAsync(Object o, Convertor convertor) {
return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand("EVAL", convertor, 4),
"local key = KEYS[1] " +
"local obj = ARGV[1] " +
@@ -414,6 +421,20 @@ public class RedissonList extends RedissonExpirable implements RList {
"return -1",
Collections.singletonList(getName()), o);
}
+
+ public RFuture lastIndexOfAsync(Object o, Convertor convertor) {
+ return commandExecutor.evalReadAsync(getName(), codec, new RedisCommand("EVAL", convertor, 4),
+ "local key = KEYS[1] " +
+ "local obj = ARGV[1] " +
+ "local items = redis.call('lrange', key, 0, -1) " +
+ "for i = #items, 1, -1 do " +
+ "if items[i] == obj then " +
+ "return i - 1 " +
+ "end " +
+ "end " +
+ "return -1",
+ Collections.singletonList(getName()), o);
+ }
@Override
public void trim(int fromIndex, int toIndex) {
@@ -421,7 +442,7 @@ public class RedissonList extends RedissonExpirable implements RList {
}
@Override
- public RFuture trimAsync(int fromIndex, int toIndex) {
+ public RFuture trimAsync(long fromIndex, long toIndex) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.LTRIM, getName(), fromIndex, toIndex);
}
diff --git a/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java
index a84e1a197..1a43bba4d 100644
--- a/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java
+++ b/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java
@@ -432,27 +432,30 @@ public class RedissonListMultimapValues extends RedissonExpirable implements
@Override
public V remove(int index) {
+ return get(removeAsync(index));
+ }
+
+ @Override
+ public RFuture removeAsync(long index) {
if (index == 0) {
- RFuture f = commandExecutor.writeAsync(getName(), codec, LPOP, getName());
- return get(f);
+ return commandExecutor.writeAsync(getName(), codec, LPOP, getName());
}
- RFuture f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT,
+ return commandExecutor.evalWriteAsync(getName(), codec, EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');" +
"return v",
Collections.singletonList(getName()), index);
- return get(f);
}
@Override
public void fastRemove(int index) {
- get(fastRemoveAsync(index));
+ get(fastRemoveAsync((long)index));
}
@Override
- public RFuture fastRemoveAsync(int index) {
+ public RFuture fastRemoveAsync(long index) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');",
@@ -528,7 +531,7 @@ public class RedissonListMultimapValues extends RedissonExpirable implements
}
@Override
- public RFuture trimAsync(int fromIndex, int toIndex) {
+ public RFuture trimAsync(long fromIndex, long toIndex) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.LTRIM, getName(), fromIndex, toIndex);
}
diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java
new file mode 100644
index 000000000..503085e21
--- /dev/null
+++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java
@@ -0,0 +1,690 @@
+/**
+ * Copyright 2016 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.redisson;
+
+import java.util.AbstractCollection;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.redisson.api.LocalCachedMapOptions;
+import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
+import org.redisson.api.RFuture;
+import org.redisson.api.RLocalCachedMap;
+import org.redisson.api.RMap;
+import org.redisson.api.RTopic;
+import org.redisson.api.RedissonClient;
+import org.redisson.api.listener.MessageListener;
+import org.redisson.client.codec.Codec;
+import org.redisson.client.codec.LongCodec;
+import org.redisson.client.protocol.RedisCommand;
+import org.redisson.client.protocol.RedisCommand.ValueType;
+import org.redisson.client.protocol.RedisCommands;
+import org.redisson.command.CommandAsyncExecutor;
+import org.redisson.misc.Cache;
+import org.redisson.misc.Hash;
+import org.redisson.misc.LFUCacheMap;
+import org.redisson.misc.LRUCacheMap;
+import org.redisson.misc.NoneCacheMap;
+
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
+/**
+ *
+ * @author Nikita Koksharov
+ *
+ */
+public class RedissonLocalCachedMap extends RedissonExpirable implements RLocalCachedMap {
+
+ public static class LocalCachedMapClear {
+
+ }
+
+ public static class LocalCachedMapInvalidate {
+
+ private byte[] keyHash;
+
+ public LocalCachedMapInvalidate() {
+ }
+
+ public LocalCachedMapInvalidate(byte[] keyHash) {
+ super();
+ this.keyHash = keyHash;
+ }
+
+ public byte[] getKeyHash() {
+ return keyHash;
+ }
+
+ }
+
+ public static class CacheKey {
+
+ private final byte[] keyHash;
+
+ public CacheKey(byte[] keyHash) {
+ super();
+ this.keyHash = keyHash;
+ }
+
+ public byte[] getKeyHash() {
+ return keyHash;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(keyHash);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CacheKey other = (CacheKey) obj;
+ if (!Arrays.equals(keyHash, other.keyHash))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "CacheKey [keyHash=" + Arrays.toString(keyHash) + "]";
+ }
+
+ }
+
+ public static class CacheValue {
+
+ private final Object key;
+ private final Object value;
+
+ public CacheValue(Object key, Object value) {
+ super();
+ this.key = key;
+ this.value = value;
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CacheValue other = (CacheValue) obj;
+ if (value == null) {
+ if (other.value != null)
+ return false;
+ } else if (!value.equals(other.value))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "CacheValue [key=" + key + ", value=" + value + "]";
+ }
+
+ }
+
+ private static final RedisCommand EVAL_PUT = new RedisCommand("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
+ private static final RedisCommand EVAL_REMOVE = new RedisCommand("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
+
+ private RTopic invalidationTopic;
+ private RMap map;
+ private Cache cache;
+ private int invalidateEntryOnChange;
+
+ protected RedissonLocalCachedMap(RedissonClient redisson, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options) {
+ super(commandExecutor, name);
+ init(redisson, name, options);
+ }
+
+ protected RedissonLocalCachedMap(RedissonClient redisson, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options) {
+ super(codec, connectionManager, name);
+ init(redisson, name, options);
+ }
+
+ private void init(RedissonClient redisson, String name, LocalCachedMapOptions options) {
+ map = redisson.getMap(name);
+
+ if (options.isInvalidateEntryOnChange()) {
+ invalidateEntryOnChange = 1;
+ }
+ if (options.getEvictionPolicy() == EvictionPolicy.NONE) {
+ cache = new NoneCacheMap(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
+ }
+ if (options.getEvictionPolicy() == EvictionPolicy.LRU) {
+ cache = new LRUCacheMap(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
+ }
+ if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
+ cache = new LFUCacheMap(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
+ }
+
+ invalidationTopic = redisson.getTopic(name + ":topic");
+ invalidationTopic.addListener(new MessageListener() {
+ @Override
+ public void onMessage(String channel, Object msg) {
+ if (msg instanceof LocalCachedMapClear) {
+ cache.clear();
+ }
+ if (msg instanceof LocalCachedMapInvalidate) {
+ CacheKey key = new CacheKey(((LocalCachedMapInvalidate)msg).getKeyHash());
+ cache.remove(key);
+ }
+ }
+ });
+ }
+
+ @Override
+ public int size() {
+ return get(sizeAsync());
+ }
+
+ @Override
+ public RFuture sizeAsync() {
+ return map.sizeAsync();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return get(containsKeyAsync(key));
+ }
+
+ private CacheKey toCacheKey(Object key) {
+ byte[] encoded = encodeMapKey(key);
+ return toCacheKey(encoded);
+ }
+
+ private CacheKey toCacheKey(byte[] encodedKey) {
+ return new CacheKey(Hash.hash(encodedKey));
+ }
+
+ @Override
+ public RFuture containsKeyAsync(Object key) {
+ CacheKey cacheKey = toCacheKey(key);
+ if (!cache.containsKey(cacheKey)) {
+ return map.containsKeyAsync(key);
+ }
+ return newSucceededFuture(true);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return get(containsValueAsync(value));
+ }
+
+ @Override
+ public RFuture containsValueAsync(Object value) {
+ CacheValue cacheValue = new CacheValue(null, value);
+ if (!cache.containsValue(cacheValue)) {
+ return map.containsValueAsync(value);
+ }
+ return newSucceededFuture(true);
+ }
+
+ @Override
+ public V get(Object key) {
+ return get(getAsync(key));
+ }
+
+ @Override
+ public RFuture getAsync(final Object key) {
+ if (key == null) {
+ throw new NullPointerException();
+ }
+
+ final CacheKey cacheKey = toCacheKey(key);
+ CacheValue cacheValue = cache.get(cacheKey);
+ if (cacheValue != null && cacheValue.getValue() != null) {
+ return newSucceededFuture((V)cacheValue.getValue());
+ }
+
+ RFuture future = map.getAsync((K)key);
+ future.addListener(new FutureListener() {
+ @Override
+ public void operationComplete(Future future) throws Exception {
+ if (!future.isSuccess()) {
+ return;
+ }
+
+ V value = future.getNow();
+ if (value != null) {
+ cache.put(cacheKey, new CacheValue(key, value));
+ }
+ }
+ });
+ return future;
+ }
+
+
+ @Override
+ public V put(K key, V value) {
+ return get(putAsync(key, value));
+ }
+
+ @Override
+ public RFuture putAsync(K key, V value) {
+ if (key == null) {
+ throw new NullPointerException();
+ }
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ byte[] mapKey = encodeMapKey(key);
+ CacheKey cacheKey = toCacheKey(mapKey);
+ byte[] msg = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
+ CacheValue cacheValue = new CacheValue(key, value);
+ cache.put(cacheKey, cacheValue);
+ return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ + "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 and ARGV[4] == '1' then "
+ + "redis.call('publish', KEYS[2], ARGV[3]); "
+ + "end; "
+ + "return v; ",
+ Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)),
+ mapKey, encodeMapValue(value), msg, invalidateEntryOnChange);
+ }
+
+ @Override
+ public boolean fastPut(K key, V value) {
+ return get(fastPutAsync(key, value));
+ }
+
+ @Override
+ public RFuture fastPutAsync(K key, V value) {
+ if (key == null) {
+ throw new NullPointerException();
+ }
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ byte[] encodedKey = encodeMapKey(key);
+ byte[] encodedValue = encodeMapKey(value);
+ CacheKey cacheKey = toCacheKey(encodedKey);
+ byte[] msg = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
+ CacheValue cacheValue = new CacheValue(key, value);
+ cache.put(cacheKey, cacheValue);
+ return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
+ "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ + "if ARGV[4] == '1' then "
+ + "redis.call('publish', KEYS[2], ARGV[3]); "
+ + "end;"
+ + "return 0; "
+ + "end; "
+ + "return 1; ",
+ Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)),
+ encodedKey, encodedValue, msg, invalidateEntryOnChange);
+ }
+
+ @Override
+ public V remove(Object key) {
+ return get(removeAsync((K)key));
+ }
+
+ @Override
+ public RFuture removeAsync(K key) {
+ if (key == null) {
+ throw new NullPointerException();
+ }
+
+ byte[] keyEncoded = encodeMapKey(key);
+ CacheKey cacheKey = toCacheKey(keyEncoded);
+ byte[] msgEncoded = encode(new LocalCachedMapInvalidate(cacheKey.getKeyHash()));
+ cache.remove(cacheKey);
+ return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ + "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 and ARGV[3] == '1' then "
+ + "redis.call('publish', KEYS[2], ARGV[2]); "
+ + "end; "
+ + "return v",
+ Arrays.asList(getName(), invalidationTopic.getChannelNames().get(0)),
+ keyEncoded, msgEncoded, invalidateEntryOnChange);
+ }
+
+ @Override
+ public boolean fastRemove(Object key) {
+ return get(fastRemoveAsync((K)key));
+ }
+
+ @Override
+ public RFuture