Merge branch 'master' into 3.0.0

# Conflicts:
#	pom.xml
#	redisson-all/pom.xml
#	redisson-tomcat/pom.xml
#	redisson-tomcat/redisson-tomcat-6/pom.xml
#	redisson-tomcat/redisson-tomcat-7/pom.xml
#	redisson-tomcat/redisson-tomcat-8/pom.xml
#	redisson/pom.xml
pull/802/merge
Nikita 8 years ago
commit efbdcbc4d4

@ -2,14 +2,26 @@ Redisson Releases History
================================
####Please Note: trunk is current development branch.
####07-Nov-2016 - version 2.6.0 and 3.1.0 released
####26-Nov-2016 - versions 2.7.0 and 3.2.0 released
Feature - __Spring Session implementation__. More details [here](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#145-spring-session)
Feature - __Tomcat Session Manager implementation__. More details [here](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#144-tomcat-redis-session-manager)
Feature - __RDelayedQueue object added__. More details [here](https://github.com/redisson/redisson/wiki/7.-distributed-collections/#714-delayed-queue)
Feature - __RBlockingFairQueue object added__. More details [here](https://github.com/redisson/redisson/wiki/7.-distributed-collections/#713-blocking-fair-queue)
Feature - `RSortedSet.readAll` and `RQueue.readAll` methods added
Fixed - `RMap.getAll` doesn't not preserve the order of elements
Fixed - Wrong nodes parsing in result of cluster info command
Fixed - NullPointerException in CommandDecoder.handleResult
Fixed - Redisson shutdown status should be checked during async command invocation
####07-Nov-2016 - versions 2.6.0 and 3.1.0 released
Feature - __JCache API (JSR-107) implementation__ . More info about it [here](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#143-jcache-api-jsr-107-implementation)
Feature - __new object added__ `RBinaryStream`. More info about it [here](https://github.com/redisson/redisson/wiki/6.-distributed-objects/#62-binary-stream-holder)
Improvement - limit Payload String on RedisTimeoutException
Improvement - Elasticache master node change detection process optimization
####27-Oct-2016 - version 2.5.1 and 3.0.1 released
####27-Oct-2016 - versions 2.5.1 and 3.0.1 released
Include all code changes from __2.2.27__ version

@ -6,8 +6,10 @@ Redis 2.8+ compatible.
| Stable Release Version | JDK Version compatibility | Release Date |
| ------------- | ------------- | ------------|
| 3.1.0 | 1.8+ | 07.11.2016 |
| 2.6.0 | 1.6+ | 07.11.2016 |
| 3.2.0 | 1.8+ | 26.11.2016 |
| 2.7.0 | 1.6+ | 26.11.2016 |
__NOTE__: Both version lines have same features except `CompletionStage` interface support added in 3.x.x
Please read [documentation](https://github.com/redisson/redisson/wiki) for more details.
Redisson [releases history](https://github.com/redisson/redisson/blob/master/CHANGELOG.md)
@ -48,14 +50,19 @@ Features
* Asynchronous connection pool
* Thread-safe implementation
* Lua scripting
* [Distributed objects](https://github.com/redisson/redisson/wiki/6.-Distributed-objects)
* [Distributed collections](https://github.com/redisson/redisson/wiki/7.-Distributed-collections)
* [Distributed locks and synchronizers](https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers)
* [Distributed services](https://github.com/redisson/redisson/wiki/9.-distributed-services)
* [Spring cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#141-spring-cache) integration
* [Hibernate](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#142-hibernate) integration
* [Tomcat Session Manager](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks#144-tomcat-redis-session-manager)
* [JCache API (JSR-107)](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#143-jcache-api-jsr-107-implementation) implementation
* [Distributed objects](https://github.com/redisson/redisson/wiki/6.-Distributed-objects)
Object holder, Binary stream holder, Geospatial holder, BitSet, AtomicLong, AtomicDouble, PublishSubscribe,
Bloom filter, HyperLogLog
* [Distributed collections](https://github.com/redisson/redisson/wiki/7.-Distributed-collections)
Map, Multimap, Set, List, SortedSet, ScoredSortedSet, LexSortedSet, Queue, Deque, Blocking Queue, Bounded Blocking Queue, Blocking Deque, Delayed Queue
* [Distributed locks and synchronizers](https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers)
Lock, FairLock, MultiLock, RedLock, ReadWriteLock, Semaphore, PermitExpirableSemaphore, CountDownLatch
* [Distributed services](https://github.com/redisson/redisson/wiki/9.-distributed-services)
Remote service, Live Object service, Executor service, Scheduler service
* [Spring Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#141-spring-cache) implementation
* [Hibernate Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#142-hibernate-cache) implementation
* [Tomcat Session Manager](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks#144-tomcat-redis-session-manager) implementation
* [Spring Session](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#145-spring-session) implementation
* [Reactive Streams](https://github.com/redisson/redisson/wiki/3.-operations-execution#32-reactive-way)
* [Redis pipelining](https://github.com/redisson/redisson/wiki/10.-additional-features#102-execution-batches-of-commands) (command batches)
* Supports Android platform
@ -65,9 +72,9 @@ Features
* Supports many popular codecs ([Jackson JSON](https://github.com/FasterXML/jackson), [Avro](http://avro.apache.org/), [Smile](http://wiki.fasterxml.com/SmileFormatSpec), [CBOR](http://cbor.io/), [MsgPack](http://msgpack.org/), [Kryo](https://github.com/EsotericSoftware/kryo), [FST](https://github.com/RuedigerMoeller/fast-serialization), [LZ4](https://github.com/jpountz/lz4-java), [Snappy](https://github.com/xerial/snappy-java) and JDK Serialization)
* With over 1000 unit tests
Projects using Redisson
Who uses Redisson
================================
[Infor](http://www.infor.com/), [New Relic Synthetics](https://newrelic.com/synthetics), [Singtel](http://singtel.com), [Setronica](http://setronica.com/), [Monits](http://monits.com/), [Brookhaven National Laboratory](http://bnl.gov/), [Netflix Dyno client] (https://github.com/Netflix/dyno), [武林Q传](http://www.nbrpg.com/), [Ocous](http://www.ocous.com/), [Invaluable](http://www.invaluable.com/), [Clover](https://www.clover.com/) , [Apache Karaf Decanter](https://karaf.apache.org/projects.html#decanter), [Atmosphere Framework](http://async-io.org/), [BrandsEye](http://brandseye.com), [Datorama](http://datorama.com/), [BrightCloud](http://brightcloud.com/)
[Infor](http://www.infor.com/), [New Relic Synthetics](https://newrelic.com/synthetics), [Singtel](http://singtel.com), [Setronica](http://setronica.com/), [Monits](http://monits.com/), [Brookhaven National Laboratory](http://bnl.gov/), [Netflix Dyno client] (https://github.com/Netflix/dyno), [武林Q传](http://www.nbrpg.com/), [Ocous](http://www.ocous.com/), [Invaluable](http://www.invaluable.com/), [Clover](https://www.clover.com/) , [Apache Karaf Decanter](https://karaf.apache.org/projects.html#decanter), [Atmosphere Framework](http://async-io.org/), [BrandsEye](http://brandseye.com), [Datorama](http://datorama.com/), [BrightCloud](http://brightcloud.com/), [Azar](http://azarlive.com/)
Articles
================================
@ -88,23 +95,23 @@ Quick start
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.1.0</version>
<version>3.2.0</version>
</dependency>
<!-- JDK 1.6+ compatible -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.6.0</version>
<version>2.7.0</version>
</dependency>
#### Gradle
// JDK 1.8+ compatible
compile 'org.redisson:redisson:3.1.0'
compile 'org.redisson:redisson:3.2.0'
// JDK 1.6+ compatible
compile 'org.redisson:redisson:2.6.0'
compile 'org.redisson:redisson:2.7.0'
#### Java
@ -129,11 +136,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService");
Downloads
===============================
[Redisson 3.1.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.1.0&e=jar),
[Redisson node 3.1.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.1.0&e=jar)
[Redisson 3.2.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.2.0&e=jar),
[Redisson node 3.2.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.2.0&e=jar)
[Redisson 2.6.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.6.0&e=jar),
[Redisson node 2.6.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.6.0&e=jar)
[Redisson 2.7.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.7.0&e=jar),
[Redisson node 2.7.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.7.0&e=jar)
### Supported by

@ -3,7 +3,6 @@
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.2.1-SNAPSHOT</version>
<packaging>pom</packaging>

@ -4,7 +4,6 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
@ -88,7 +87,7 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<version>4.0.41.Final</version>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>

@ -31,42 +31,36 @@
</profiles>
<dependencies>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.0.42.Final</version>
<version>4.1.6.Final</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.0.42.Final</version>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>4.0.42.Final</version>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.0.42.Final</version>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.0.42.Final</version>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.0.42.Final</version>
<version>4.1.6.Final</version>
</dependency>
<dependency>
@ -400,7 +394,7 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.0.1</version>
<version>3.2.0</version>
<extensions>true</extensions>
<configuration>
<instructions>

@ -231,7 +231,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
for (Object name : queueNames) {
params.add(name);
}
params.add(unit.toSeconds(timeout));
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOP_VALUE, params.toArray());
}
@ -243,7 +243,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public RFuture<V> pollLastAsync(long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOP_VALUE, getName(), unit.toSeconds(timeout));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOP_VALUE, getName(), toSeconds(timeout, unit));
}
@Override

@ -84,7 +84,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override
public RFuture<V> pollAsync(long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), toSeconds(timeout, unit));
}
/*
@ -118,13 +118,13 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
for (Object name : queueNames) {
params.add(name);
}
params.add(unit.toSeconds(timeout));
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, params.toArray());
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, toSeconds(timeout, unit));
}
@Override

@ -50,9 +50,9 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
public void await() throws InterruptedException {
RFuture<RedissonCountDownLatchEntry> promise = subscribe();
RFuture<RedissonCountDownLatchEntry> future = subscribe();
try {
get(promise);
commandExecutor.syncSubscription(future);
while (getCount() > 0) {
// waiting for open state
@ -62,7 +62,7 @@ public class RedissonCountDownLatch extends RedissonObject implements RCountDown
}
}
} finally {
unsubscribe(promise);
unsubscribe(future);
}
}

@ -118,7 +118,7 @@ public class RedissonLock extends RedissonExpirable implements RLock {
long threadId = Thread.currentThread().getId();
RFuture<RedissonLockEntry> future = subscribe(threadId);
get(future);
commandExecutor.syncSubscription(future);
try {
while (true) {

@ -64,7 +64,7 @@ public class RedissonPatternTopic<M> implements RPatternTopic<M> {
private int addListener(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().psubscribe(name, codec, pubSubListener);
future.syncUninterruptibly();
commandExecutor.syncSubscription(future);
return System.identityHashCode(pubSubListener);
}

@ -91,7 +91,7 @@ public class RedissonPermitExpirableSemaphore extends RedissonExpirable implemen
}
RFuture<RedissonLockEntry> future = subscribe();
get(future);
commandExecutor.syncSubscription(future);
try {
while (true) {
final Long nearestTimeout;

@ -16,6 +16,7 @@
package org.redisson;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RQueue;
@ -65,7 +66,15 @@ public class RedissonQueue<V> extends RedissonList<V> implements RQueue<V> {
}
return value;
}
protected long toSeconds(long timeout, TimeUnit unit) {
long seconds = unit.toSeconds(timeout);
if (timeout != 0 && seconds == 0) {
seconds = 1;
}
return seconds;
}
@Override
public V remove() {
return removeFirst();

@ -79,7 +79,7 @@ public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
}
RFuture<RedissonLockEntry> future = subscribe();
get(future);
commandExecutor.syncSubscription(future);
try {
while (true) {
if (tryAcquire(permits)) {

@ -143,6 +143,16 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SPOP_SINGLE, getName());
}
@Override
public Set<V> removeRandom(int amount) {
return get(removeRandomAsync(amount));
}
@Override
public RFuture<Set<V>> removeRandomAsync(int amount) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SPOP, getName(), amount);
}
@Override
public V random() {
return get(randomAsync());

@ -243,6 +243,16 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SPOP_SINGLE, getName());
}
@Override
public Set<V> removeRandom(int amount) {
return get(removeRandomAsync(amount));
}
@Override
public RFuture<Set<V>> removeRandomAsync(int amount) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SPOP, getName(), amount);
}
@Override
public V random() {
return get(randomAsync());

@ -79,7 +79,7 @@ public class RedissonTopic<M> implements RTopic<M> {
private int addListener(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = commandExecutor.getConnectionManager().subscribe(codec, name, pubSubListener);
future.syncUninterruptibly();
commandExecutor.syncSubscription(future);
return System.identityHashCode(pubSubListener);
}

@ -26,6 +26,14 @@ import java.util.Set;
*/
public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V> {
/**
* Removes and returns random elements from set
*
* @param amount of random values
* @return random values
*/
Set<V> removeRandom(int amount);
/**
* Removes and returns random element from set
*

@ -26,6 +26,15 @@ import java.util.Set;
*/
public interface RSetAsync<V> extends RCollectionAsync<V> {
/**
* Removes and returns random elements from set
* in async mode
*
* @param amount of random values
* @return random values
*/
RFuture<Set<V>> removeRandomAsync(int amount);
/**
* Removes and returns random element from set
* in async mode

@ -507,6 +507,7 @@ public interface RedissonClient {
/**
* Returns unbounded fair queue instance by name.
*
* @param <V> type of value
* @param name of queue
* @return queue
*/
@ -529,6 +530,7 @@ public interface RedissonClient {
* Could be attached to destination queue only.
* All elements are inserted with transfer delay to destination queue.
*
* @param <V> type of value
* @param destinationQueue - destination queue
* @return delayed queue
*/

@ -16,7 +16,7 @@
package org.redisson.client;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -31,7 +31,7 @@ import org.redisson.client.handler.ConnectionWatchdog;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -70,15 +70,15 @@ public class RedisClient {
private boolean hasOwnGroup;
public RedisClient(String address) {
this(URIBuilder.create(address));
this(URLBuilder.create(address));
}
public RedisClient(URI address) {
public RedisClient(URL address) {
this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address);
hasOwnGroup = true;
}
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URI address) {
public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URL address) {
this(timer, executor, group, address.getHost(), address.getPort());
}

@ -83,17 +83,17 @@ public class RedisPubSubConnection extends RedisConnection {
}
public void subscribe(Codec codec, String ... channel) {
async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel);
for (String ch : channel) {
channels.put(ch, codec);
}
async(new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channel);
}
public void psubscribe(Codec codec, String ... channel) {
async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel);
for (String ch : channel) {
patternChannels.put(ch, codec);
}
async(new PubSubPatternMessageDecoder(codec.getValueDecoder()), RedisCommands.PSUBSCRIBE, channel);
}
public void unsubscribe(final String ... channels) {

@ -389,6 +389,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
private MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts, Channel channel) {
if (data == null) {
if (parts.isEmpty()) {
return null;
}
String command = parts.get(0).toString();
if (Arrays.asList("subscribe", "psubscribe", "punsubscribe", "unsubscribe").contains(command)) {
String channelName = parts.get(1).toString();

@ -136,6 +136,7 @@ public interface RedisCommands {
RedisCommand<Boolean> SADD_BOOL = new RedisCommand<Boolean>("SADD", new BooleanAmountReplayConvertor(), 2, ValueType.OBJECTS);
RedisStrictCommand<Long> SADD = new RedisStrictCommand<Long>("SADD", 2, ValueType.OBJECTS);
RedisCommand<Set<Object>> SPOP = new RedisCommand<Set<Object>>("SPOP", new ObjectSetReplayDecoder<Object>());
RedisCommand<Object> SPOP_SINGLE = new RedisCommand<Object>("SPOP");
RedisCommand<Boolean> SADD_SINGLE = new RedisCommand<Boolean>("SADD", new BooleanReplayConvertor(), 2);
RedisCommand<Boolean> SREM_SINGLE = new RedisCommand<Boolean>("SREM", new BooleanAmountReplayConvertor(), 2, ValueType.OBJECTS);

@ -15,7 +15,7 @@
*/
package org.redisson.cluster;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -66,13 +66,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<URI, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
private final Map<URL, RedisConnection> nodeConnections = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Integer, ClusterPartition> lastPartitions = PlatformDependent.newConcurrentHashMap();
private ScheduledFuture<?> monitorFuture;
private volatile URI lastClusterNode;
private volatile URL lastClusterNode;
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
super(config);
@ -84,7 +84,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>();
for (URI addr : cfg.getNodeAddresses()) {
for (URL addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
@ -158,7 +158,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
private RFuture<RedisConnection> connect(ClusterServersConfig cfg, final URL addr) {
RedisConnection connection = nodeConnections.get(addr);
if (connection != null) {
return newSucceededFuture(connection);
@ -308,22 +308,22 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private void scheduleClusterChangeCheck(final ClusterServersConfig cfg, final Iterator<URI> iterator) {
private void scheduleClusterChangeCheck(final ClusterServersConfig cfg, final Iterator<URL> iterator) {
monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
Iterator<URI> nodesIterator = iterator;
Iterator<URL> nodesIterator = iterator;
if (nodesIterator == null) {
List<URI> nodes = new ArrayList<URI>();
List<URI> slaves = new ArrayList<URI>();
List<URL> nodes = new ArrayList<URL>();
List<URL> slaves = new ArrayList<URL>();
for (ClusterPartition partition : getLastPartitions()) {
if (!partition.isMasterFail()) {
nodes.add(partition.getMasterAddress());
}
Set<URI> partitionSlaves = new HashSet<URI>(partition.getSlaveAddresses());
Set<URL> partitionSlaves = new HashSet<URL>(partition.getSlaveAddresses());
partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
slaves.addAll(partitionSlaves);
}
@ -339,19 +339,23 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URI> iterator, final AtomicReference<Throwable> lastException) {
private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URL> iterator, final AtomicReference<Throwable> lastException) {
if (!iterator.hasNext()) {
log.error("Can't update cluster state", lastException.get());
scheduleClusterChangeCheck(cfg, null);
return;
}
final URI uri = iterator.next();
if (!getShutdownLatch().acquire()) {
return;
}
final URL uri = iterator.next();
RFuture<RedisConnection> connectionFuture = connect(cfg, uri);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
lastException.set(future.cause());
getShutdownLatch().release();
checkClusterState(cfg, iterator, lastException);
return;
}
@ -362,7 +366,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
});
}
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator, final URI uri) {
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URL> iterator, final URL uri) {
RFuture<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES);
future.addListener(new FutureListener<List<ClusterNodeInfo>>() {
@Override
@ -370,6 +374,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (!future.isSuccess()) {
log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause());
close(connection);
getShutdownLatch().release();
scheduleClusterChangeCheck(cfg, iterator);
return;
}
@ -393,6 +398,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public void operationComplete(Future<Void> future) throws Exception {
checkSlotsMigration(newPartitions, nodesValue.toString());
checkSlotsChange(cfg, newPartitions, nodesValue.toString());
getShutdownLatch().release();
scheduleClusterChangeCheck(cfg, null);
}
});
@ -409,7 +415,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
MasterSlaveEntry entry = getEntry(currentPart.getMasterAddr());
// should be invoked first in order to remove stale failedSlaveAddresses
Set<URI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
Set<URL> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
// Do some slaves have changed state from failed to alive?
upDownSlaves(entry, currentPart, newPart, addedSlaves);
@ -418,20 +424,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart, Set<URI> addedSlaves) {
Set<URI> aliveSlaves = new HashSet<URI>(currentPart.getFailedSlaveAddresses());
private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart, Set<URL> addedSlaves) {
Set<URL> aliveSlaves = new HashSet<URL>(currentPart.getFailedSlaveAddresses());
aliveSlaves.removeAll(addedSlaves);
aliveSlaves.removeAll(newPart.getFailedSlaveAddresses());
for (URI uri : aliveSlaves) {
for (URL uri : aliveSlaves) {
currentPart.removeFailedSlaveAddress(uri);
if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
Set<URI> failedSlaves = new HashSet<URI>(newPart.getFailedSlaveAddresses());
Set<URL> failedSlaves = new HashSet<URL>(newPart.getFailedSlaveAddresses());
failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
for (URI uri : failedSlaves) {
for (URL uri : failedSlaves) {
currentPart.addFailedSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
@ -439,11 +445,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
private Set<URI> addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) {
Set<URI> removedSlaves = new HashSet<URI>(currentPart.getSlaveAddresses());
private Set<URL> addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) {
Set<URL> removedSlaves = new HashSet<URL>(currentPart.getSlaveAddresses());
removedSlaves.removeAll(newPart.getSlaveAddresses());
for (URI uri : removedSlaves) {
for (URL uri : removedSlaves) {
currentPart.removeSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
@ -451,9 +457,9 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
}
Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
Set<URL> addedSlaves = new HashSet<URL>(newPart.getSlaveAddresses());
addedSlaves.removeAll(currentPart.getSlaveAddresses());
for (final URI uri : addedSlaves) {
for (final URL uri : addedSlaves) {
RFuture<Void> future = entry.addSlave(uri.getHost(), uri.getPort());
future.addListener(new FutureListener<Void>() {
@Override
@ -510,8 +516,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
log.info("changing master from {} to {} for {}",
currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot);
URI newUri = newMasterPart.getMasterAddress();
URI oldUri = currentPart.getMasterAddress();
URL newUri = newMasterPart.getMasterAddress();
URL oldUri = currentPart.getMasterAddress();
changeMaster(slot, newUri.getHost(), newUri.getPort());
@ -714,7 +720,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
@Override
public URI getLastClusterNode() {
public URL getLastClusterNode() {
return lastClusterNode;
}

@ -16,10 +16,11 @@
package org.redisson.cluster;
import java.net.URI;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
@ -33,7 +34,7 @@ public class ClusterNodeInfo {
private final String nodeInfo;
private String nodeId;
private URI address;
private URL address;
private final Set<Flag> flags = new HashSet<Flag>();
private String slaveOf;
@ -50,11 +51,11 @@ public class ClusterNodeInfo {
this.nodeId = nodeId;
}
public URI getAddress() {
public URL getAddress() {
return address;
}
public void setAddress(String address) {
this.address = URIBuilder.create(address);
this.address = URLBuilder.create(address);
}
public void addSlotRange(ClusterSlotRange range) {

@ -16,20 +16,25 @@
package org.redisson.cluster;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class ClusterPartition {
private final String nodeId;
private boolean masterFail;
private URI masterAddress;
private final Set<URI> slaveAddresses = new HashSet<URI>();
private final Set<URI> failedSlaves = new HashSet<URI>();
private URL masterAddress;
private final Set<URL> slaveAddresses = new HashSet<URL>();
private final Set<URL> failedSlaves = new HashSet<URL>();
private final Set<Integer> slots = new HashSet<Integer>();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();
@ -85,33 +90,33 @@ public class ClusterPartition {
return new InetSocketAddress(masterAddress.getHost(), masterAddress.getPort());
}
public URI getMasterAddress() {
public URL getMasterAddress() {
return masterAddress;
}
public void setMasterAddress(String masterAddress) {
setMasterAddress(URIBuilder.create(masterAddress));
setMasterAddress(URLBuilder.create(masterAddress));
}
public void setMasterAddress(URI masterAddress) {
public void setMasterAddress(URL masterAddress) {
this.masterAddress = masterAddress;
}
public void addFailedSlaveAddress(URI address) {
public void addFailedSlaveAddress(URL address) {
failedSlaves.add(address);
}
public Set<URI> getFailedSlaveAddresses() {
public Set<URL> getFailedSlaveAddresses() {
return Collections.unmodifiableSet(failedSlaves);
}
public void removeFailedSlaveAddress(URI uri) {
public void removeFailedSlaveAddress(URL uri) {
failedSlaves.remove(uri);
}
public void addSlaveAddress(URI address) {
public void addSlaveAddress(URL address) {
slaveAddresses.add(address);
}
public Set<URI> getSlaveAddresses() {
public Set<URL> getSlaveAddresses() {
return Collections.unmodifiableSet(slaveAddresses);
}
public void removeSlaveAddress(URI uri) {
public void removeSlaveAddress(URL uri) {
slaveAddresses.remove(uri);
failedSlaves.remove(uri);
}

@ -50,6 +50,8 @@ public interface CommandAsyncExecutor {
boolean await(RFuture<?> RFuture, long timeout, TimeUnit timeoutUnit) throws InterruptedException;
void syncSubscription(RFuture<?> future);
<V> V get(RFuture<V> RFuture);
<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);

@ -48,6 +48,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
@ -70,6 +71,7 @@ import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RedissonObjectFactory;
/**
@ -117,6 +119,20 @@ public class CommandAsyncService implements CommandAsyncExecutor {
return redisson != null || redissonReactive != null;
}
@Override
public void syncSubscription(RFuture<?> future) {
MasterSlaveServersConfig config = connectionManager.getConfig();
try {
int timeout = config.getTimeout() + config.getRetryInterval()*config.getRetryAttempts();
if (!future.await(timeout)) {
throw new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
future.syncUninterruptibly();
}
@Override
public <V> V get(RFuture<V> future) {
if (!future.isDone()) {

@ -44,22 +44,22 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
/**
* Redis 'slave' node minimum idle connection amount for <b>each</b> slave node
*/
private int slaveConnectionMinimumIdleSize = 5;
private int slaveConnectionMinimumIdleSize = 10;
/**
* Redis 'slave' node maximum connection pool size for <b>each</b> slave node
*/
private int slaveConnectionPoolSize = 250;
private int slaveConnectionPoolSize = 64;
/**
* Redis 'master' node minimum idle connection amount for <b>each</b> slave node
*/
private int masterConnectionMinimumIdleSize = 5;
private int masterConnectionMinimumIdleSize = 10;
/**
* Redis 'master' node maximum connection pool size
*/
private int masterConnectionPoolSize = 250;
private int masterConnectionPoolSize = 64;
private ReadMode readMode = ReadMode.SLAVE;
@ -81,7 +81,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
/**
* Redis 'slave' servers connection pool size for <b>each</b> slave node.
* <p>
* Default is <code>250</code>
* Default is <code>64</code>
* <p>
* @see #setSlaveConnectionMinimumIdleSize(int)
*
@ -99,7 +99,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
/**
* Redis 'master' server connection pool size.
* <p>
* Default is <code>250</code>
* Default is <code>64</code>
*
* @see #setMasterConnectionMinimumIdleSize(int)
*
@ -155,7 +155,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
/**
* Redis 'slave' node minimum idle connection amount for <b>each</b> slave node
* <p>
* Default is <code>5</code>
* Default is <code>10</code>
* <p>
* @see #setSlaveConnectionPoolSize(int)
*
@ -173,7 +173,7 @@ public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig
/**
* Redis 'master' node minimum idle connection amount for <b>each</b> slave node
* <p>
* Default is <code>5</code>
* Default is <code>10</code>
* <p>
* @see #setMasterConnectionPoolSize(int)
*

@ -15,18 +15,23 @@
*/
package org.redisson.config;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterServersConfig> {
/**
* Redis cluster node urls list
*/
private List<URI> nodeAddresses = new ArrayList<URI>();
private List<URL> nodeAddresses = new ArrayList<URL>();
/**
* Redis cluster scan interval in milliseconds
@ -50,14 +55,14 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterSe
*/
public ClusterServersConfig addNodeAddress(String ... addresses) {
for (String address : addresses) {
nodeAddresses.add(URIBuilder.create(address));
nodeAddresses.add(URLBuilder.create(address));
}
return this;
}
public List<URI> getNodeAddresses() {
public List<URL> getNodeAddresses() {
return nodeAddresses;
}
void setNodeAddresses(List<URI> nodeAddresses) {
void setNodeAddresses(List<URL> nodeAddresses) {
this.nodeAddresses = nodeAddresses;
}

@ -47,7 +47,13 @@ import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.redisson.codec.CodecProvider;
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class ConfigSupport {
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "class")
@ -111,6 +117,10 @@ public class ConfigSupport {
private final ObjectMapper jsonMapper = createMapper(null);
private final ObjectMapper yamlMapper = createMapper(new YAMLFactory());
public ConfigSupport() {
URLBuilder.init();
}
public <T> T fromJSON(String content, Class<T> configType) throws IOException {
return jsonMapper.readValue(content, configType);
}

@ -15,24 +15,25 @@
*/
package org.redisson.config;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
* Configuration for an AWS ElastiCache replication group. A replication group is composed
* of a single master endpoint and multiple read slaves.
*
* @author Steve Ungerer
* @author Nikita Koksharov
*/
public class ElasticacheServersConfig extends BaseMasterSlaveServersConfig<ElasticacheServersConfig> {
/**
* Replication group node urls list
*/
private List<URI> nodeAddresses = new ArrayList<URI>();
private List<URL> nodeAddresses = new ArrayList<URL>();
/**
* Replication group scan interval in milliseconds
@ -62,14 +63,14 @@ public class ElasticacheServersConfig extends BaseMasterSlaveServersConfig<Elast
*/
public ElasticacheServersConfig addNodeAddress(String ... addresses) {
for (String address : addresses) {
nodeAddresses.add(URIBuilder.create(address));
nodeAddresses.add(URLBuilder.create(address));
}
return this;
}
public List<URI> getNodeAddresses() {
public List<URL> getNodeAddresses() {
return nodeAddresses;
}
void setNodeAddresses(List<URI> nodeAddresses) {
void setNodeAddresses(List<URL> nodeAddresses) {
this.nodeAddresses = nodeAddresses;
}

@ -15,25 +15,28 @@
*/
package org.redisson.config;
import java.net.URI;
import java.util.Collections;
import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<MasterSlaveServersConfig> {
/**
* Redis slave servers addresses
*/
private Set<URI> slaveAddresses = new HashSet<URI>();
private Set<URL> slaveAddresses = new HashSet<URL>();
/**
* Redis master server address
*/
private List<URI> masterAddress;
private URL masterAddress;
/**
* Database index used for Redis connection
@ -59,19 +62,19 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<Maste
*/
public MasterSlaveServersConfig setMasterAddress(String masterAddress) {
if (masterAddress != null) {
this.masterAddress = Collections.singletonList(URIBuilder.create(masterAddress));
this.masterAddress = URLBuilder.create(masterAddress);
}
return this;
}
public URI getMasterAddress() {
public URL getMasterAddress() {
if (masterAddress != null) {
return masterAddress.get(0);
return masterAddress;
}
return null;
}
public void setMasterAddress(URI masterAddress) {
public void setMasterAddress(URL masterAddress) {
if (masterAddress != null) {
this.masterAddress = Collections.singletonList(masterAddress);
this.masterAddress = masterAddress;
}
}
@ -83,18 +86,18 @@ public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<Maste
*/
public MasterSlaveServersConfig addSlaveAddress(String ... addresses) {
for (String address : addresses) {
slaveAddresses.add(URIBuilder.create(address));
slaveAddresses.add(URLBuilder.create(address));
}
return this;
}
public MasterSlaveServersConfig addSlaveAddress(URI slaveAddress) {
public MasterSlaveServersConfig addSlaveAddress(URL slaveAddress) {
slaveAddresses.add(slaveAddress);
return this;
}
public Set<URI> getSlaveAddresses() {
public Set<URL> getSlaveAddresses() {
return slaveAddresses;
}
public void setSlaveAddresses(Set<URI> readAddresses) {
public void setSlaveAddresses(Set<URL> readAddresses) {
this.slaveAddresses = readAddresses;
}

@ -15,15 +15,20 @@
*/
package org.redisson.config;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
* @author Nikita Koksharov
*
*/
public class SentinelServersConfig extends BaseMasterSlaveServersConfig<SentinelServersConfig> {
private List<URI> sentinelAddresses = new ArrayList<URI>();
private List<URL> sentinelAddresses = new ArrayList<URL>();
private String masterName;
@ -64,14 +69,14 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig<Sentinel
*/
public SentinelServersConfig addSentinelAddress(String ... addresses) {
for (String address : addresses) {
sentinelAddresses.add(URIBuilder.create(address));
sentinelAddresses.add(URLBuilder.create(address));
}
return this;
}
public List<URI> getSentinelAddresses() {
public List<URL> getSentinelAddresses() {
return sentinelAddresses;
}
void setSentinelAddresses(List<URI> sentinelAddresses) {
void setSentinelAddresses(List<URL> sentinelAddresses) {
this.sentinelAddresses = sentinelAddresses;
}

@ -15,11 +15,9 @@
*/
package org.redisson.config;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.net.URL;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
/**
*
@ -32,7 +30,7 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
* Redis server address
*
*/
private List<URI> address;
private URL address;
/**
* Minimum idle subscription connection amount
@ -48,12 +46,12 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
/**
* Minimum idle Redis connection amount
*/
private int connectionMinimumIdleSize = 5;
private int connectionMinimumIdleSize = 10;
/**
* Redis connection maximum pool size
*/
private int connectionPoolSize = 250;
private int connectionPoolSize = 64;
/**
* Database index used for Redis connection
@ -92,7 +90,7 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
/**
* Redis connection pool size
* <p>
* Default is <code>250</code>
* Default is <code>64</code>
*
* @param connectionPoolSize - pool size
* @return config
@ -129,19 +127,19 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
*/
public SingleServerConfig setAddress(String address) {
if (address != null) {
this.address = Collections.singletonList(URIBuilder.create(address));
this.address = URLBuilder.create(address);
}
return this;
}
public URI getAddress() {
public URL getAddress() {
if (address != null) {
return address.get(0);
return address;
}
return null;
}
void setAddress(URI address) {
void setAddress(URL address) {
if (address != null) {
this.address = Collections.singletonList(address);
this.address = address;
}
}
@ -197,7 +195,7 @@ public class SingleServerConfig extends BaseConfig<SingleServerConfig> {
/**
* Minimum idle Redis connection amount.
* <p>
* Default is 5
* Default is <code>10</code>
*
* @param connectionMinimumIdleSize - connections amount
* @return config

@ -16,7 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@ -47,7 +47,7 @@ public interface ConnectionManager {
ExecutorService getExecutor();
URI getLastClusterNode();
URL getLastClusterNode();
boolean isClusterMode();

@ -15,7 +15,7 @@
*/
package org.redisson.connection;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -55,9 +55,9 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private AtomicReference<URI> currentMaster = new AtomicReference<URI>();
private AtomicReference<URL> currentMaster = new AtomicReference<URL>();
private final Map<URI, RedisConnection> nodeConnections = new HashMap<URI, RedisConnection>();
private final Map<URL, RedisConnection> nodeConnections = new HashMap<URL, RedisConnection>();
private ScheduledFuture<?> monitorFuture;
@ -72,7 +72,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
this.config = create(cfg);
initTimer(this.config);
for (URI addr : cfg.getNodeAddresses()) {
for (URL addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
connectionFuture.awaitUninterruptibly();
RedisConnection connection = connectionFuture.getNow();
@ -110,7 +110,7 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
return res;
}
private RFuture<RedisConnection> connect(BaseMasterSlaveServersConfig<?> cfg, final URI addr) {
private RFuture<RedisConnection> connect(BaseMasterSlaveServersConfig<?> cfg, final URL addr) {
RedisConnection connection = nodeConnections.get(addr);
if (connection != null) {
return newSucceededFuture(connection);
@ -158,11 +158,11 @@ public class ElasticacheConnectionManager extends MasterSlaveConnectionManager {
monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override
public void run() {
final URI master = currentMaster.get();
final URL master = currentMaster.get();
log.debug("Current master: {}", master);
final AtomicInteger count = new AtomicInteger(cfg.getNodeAddresses().size());
for (final URI addr : cfg.getNodeAddresses()) {
for (final URL addr : cfg.getNodeAddresses()) {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override

@ -16,7 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -280,7 +280,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config,
HashSet<ClusterSlotRange> slots) {
MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());
List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URL>emptySet());
for (RFuture<Void> future : fs) {
future.syncUninterruptibly();
}
@ -744,7 +744,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
for (MasterSlaveEntry entry : entries.values()) {
entry.shutdown();
}
timer.stop();
if (!sharedExecutor) {
executor.shutdown();
@ -758,6 +757,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (!sharedEventLoopGroup) {
group.shutdownGracefully(quietPeriod, timeout, unit).syncUninterruptibly();
}
timer.stop();
}
@Override
@ -835,7 +835,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return executor;
}
public URI getLastClusterNode() {
public URL getLastClusterNode() {
return null;
}
}

@ -16,7 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
@ -37,7 +37,6 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManagerImpl;
import org.redisson.connection.pool.MasterConnectionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -76,11 +75,11 @@ public class MasterSlaveEntry {
this.connectionManager = connectionManager;
this.config = config;
slaveBalancer = new LoadBalancerManagerImpl(config, connectionManager, this);
slaveBalancer = new LoadBalancerManager(config, connectionManager, this);
writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);
}
public List<RFuture<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) {
public List<RFuture<Void>> initSlaveBalancer(Collection<URL> disconnectedNodes) {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty()
&& config.getReadMode() == ReadMode.SLAVE
&& disconnectedNodes.size() < config.getSlaveAddresses().size();
@ -88,7 +87,7 @@ public class MasterSlaveEntry {
List<RFuture<Void>> result = new LinkedList<RFuture<Void>>();
RFuture<Void> f = addSlave(config.getMasterAddress().getHost(), config.getMasterAddress().getPort(), freezeMasterAsSlave, NodeType.MASTER);
result.add(f);
for (URI address : config.getSlaveAddresses()) {
for (URL address : config.getSlaveAddresses()) {
f = addSlave(address.getHost(), address.getPort(), disconnectedNodes.contains(address), NodeType.SLAVE);
result.add(f);
}

@ -16,7 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -41,7 +41,7 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.config.SentinelServersConfig;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -62,7 +62,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final AtomicReference<String> currentMaster = new AtomicReference<String>();
private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap();
private final Set<URI> disconnectedSlaves = new HashSet<URI>();
private final Set<URL> disconnectedSlaves = new HashSet<URL>();
public SentinelConnectionManager(SentinelServersConfig cfg, Config config) {
super(config);
@ -70,7 +70,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
final MasterSlaveServersConfig c = create(cfg);
initTimer(c);
for (URI addr : cfg.getSentinelAddresses()) {
for (URL addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
try {
RedisConnection connection = client.connect();
@ -104,7 +104,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.info("slave: {} added", host);
if (flags.contains("s_down") || flags.contains("disconnected")) {
URI url = URIBuilder.create(host);
URL url = URLBuilder.create(host);
disconnectedSlaves.add(url);
log.warn("slave: {} is down", host);
}
@ -123,7 +123,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
init(c);
List<RFuture<RedisPubSubConnection>> connectionFutures = new ArrayList<RFuture<RedisPubSubConnection>>(cfg.getSentinelAddresses().size());
for (URI addr : cfg.getSentinelAddresses()) {
for (URL addr : cfg.getSentinelAddresses()) {
RFuture<RedisPubSubConnection> future = registerSentinel(cfg, addr, c);
connectionFutures.add(future);
}
@ -146,7 +146,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return entry;
}
private RFuture<RedisPubSubConnection> registerSentinel(final SentinelServersConfig cfg, final URI addr, final MasterSlaveServersConfig c) {
private RFuture<RedisPubSubConnection> registerSentinel(final SentinelServersConfig cfg, final URL addr, final MasterSlaveServersConfig c) {
RedisClient client = createClient(addr.getHost(), addr.getPort(), c.getConnectTimeout(), c.getRetryInterval() * c.getRetryAttempts());
RedisClient oldClient = sentinels.putIfAbsent(addr.getHost() + ":" + addr.getPort(), client);
if (oldClient != null) {
@ -208,12 +208,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = parts[3];
String addr = ip + ":" + port;
URI uri = URIBuilder.create(addr);
URL uri = URLBuilder.create(addr);
registerSentinel(cfg, uri, c);
}
}
protected void onSlaveAdded(URI addr, String msg) {
protected void onSlaveAdded(URL addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 4
@ -250,7 +250,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void onNodeDown(URI sentinelAddr, String msg) {
private void onNodeDown(URL sentinelAddr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
@ -298,7 +298,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void onNodeUp(URI addr, String msg) {
private void onNodeUp(URL addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {
@ -337,7 +337,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) {
private void onMasterChange(SentinelServersConfig cfg, URL addr, String msg) {
String[] parts = msg.split(" ");
if (parts.length > 3) {

@ -16,37 +16,164 @@
package org.redisson.connection.balancer;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SlaveConnectionPool;
import org.redisson.misc.RPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public interface LoadBalancerManager {
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
RFuture<RedisConnection> getConnection(InetSocketAddress addr);
public class LoadBalancerManager {
int getAvailableClients();
private final Logger log = LoggerFactory.getLogger(getClass());
void shutdownAsync();
private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool slaveConnectionPool;
void shutdown();
public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager;
slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry);
pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);
}
boolean unfreeze(String host, int port, FreezeReason freezeReason);
public RFuture<Void> add(final ClientConnectionsEntry entry) {
final RPromise<Void> result = connectionManager.newPromise();
FutureListener<Void> listener = new FutureListener<Void>() {
AtomicInteger counter = new AtomicInteger(2);
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
addr2Entry.put(entry.getClient().getAddr(), entry);
result.trySuccess(null);
}
}
};
ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason);
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
slaveFuture.addListener(listener);
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
pubSubFuture.addListener(listener);
return result;
}
public int getAvailableClients() {
int count = 0;
for (ClientConnectionsEntry connectionEntry : addr2Entry.values()) {
if (!connectionEntry.isFreezed()) {
count++;
}
}
return count;
}
public boolean unfreeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry == null) {
throw new IllegalStateException("Can't find " + addr + " in slaves!");
}
synchronized (entry) {
if (!entry.isFreezed()) {
return false;
}
if ((freezeReason == FreezeReason.RECONNECT
&& entry.getFreezeReason() == FreezeReason.RECONNECT)
|| freezeReason != FreezeReason.RECONNECT) {
entry.resetFailedAttempts();
entry.setFreezed(false);
entry.setFreezeReason(null);
return true;
}
}
return false;
}
ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason);
public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry connectionEntry = addr2Entry.get(addr);
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) {
return null;
}
synchronized (connectionEntry) {
// only RECONNECT freeze reason could be replaced
if (connectionEntry.getFreezeReason() == null
|| connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) {
connectionEntry.setFreezed(true);
connectionEntry.setFreezeReason(freezeReason);
return connectionEntry;
}
if (connectionEntry.isFreezed()) {
return null;
}
}
return connectionEntry;
}
public RFuture<RedisPubSubConnection> nextPubSubConnection() {
return pubSubConnectionPool.get();
}
public RFuture<RedisConnection> getConnection(InetSocketAddress addr) {
ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry != null) {
return slaveConnectionPool.get(entry);
}
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr);
return connectionManager.newFailedFuture(exception);
}
RFuture<Void> add(ClientConnectionsEntry entry);
public RFuture<RedisConnection> nextConnection() {
return slaveConnectionPool.get();
}
RFuture<RedisConnection> nextConnection();
public void returnPubSubConnection(RedisPubSubConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
pubSubConnectionPool.returnConnection(entry, connection);
}
RFuture<RedisPubSubConnection> nextPubSubConnection();
public void returnConnection(RedisConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
slaveConnectionPool.returnConnection(entry, connection);
}
void returnConnection(RedisConnection connection);
public void shutdown() {
for (ClientConnectionsEntry entry : addr2Entry.values()) {
entry.getClient().shutdown();
}
}
void returnPubSubConnection(RedisPubSubConnection connection);
public void shutdownAsync() {
for (ClientConnectionsEntry entry : addr2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient());
}
}
}

@ -1,179 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection.balancer;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SlaveConnectionPool;
import org.redisson.misc.RPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;
public class LoadBalancerManagerImpl implements LoadBalancerManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool slaveConnectionPool;
public LoadBalancerManagerImpl(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager;
slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry);
pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);
}
public RFuture<Void> add(final ClientConnectionsEntry entry) {
final RPromise<Void> result = connectionManager.newPromise();
FutureListener<Void> listener = new FutureListener<Void>() {
AtomicInteger counter = new AtomicInteger(2);
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
addr2Entry.put(entry.getClient().getAddr(), entry);
result.trySuccess(null);
}
}
};
RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);
slaveFuture.addListener(listener);
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);
pubSubFuture.addListener(listener);
return result;
}
public int getAvailableClients() {
int count = 0;
for (ClientConnectionsEntry connectionEntry : addr2Entry.values()) {
if (!connectionEntry.isFreezed()) {
count++;
}
}
return count;
}
public boolean unfreeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry == null) {
throw new IllegalStateException("Can't find " + addr + " in slaves!");
}
synchronized (entry) {
if (!entry.isFreezed()) {
return false;
}
if ((freezeReason == FreezeReason.RECONNECT
&& entry.getFreezeReason() == FreezeReason.RECONNECT)
|| freezeReason != FreezeReason.RECONNECT) {
entry.resetFailedAttempts();
entry.setFreezed(false);
entry.setFreezeReason(null);
return true;
}
}
return false;
}
public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry connectionEntry = addr2Entry.get(addr);
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) {
return null;
}
synchronized (connectionEntry) {
// only RECONNECT freeze reason could be replaced
if (connectionEntry.getFreezeReason() == null
|| connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) {
connectionEntry.setFreezed(true);
connectionEntry.setFreezeReason(freezeReason);
return connectionEntry;
}
if (connectionEntry.isFreezed()) {
return null;
}
}
return connectionEntry;
}
public RFuture<RedisPubSubConnection> nextPubSubConnection() {
return pubSubConnectionPool.get();
}
public RFuture<RedisConnection> getConnection(InetSocketAddress addr) {
ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry != null) {
return slaveConnectionPool.get(entry);
}
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr);
return connectionManager.newFailedFuture(exception);
}
public RFuture<RedisConnection> nextConnection() {
return slaveConnectionPool.get();
}
public void returnPubSubConnection(RedisPubSubConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
pubSubConnectionPool.returnConnection(entry, connection);
}
public void returnConnection(RedisConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
slaveConnectionPool.returnConnection(entry, connection);
}
public void shutdown() {
for (ClientConnectionsEntry entry : addr2Entry.values()) {
entry.getClient().shutdown();
}
}
public void shutdownAsync() {
for (ClientConnectionsEntry entry : addr2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient());
}
}
}

@ -16,7 +16,7 @@
package org.redisson.connection.balancer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -28,7 +28,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.misc.URIBuilder;
import org.redisson.misc.URLBuilder;
import io.netty.util.internal.PlatformDependent;
@ -78,7 +78,7 @@ public class WeightedRoundRobinBalancer implements LoadBalancer {
*/
public WeightedRoundRobinBalancer(Map<String, Integer> weights, int defaultWeight) {
for (Entry<String, Integer> entry : weights.entrySet()) {
URI uri = URIBuilder.create(entry.getKey());
URL uri = URLBuilder.create(entry.getKey());
InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort());
if (entry.getValue() <= 0) {
throw new IllegalArgumentException("Weight can't be less than or equal zero");

File diff suppressed because it is too large Load Diff

@ -1,57 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache;
import javax.cache.Cache;
/**
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class JCacheEntry<K, V> implements Cache.Entry<K, V> {
private final K key;
private final V value;
public JCacheEntry(K key, V value) {
super();
this.key = key;
this.value = value;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return value;
}
@Override
public <T> T unwrap(Class<T> clazz) {
if (clazz.isAssignableFrom(getClass())) {
return clazz.cast(this);
}
return null;
}
}

@ -1,74 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.EventType;
/**
* Entry event element passed to EventListener of JCache object
*
* @author Nikita Koksharov
*
* @param <K> key
* @param <V> value
*/
public class JCacheEntryEvent<K, V> extends CacheEntryEvent<K, V> {
private static final long serialVersionUID = -4601376694286796662L;
private final Object key;
private final Object value;
public JCacheEntryEvent(Cache<K, V> source, EventType eventType, Object key, Object value) {
super(source, eventType);
this.key = key;
this.value = value;
}
@Override
public K getKey() {
return (K) key;
}
@Override
public V getValue() {
return (V) value;
}
@Override
public <T> T unwrap(Class<T> clazz) {
if (clazz.isAssignableFrom(getClass())) {
return clazz.cast(this);
}
return null;
}
@Override
public V getOldValue() {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isOldValueAvailable() {
// TODO Auto-generated method stub
return false;
}
}

@ -1,90 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache;
import java.io.IOException;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
*/
public class JCacheEventCodec implements Codec {
private final Codec codec;
private final Decoder<Object> decoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
List<Object> result = new ArrayList<Object>();
int keyLen = buf.order(ByteOrder.LITTLE_ENDIAN).readInt();
ByteBuf keyBuf = buf.readSlice(keyLen);
Object key = codec.getMapKeyDecoder().decode(keyBuf, state);
result.add(key);
int valueLen = buf.order(ByteOrder.LITTLE_ENDIAN).readInt();
ByteBuf valueBuf = buf.readSlice(valueLen);
Object value = codec.getMapValueDecoder().decode(valueBuf, state);
result.add(value);
return result;
}
};
public JCacheEventCodec(Codec codec) {
super();
this.codec = codec;
}
@Override
public Decoder<Object> getMapValueDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapValueEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
throw new UnsupportedOperationException();
}
@Override
public Encoder getMapKeyEncoder() {
throw new UnsupportedOperationException();
}
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
throw new UnsupportedOperationException();
}
}

@ -1,364 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.CacheManager;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.configuration.Configuration;
import javax.cache.spi.CachingProvider;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import org.redisson.Redisson;
import org.redisson.jcache.bean.EmptyStatisticsMXBean;
import org.redisson.jcache.bean.JCacheManagementMXBean;
import org.redisson.jcache.bean.JCacheStatisticsMXBean;
import org.redisson.jcache.configuration.JCacheConfiguration;
/**
*
* @author Nikita Koksharov
*
*/
public class JCacheManager implements CacheManager {
private static final EmptyStatisticsMXBean EMPTY_INSTANCE = new EmptyStatisticsMXBean();
private static MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
private final ClassLoader classLoader;
private final CachingProvider cacheProvider;
private final Properties properties;
private final URI uri;
private final ConcurrentMap<String, JCache<?, ?>> caches = new ConcurrentHashMap<String, JCache<?, ?>>();
private final ConcurrentMap<JCache<?, ?>, JCacheStatisticsMXBean> statBeans = new ConcurrentHashMap<JCache<?, ?>, JCacheStatisticsMXBean>();
private final ConcurrentMap<JCache<?, ?>, JCacheManagementMXBean> managementBeans = new ConcurrentHashMap<JCache<?, ?>, JCacheManagementMXBean>();
private volatile boolean closed;
private final Redisson redisson;
public JCacheManager(Redisson redisson, ClassLoader classLoader, CachingProvider cacheProvider, Properties properties, URI uri) {
super();
this.classLoader = classLoader;
this.cacheProvider = cacheProvider;
this.properties = properties;
this.uri = uri;
this.redisson = redisson;
}
@Override
public CachingProvider getCachingProvider() {
return cacheProvider;
}
@Override
public URI getURI() {
return uri;
}
@Override
public ClassLoader getClassLoader() {
return classLoader;
}
@Override
public Properties getProperties() {
return properties;
}
private void checkNotClosed() {
if (closed) {
throw new IllegalStateException();
}
}
@Override
public <K, V, C extends Configuration<K, V>> Cache<K, V> createCache(String cacheName, C configuration)
throws IllegalArgumentException {
checkNotClosed();
if (cacheName == null) {
throw new NullPointerException();
}
if (configuration == null) {
throw new NullPointerException();
}
JCacheConfiguration<K, V> cfg = new JCacheConfiguration<K, V>(configuration);
JCache<K, V> cache = new JCache<K, V>(this, redisson, cacheName, cfg);
JCache<?, ?> oldCache = caches.putIfAbsent(cacheName, cache);
if (oldCache != null) {
throw new CacheException("Cache " + cacheName + " already exists");
}
if (cfg.isStatisticsEnabled()) {
enableStatistics(cacheName, true);
}
if (cfg.isManagementEnabled()) {
enableManagement(cacheName, true);
}
return cache;
}
@Override
public <K, V> Cache<K, V> getCache(String cacheName, Class<K> keyType, Class<V> valueType) {
checkNotClosed();
if (cacheName == null) {
throw new NullPointerException();
}
if (keyType == null) {
throw new NullPointerException();
}
if (valueType == null) {
throw new NullPointerException();
}
JCache<?, ?> cache = caches.get(cacheName);
if (cache == null) {
return null;
}
if (!keyType.isAssignableFrom(cache.getConfiguration(CompleteConfiguration.class).getKeyType())) {
throw new ClassCastException("Wrong type of key for " + cacheName);
}
if (!valueType.isAssignableFrom(cache.getConfiguration(CompleteConfiguration.class).getValueType())) {
throw new ClassCastException("Wrong type of value for " + cacheName);
}
return (Cache<K, V>) cache;
}
@Override
public <K, V> Cache<K, V> getCache(String cacheName) {
checkNotClosed();
Cache<K, V> cache = (Cache<K, V>) getCache(cacheName, Object.class, Object.class);
if (cache != null) {
if (cache.getConfiguration(CompleteConfiguration.class).getKeyType() != Object.class) {
throw new IllegalArgumentException("Wrong type of key for " + cacheName);
}
if (cache.getConfiguration(CompleteConfiguration.class).getValueType() != Object.class) {
throw new IllegalArgumentException("Wrong type of value for " + cacheName);
}
}
return cache;
}
@Override
public Iterable<String> getCacheNames() {
return Collections.unmodifiableSet(new HashSet<String>(caches.keySet()));
}
@Override
public void destroyCache(String cacheName) {
checkNotClosed();
if (cacheName == null) {
throw new NullPointerException();
}
JCache<?, ?> cache = caches.get(cacheName);
if (cache != null) {
cache.clear();
cache.close();
}
}
public void closeCache(JCache<?, ?> cache) {
caches.remove(cache.getName());
unregisterStatisticsBean(cache);
unregisterManagementBean(cache);
}
@Override
public void enableManagement(String cacheName, boolean enabled) {
checkNotClosed();
if (cacheName == null) {
throw new NullPointerException();
}
JCache<?, ?> cache = caches.get(cacheName);
if (cache == null) {
throw new NullPointerException();
}
if (enabled) {
JCacheManagementMXBean statBean = managementBeans.get(cache);
if (statBean == null) {
statBean = new JCacheManagementMXBean(cache);
JCacheManagementMXBean oldBean = managementBeans.putIfAbsent(cache, statBean);
if (oldBean != null) {
statBean = oldBean;
}
}
try {
ObjectName name = new ObjectName(getName("Management", cache));
if (mBeanServer.queryNames(name, null).isEmpty()) {
mBeanServer.registerMBean(statBean, name);
}
} catch (MalformedObjectNameException e) {
throw new CacheException(e);
} catch (InstanceAlreadyExistsException e) {
throw new CacheException(e);
} catch (MBeanRegistrationException e) {
throw new CacheException(e);
} catch (NotCompliantMBeanException e) {
throw new CacheException(e);
}
} else {
unregisterManagementBean(cache);
}
cache.getConfiguration(JCacheConfiguration.class).setManagementEnabled(enabled);
}
private void unregisterManagementBean(JCache<?, ?> cache) {
JCacheManagementMXBean statBean = managementBeans.remove(cache);
if (statBean != null) {
try {
ObjectName name = new ObjectName(getName("Management", cache));
for (ObjectName objectName : mBeanServer.queryNames(name, null)) {
mBeanServer.unregisterMBean(objectName);
}
} catch (MalformedObjectNameException e) {
throw new CacheException(e);
} catch (MBeanRegistrationException e) {
throw new CacheException(e);
} catch (InstanceNotFoundException e) {
throw new CacheException(e);
}
}
}
public JCacheStatisticsMXBean getStatBean(JCache<?, ?> cache) {
JCacheStatisticsMXBean bean = statBeans.get(cache);
if (bean != null) {
return bean;
}
return EMPTY_INSTANCE;
}
private String getName(String name, JCache<?, ?> cache) {
return "javax.cache:type=Cache" + name + ",CacheManager="
+ cache.getCacheManager().getURI().toString().replaceAll(",|:|=|\n", ".")
+ ",Cache=" + cache.getName().replaceAll(",|:|=|\n", ".");
}
@Override
public void enableStatistics(String cacheName, boolean enabled) {
checkNotClosed();
if (cacheName == null) {
throw new NullPointerException();
}
JCache<?, ?> cache = caches.get(cacheName);
if (cache == null) {
throw new NullPointerException();
}
if (enabled) {
JCacheStatisticsMXBean statBean = statBeans.get(cache);
if (statBean == null) {
statBean = new JCacheStatisticsMXBean();
JCacheStatisticsMXBean oldBean = statBeans.putIfAbsent(cache, statBean);
if (oldBean != null) {
statBean = oldBean;
}
}
try {
ObjectName name = new ObjectName(getName("Statistics", cache));
if (!mBeanServer.isRegistered(name)) {
mBeanServer.registerMBean(statBean, name);
}
} catch (MalformedObjectNameException e) {
throw new CacheException(e);
} catch (InstanceAlreadyExistsException e) {
throw new CacheException(e);
} catch (MBeanRegistrationException e) {
throw new CacheException(e);
} catch (NotCompliantMBeanException e) {
throw new CacheException(e);
}
} else {
unregisterStatisticsBean(cache);
}
cache.getConfiguration(JCacheConfiguration.class).setStatisticsEnabled(enabled);
}
private void unregisterStatisticsBean(JCache<?, ?> cache) {
JCacheStatisticsMXBean statBean = statBeans.remove(cache);
if (statBean != null) {
try {
ObjectName name = new ObjectName(getName("Statistics", cache));
for (ObjectName objectName : mBeanServer.queryNames(name, null)) {
mBeanServer.unregisterMBean(objectName);
}
} catch (MalformedObjectNameException e) {
throw new CacheException(e);
} catch (MBeanRegistrationException e) {
throw new CacheException(e);
} catch (InstanceNotFoundException e) {
throw new CacheException(e);
}
}
}
@Override
public void close() {
if (isClosed()) {
return;
}
synchronized (cacheProvider) {
if (!isClosed()) {
cacheProvider.close(uri, classLoader);
for (Cache<?, ?> cache : caches.values()) {
try {
cache.close();
} catch (Exception e) {
// skip
}
}
redisson.shutdown();
closed = true;
}
}
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public <T> T unwrap(Class<T> clazz) {
if (clazz.isAssignableFrom(getClass())) {
return clazz.cast(this);
}
throw new IllegalArgumentException();
}
}

@ -1,165 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.CacheManager;
import javax.cache.configuration.OptionalFeature;
import javax.cache.spi.CachingProvider;
import org.redisson.Redisson;
import org.redisson.config.Config;
/**
*
* @author Nikita Koksharov
*
*/
public class JCachingProvider implements CachingProvider {
private final ConcurrentMap<ClassLoader, ConcurrentMap<URI, CacheManager>> managers =
new ConcurrentHashMap<ClassLoader, ConcurrentMap<URI, CacheManager>>();
private static URI DEFAULT_URI;
static {
try {
DEFAULT_URI = JCachingProvider.class.getResource("/redisson-jcache.json").toURI();
} catch (Exception e) {
// trying next format
try {
DEFAULT_URI = JCachingProvider.class.getResource("/redisson-jcache.yaml").toURI();
} catch (Exception e1) {
// skip
}
}
}
@Override
public CacheManager getCacheManager(URI uri, ClassLoader classLoader, Properties properties) {
if (uri == null) {
uri = getDefaultURI();
}
if (classLoader == null) {
classLoader = getDefaultClassLoader();
}
ConcurrentMap<URI, CacheManager> value = new ConcurrentHashMap<URI, CacheManager>();
ConcurrentMap<URI, CacheManager> oldValue = managers.putIfAbsent(classLoader, value);
if (oldValue != null) {
value = oldValue;
}
CacheManager manager = value.get(uri);
if (manager != null) {
return manager;
}
Config config = null;
try {
config = Config.fromJSON(uri.toURL());
} catch (IOException e) {
try {
config = Config.fromYAML(uri.toURL());
} catch (IOException e2) {
throw new IllegalArgumentException("Can't parse config " + uri, e2);
}
}
Redisson redisson = (Redisson) Redisson.create(config);
manager = new JCacheManager(redisson, classLoader, this, properties, uri);
CacheManager oldManager = value.putIfAbsent(uri, manager);
if (oldManager != null) {
redisson.shutdown();
manager = oldManager;
}
return manager;
}
@Override
public ClassLoader getDefaultClassLoader() {
return getClass().getClassLoader();
}
@Override
public URI getDefaultURI() {
return DEFAULT_URI;
}
@Override
public Properties getDefaultProperties() {
return new Properties();
}
@Override
public CacheManager getCacheManager(URI uri, ClassLoader classLoader) {
return getCacheManager(uri, classLoader, getDefaultProperties());
}
@Override
public CacheManager getCacheManager() {
return getCacheManager(getDefaultURI(), getDefaultClassLoader());
}
@Override
public void close() {
synchronized (managers) {
for (ClassLoader classLoader : managers.keySet()) {
close(classLoader);
}
}
}
@Override
public void close(ClassLoader classLoader) {
Map<URI, CacheManager> uri2manager = managers.remove(classLoader);
if (uri2manager != null) {
for (CacheManager manager : uri2manager.values()) {
manager.close();
}
}
}
@Override
public void close(URI uri, ClassLoader classLoader) {
Map<URI, CacheManager> uri2manager = managers.get(classLoader);
if (uri2manager == null) {
return;
}
CacheManager manager = uri2manager.remove(uri);
if (manager == null) {
return;
}
manager.close();
if (uri2manager.isEmpty()) {
managers.remove(classLoader, Collections.emptyMap());
}
}
@Override
public boolean isSupported(OptionalFeature optionalFeature) {
// TODO implement support of store_by_reference
return false;
}
}

@ -1,108 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache;
import javax.cache.processor.MutableEntry;
/**
*
* @author Nikita Koksharov
*
* @param <K>
* @param <V>
*/
public class JMutableEntry<K, V> implements MutableEntry<K, V> {
public enum Action {CREATED, READ, UPDATED, DELETED, LOADED, SKIPPED}
final JCache<K, V> jCache;
final K key;
boolean isReadThrough;
Action action = Action.SKIPPED;
V value;
public JMutableEntry(JCache<K, V> jCache, V value, K key, boolean isReadThrough) {
super();
this.jCache = jCache;
this.value = value;
this.key = key;
this.isReadThrough = isReadThrough;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
if (action != Action.SKIPPED) {
return value;
}
if (value != null) {
action = Action.READ;
} else if (isReadThrough) {
value = jCache.load(key);
if (value != null) {
action = Action.LOADED;
}
isReadThrough = false;
}
return value;
}
@Override
public <T> T unwrap(Class<T> clazz) {
return (T)this;
}
@Override
public boolean exists() {
return getValue() != null;
}
@Override
public void remove() {
if (action == Action.CREATED) {
action = Action.SKIPPED;
} else {
action = Action.DELETED;
}
value = null;
}
@Override
public void setValue(V value) {
if (value == null) {
throw new NullPointerException();
}
if (action != Action.CREATED) {
if (exists()) {
action = Action.UPDATED;
} else {
action = Action.CREATED;
}
}
this.value = value;
}
public Action getAction() {
return action;
}
}

@ -1,57 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache.bean;
/**
*
* @author Nikita Koksharov
*
*/
public class EmptyStatisticsMXBean extends JCacheStatisticsMXBean {
@Override
public void addEvictions(long value) {
}
@Override
public void addGetTime(long value) {
}
@Override
public void addHits(long value) {
}
@Override
public void addMisses(long value) {
}
@Override
public void addPuts(long value) {
}
@Override
public void addPutTime(long value) {
}
@Override
public void addRemovals(long value) {
}
@Override
public void addRemoveTime(long value) {
}
}

@ -1,72 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache.bean;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.management.CacheMXBean;
import org.redisson.jcache.JCache;
/**
*
* @author Nikita Koksharov
*
*/
public class JCacheManagementMXBean implements CacheMXBean {
private final JCache<?, ?> cache;
public JCacheManagementMXBean(JCache<?, ?> cache) {
super();
this.cache = cache;
}
@Override
public String getKeyType() {
return cache.getConfiguration(CompleteConfiguration.class).getKeyType().getName();
}
@Override
public String getValueType() {
return cache.getConfiguration(CompleteConfiguration.class).getValueType().getName();
}
@Override
public boolean isReadThrough() {
return cache.getConfiguration(CompleteConfiguration.class).isReadThrough();
}
@Override
public boolean isWriteThrough() {
return cache.getConfiguration(CompleteConfiguration.class).isWriteThrough();
}
@Override
public boolean isStoreByValue() {
return cache.getConfiguration(CompleteConfiguration.class).isStoreByValue();
}
@Override
public boolean isStatisticsEnabled() {
return cache.getConfiguration(CompleteConfiguration.class).isStatisticsEnabled();
}
@Override
public boolean isManagementEnabled() {
return cache.getConfiguration(CompleteConfiguration.class).isManagementEnabled();
}
}

@ -1,157 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache.bean;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.management.CacheStatisticsMXBean;
/**
*
* @author Nikita Koksharov
*
*/
public class JCacheStatisticsMXBean implements CacheStatisticsMXBean {
private final AtomicLong removals = new AtomicLong();
private final AtomicLong hits = new AtomicLong();
private final AtomicLong puts = new AtomicLong();
private final AtomicLong misses = new AtomicLong();
private final AtomicLong evictions = new AtomicLong();
private final AtomicLong removeTime = new AtomicLong();
private final AtomicLong getTime = new AtomicLong();
private final AtomicLong putTime = new AtomicLong();
@Override
public void clear() {
removals.set(0);
hits.set(0);
puts.set(0);
misses.set(0);
evictions.set(0);
removeTime.set(0);
getTime.set(0);
putTime.set(0);
}
public void addHits(long value) {
hits.addAndGet(value);
}
@Override
public long getCacheHits() {
return hits.get();
}
@Override
public float getCacheHitPercentage() {
long gets = getCacheGets();
if (gets == 0) {
return 0;
}
return (getCacheHits() * 100) / (float) gets;
}
public void addMisses(long value) {
misses.addAndGet(value);
}
@Override
public long getCacheMisses() {
return misses.get();
}
@Override
public float getCacheMissPercentage() {
long gets = getCacheGets();
if (gets == 0) {
return 0;
}
return (getCacheMisses() * 100) / (float) gets;
}
@Override
public long getCacheGets() {
return hits.get() + misses.get();
}
public void addPuts(long value) {
puts.addAndGet(value);
}
@Override
public long getCachePuts() {
return puts.get();
}
public void addRemovals(long value) {
removals.addAndGet(value);
}
@Override
public long getCacheRemovals() {
return removals.get();
}
public void addEvictions(long value) {
evictions.addAndGet(value);
}
@Override
public long getCacheEvictions() {
return evictions.get();
}
private float get(long value, long timeInNanos) {
if (value == 0 || timeInNanos == 0) {
return 0;
}
long timeInMicrosec = TimeUnit.NANOSECONDS.toMicros(timeInNanos);
return timeInMicrosec / value;
}
public void addGetTime(long value) {
getTime.addAndGet(value);
}
@Override
public float getAverageGetTime() {
return get(getCacheGets(), getTime.get());
}
public void addPutTime(long value) {
putTime.addAndGet(value);
}
@Override
public float getAveragePutTime() {
return get(getCachePuts(), putTime.get());
}
public void addRemoveTime(long value) {
removeTime.addAndGet(value);
}
@Override
public float getAverageRemoveTime() {
return get(getCachePuts(), removeTime.get());
}
}

@ -1,147 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.jcache.configuration;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.configuration.Configuration;
import javax.cache.configuration.Factory;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheWriter;
/**
* Configuration object for JCache {@link javax.cache.Cache}
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public class JCacheConfiguration<K, V> implements CompleteConfiguration<K, V> {
private static final long serialVersionUID = -7861479608049089078L;
private final ExpiryPolicy expiryPolicy;
private final MutableConfiguration<K, V> delegate;
public JCacheConfiguration(Configuration<K, V> configuration) {
this(configuration, configuration.getKeyType(), configuration.getValueType());
}
public JCacheConfiguration(Configuration<K, V> configuration, Class<K> keyType, Class<V> valueType) {
if (configuration != null) {
if (configuration instanceof CompleteConfiguration) {
delegate = new MutableConfiguration<K, V>((CompleteConfiguration<K, V>)configuration);
} else {
delegate = new MutableConfiguration<K, V>();
delegate.setStoreByValue(configuration.isStoreByValue());
delegate.setTypes(configuration.getKeyType(), configuration.getValueType());
}
} else {
delegate = new MutableConfiguration<K, V>();
}
this.expiryPolicy = delegate.getExpiryPolicyFactory().create();
}
@Override
public Class<K> getKeyType() {
if (delegate.getKeyType() == null) {
return (Class<K>) Object.class;
}
return delegate.getKeyType();
}
@Override
public Class<V> getValueType() {
if (delegate.getValueType() == null) {
return (Class<V>) Object.class;
}
return delegate.getValueType();
}
@Override
public boolean isStoreByValue() {
return delegate.isStoreByValue();
}
@Override
public boolean isReadThrough() {
return delegate.isReadThrough();
}
@Override
public boolean isWriteThrough() {
return delegate.isWriteThrough();
}
@Override
public boolean isStatisticsEnabled() {
return delegate.isStatisticsEnabled();
}
public void setStatisticsEnabled(boolean enabled) {
delegate.setStatisticsEnabled(enabled);
}
public void setManagementEnabled(boolean enabled) {
delegate.setManagementEnabled(enabled);
}
@Override
public boolean isManagementEnabled() {
return delegate.isManagementEnabled();
}
@Override
public Iterable<CacheEntryListenerConfiguration<K, V>> getCacheEntryListenerConfigurations() {
return delegate.getCacheEntryListenerConfigurations();
}
public void addCacheEntryListenerConfiguration(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
delegate.addCacheEntryListenerConfiguration(cacheEntryListenerConfiguration);
}
public void removeCacheEntryListenerConfiguration(
CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) {
delegate.removeCacheEntryListenerConfiguration(cacheEntryListenerConfiguration);
}
@Override
public Factory<CacheLoader<K, V>> getCacheLoaderFactory() {
return delegate.getCacheLoaderFactory();
}
@Override
public Factory<CacheWriter<? super K, ? super V>> getCacheWriterFactory() {
return delegate.getCacheWriterFactory();
}
@Override
public Factory<ExpiryPolicy> getExpiryPolicyFactory() {
return delegate.getExpiryPolicyFactory();
}
public ExpiryPolicy getExpiryPolicy() {
return expiryPolicy;
}
}

@ -1,37 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.net.URI;
public class URIBuilder {
public static URI create(String uri) {
String[] parts = uri.split(":");
if (parts.length-1 >= 3) {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
uri = "[" + uri.replace(":" + port, "") + "]:" + newPort;
} else {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
uri = uri.replace(":" + port, "") + ":" + newPort;
}
return URI.create("//" + uri);
}
}

@ -0,0 +1,89 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
/**
*
* @author Nikita Koksharov
*
*/
public class URLBuilder {
private static volatile boolean init = false;
static {
init();
}
public static void init() {
if (init) {
return;
}
init = true;
URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() {
@Override
public URLStreamHandler createURLStreamHandler(String protocol) {
if ("redis".equals(protocol)) {
return new URLStreamHandler() {
@Override
protected URLConnection openConnection(URL u) throws IOException {
throw new UnsupportedOperationException();
};
@Override
protected boolean equals(URL u1, URL u2) {
return u1.toString().equals(u2.toString());
}
@Override
protected int hashCode(URL u) {
return u.toString().hashCode();
}
};
}
return null;
}
});
}
public static URL create(String url) {
try {
String[] parts = url.split(":");
if (parts.length-1 >= 3) {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
String host = url.replace(":" + port, "");
return new URL("redis://[" + host + "]:" + newPort);
} else {
String port = parts[parts.length-1];
String newPort = port.split("[^\\d]")[0];
String host = url.replace(":" + port, "");
return new URL("redis://" + host + ":" + newPort);
}
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e);
}
}
}

@ -48,17 +48,10 @@ public class AsyncSemaphore {
Thread.currentThread().interrupt();
}
}
public boolean tryAcquire() {
public int queueSize() {
synchronized (this) {
if (counter == 0) {
return false;
}
if (counter > 0) {
counter--;
return true;
}
throw new IllegalStateException();
return listeners.size();
}
}

@ -11,6 +11,13 @@ import org.redisson.api.RBlockingDeque;
public class RedissonBlockingDequeTest extends BaseTest {
@Test(timeout = 3000)
public void testShortPoll() throws InterruptedException {
RBlockingDeque<Integer> queue = redisson.getBlockingDeque("queue:pollany");
queue.pollLastAsync(500, TimeUnit.MILLISECONDS);
queue.pollFirstAsync(10, TimeUnit.MICROSECONDS);
}
@Test
public void testPollLastFromAny() throws InterruptedException {
final RBlockingDeque<Integer> queue1 = redisson.getBlockingDeque("deque:pollany");

@ -6,45 +6,70 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RBlockingFairQueue;
import org.redisson.api.RBlockingQueue;
public class RedissonBlockingFairQueueTest extends BaseTest {
@Test
public void testPollTimeout() throws InterruptedException {
int size = 100;
RBlockingFairQueue<String> queue = redisson.getBlockingFairQueue("test");
public void testFairness() throws InterruptedException {
int size = 10000;
RBlockingQueue<String> queue = redisson.getBlockingQueue("test");
CountDownLatch latch = new CountDownLatch(size);
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < size; i++) {
final int j = i;
Thread t = new Thread() {
public void run() {
AtomicInteger t1Counter = new AtomicInteger();
AtomicInteger t2Counter = new AtomicInteger();
Thread t1 = new Thread("test-thread1") {
public void run() {
while (true) {
try {
String value = queue.poll(1, TimeUnit.SECONDS);
assertThat(value).isEqualTo("" + j);
String a = queue.poll(1, TimeUnit.SECONDS);
if (a == null) {
break;
}
latch.countDown();
t1Counter.incrementAndGet();
} catch (InterruptedException e) {
}
};
}
};
threads.add(t);
}
for (Thread thread : threads) {
thread.start();
thread.join(5);
}
};
Thread t2 = new Thread("test-thread1") {
public void run() {
while (true) {
try {
String a = queue.poll(1, TimeUnit.SECONDS);
if (a == null) {
break;
}
Thread.sleep(5);
latch.countDown();
t2Counter.incrementAndGet();
} catch (InterruptedException e) {
}
}
};
};
for (int i = 0; i < size; i++) {
queue.add("" + i);
}
t1.start();
t2.start();
t2.join();
t1.join();
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
System.out.println("t1: " + t1Counter.get());
System.out.println("t2: " + t2Counter.get());
}
}

@ -52,6 +52,13 @@ public class RedissonBlockingQueueTest extends BaseTest {
redisson.shutdown();
}
@Test(timeout = 3000)
public void testShortPoll() throws InterruptedException {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("queue:pollany");
queue.poll(500, TimeUnit.MILLISECONDS);
queue.poll(10, TimeUnit.MICROSECONDS);
}
@Test
public void testPollReattach() throws InterruptedException, IOException, ExecutionException, TimeoutException {
RedisProcess runner = new RedisRunner()

@ -13,6 +13,14 @@ import org.redisson.api.RSemaphore;
public class RedissonSemaphoreTest extends BaseConcurrentTest {
@Test
public void testAcquireWithoutSetPermits() throws InterruptedException {
RSemaphore s = redisson.getSemaphore("test");
s.release();
s.release();
s.acquire(2);
}
@Test
public void testTrySetPermits() {
RSemaphore s = redisson.getSemaphore("test");

@ -43,6 +43,23 @@ public class RedissonSetTest extends BaseTest {
assertThat(set.removeRandom()).isIn(1, 2, 3);
assertThat(set.removeRandom()).isNull();
}
@Test
public void testRemoveRandomAmount() {
RSet<Integer> set = redisson.getSet("simple");
set.add(1);
set.add(2);
set.add(3);
set.add(4);
set.add(5);
set.add(6);
assertThat(set.removeRandom(3)).isSubsetOf(1, 2, 3, 4, 5, 6).hasSize(3);
assertThat(set.removeRandom(2)).isSubsetOf(1, 2, 3, 4, 5, 6).hasSize(2);
assertThat(set.removeRandom(1)).isSubsetOf(1, 2, 3, 4, 5, 6).hasSize(1);
assertThat(set.removeRandom(4)).isEmpty();
}
@Test
public void testRandom() {

@ -1,5 +1,6 @@
package org.redisson;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
@ -40,7 +41,7 @@ public class RedissonTest {
protected RedissonClient redisson;
protected static RedissonClient defaultRedisson;
@Test
public void testSmallPool() throws InterruptedException {
Config config = new Config();
@ -311,23 +312,40 @@ public class RedissonTest {
}
@Test
public void testSingleConfig() throws IOException {
public void testSingleConfigJSON() throws IOException {
RedissonClient r = BaseTest.createInstance();
String t = r.getConfig().toJSON();
Config c = Config.fromJSON(t);
assertThat(c.toJSON()).isEqualTo(t);
}
@Test
public void testSingleConfigYAML() throws IOException {
RedissonClient r = BaseTest.createInstance();
String t = r.getConfig().toYAML();
Config c = Config.fromYAML(t);
assertThat(c.toYAML()).isEqualTo(t);
}
@Test
public void testMasterSlaveConfig() throws IOException {
public void testMasterSlaveConfigJSON() throws IOException {
Config c2 = new Config();
c2.useMasterSlaveServers().setMasterAddress("123.1.1.1:1231").addSlaveAddress("82.12.47.12:1028");
String t = c2.toJSON();
Config c = Config.fromJSON(t);
assertThat(c.toJSON()).isEqualTo(t);
}
@Test
public void testMasterSlaveConfigYAML() throws IOException {
Config c2 = new Config();
c2.useMasterSlaveServers().setMasterAddress("123.1.1.1:1231").addSlaveAddress("82.12.47.12:1028");
String t = c2.toYAML();
Config c = Config.fromYAML(t);
assertThat(c.toYAML()).isEqualTo(t);
}
// @Test
public void testCluster() {
NodesGroup<ClusterNode> nodes = redisson.getClusterNodesGroup();

@ -28,6 +28,7 @@ public class ClusterNodesDecoderTest {
buf.writeBytes(src);
List<ClusterNodeInfo> nodes = decoder.decode(buf, null);
ClusterNodeInfo node = nodes.get(0);
Assert.assertEquals("192.168.234.129", node.getAddress().getHost());
Assert.assertEquals(7001, node.getAddress().getPort());
}

@ -1,81 +0,0 @@
package org.redisson.jcache;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import javax.cache.Caching;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.BaseTest;
public class JCacheTest extends BaseTest {
@Test
public void test() throws InterruptedException, IllegalArgumentException, URISyntaxException {
MutableConfiguration<String, String> config = new MutableConfiguration<>();
config.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 1)));
config.setStoreByValue(true);
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager(getClass().getResource("redisson-jcache.json").toURI(), null)
.createCache("test", config);
CountDownLatch latch = new CountDownLatch(1);
String key = "123";
ExpiredListener clientListener = new ExpiredListener(latch, key, "90");
MutableCacheEntryListenerConfiguration<String, String> listenerConfiguration =
new MutableCacheEntryListenerConfiguration<String, String>(FactoryBuilder.factoryOf(clientListener), null, true, true);
cache.registerCacheEntryListener(listenerConfiguration);
cache.put(key, "90");
Assert.assertNotNull(cache.get(key));
latch.await();
Assert.assertNull(cache.get(key));
}
public static class ExpiredListener implements CacheEntryExpiredListener<String, String>, Serializable {
private Object key;
private Object value;
private CountDownLatch latch;
public ExpiredListener(CountDownLatch latch, Object key, Object value) {
super();
this.latch = latch;
this.key = key;
this.value = value;
}
@Override
public void onExpired(Iterable<CacheEntryEvent<? extends String, ? extends String>> events)
throws CacheEntryListenerException {
CacheEntryEvent<? extends String, ? extends String> entry = events.iterator().next();
assertThat(entry.getKey()).isEqualTo(key);
assertThat(entry.getValue()).isEqualTo(value);
latch.countDown();
}
}
}
Loading…
Cancel
Save