Merge branch 'master' into 3.0.0

# Conflicts:
#	pom.xml
#	redisson-all/pom.xml
#	redisson-hibernate/pom.xml
#	redisson-hibernate/redisson-hibernate-4/pom.xml
#	redisson-hibernate/redisson-hibernate-5/pom.xml
#	redisson-hibernate/redisson-hibernate-52/pom.xml
#	redisson-hibernate/redisson-hibernate-53/pom.xml
#	redisson-spring-boot-starter/pom.xml
#	redisson-spring-data/pom.xml
#	redisson-spring-data/redisson-spring-data-16/pom.xml
#	redisson-spring-data/redisson-spring-data-17/pom.xml
#	redisson-spring-data/redisson-spring-data-18/pom.xml
#	redisson-spring-data/redisson-spring-data-20/pom.xml
#	redisson-tomcat/pom.xml
#	redisson-tomcat/redisson-tomcat-6/pom.xml
#	redisson-tomcat/redisson-tomcat-7/pom.xml
#	redisson-tomcat/redisson-tomcat-8/pom.xml
#	redisson-tomcat/redisson-tomcat-9/pom.xml
#	redisson/pom.xml
pull/1933/head
Nikita Koksharov 6 years ago
commit d940781561

@ -2,7 +2,45 @@ Redisson Releases History
================================
### Please Note: trunk is current development branch.
Please consider __[Redisson PRO](https://redisson.pro)__ version for advanced features and support by SLA.
Сonsider __[Redisson PRO](https://redisson.pro)__ version for advanced features and support by SLA.
### 27-Dec-2018 - versions 2.15.0 and 3.10.0 released
Feature - new __[Hibernate cache](https://github.com/redisson/redisson/tree/master/redisson-hibernate) implementation__
Feature - __Hibernate 5.3__ support
Feature - [TypedJsonJacksonCodec](https://github.com/redisson/redisson/blob/master/redisson/src/main/java/org/redisson/codec/TypedJsonJacksonCodec.java) added
Feature - `getCountDownLatch`, `getSemaphore`, `getPermitExpirableSemaphore`, `getFairLock` methods added to `RMap` object
Feature - `getCountDownLatch`, `getSemaphore`, `getPermitExpirableSemaphore`, `getFairLock` methods added to `RSet` object
Feature - `RTopic.countSubscribers` method added
Feature - `JndiRedissonFactory` and Tomcat `JndiRedissonSessionManager` added
Feature - Hibernate Region Factories with JNDI support
Feature - ability to use Environmental Variables in config files
Feature - Spring Data Redis 2.1.x support added
Feature - Spring Boot Starter 2.1.x support added
Feature - Spring Data Redis 2.0.x and 2.1.x integrations support `ReactiveRedisTemplate`
Feature - Support of [Different monitoring systems](https://github.com/redisson/redisson/wiki/14.-Integration-with-frameworks#1410-statistics-monitoring-jmx-and-other-systems)
Improvement - RGeo.radius methods use GEORADIUS_RO and GEORADIUSBYMEMBER_RO commands
Improvement - restored implementation of DnsAddressResolverGroupFactory
Improvement - RedisConnectionClosedException removed
Improvement - __default codec changed to FSTCodec__
Fixed - `RMap.getAll` throws `ClassCastException` during transaction execution
Fixed - `pingConnectionInterval` and `lock-watchdog-timeout` parameters added to `redisson.xsd`
Fixed - zRevRangeWithScores does not work properly in Spring RedisTemplate
Fixed - `CommandDecoder` throws `IndexOutOfBoundsException` if `pingConnectionInterval` param is used
Fixed - NPE in `CommandDecoder`
Fixed - error during channel initialization is not logged
Fixed - `RBitSet` object couldn't be used as nested object
Fixed - use `keyPrefix` for topic object used in Tomcat Session Manager
Fixed - unable connect to Redis on Android
Fixed - `RMapCache` element expiration doesn't work with map size = 1
Fixed - MOVED handling
Fixed - Pooled connection closed after MOVED redirection
Fixed - Master node shouldn't be shutdown on slave down event in Sentinel mode
Fixed - `RoundRobinLoadBalancer` doesn't distribute load equally if one of slave nodes failed
Fixed - Spring Session `keyPrefix` setting isn't used in session name
Fixed - failed Redis Master node is not shutdown properly
Fixed - Redisson shouldn't be shutdown in Spring Data RedissonConnectionFactory
Fixed - Redisson Spring Boot doesn't start properly without lettuce or jedis in classpath
Fixed - validation of filled out Redis node address in Config
### 21-Nov-2018 - versions 2.14.1 and 3.9.1 released
Feature - `takeFirstElements` and `takeLastElements` streaming methods added to `RBlockingDequeRx`

@ -1,13 +1,13 @@
Redisson: Redis based In-Memory Data Grid for Java.<br/> State of the Art Redis Java client
====
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.8.2) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Report an issue](https://github.com/redisson/redisson/issues/new) | **[Redisson PRO](https://redisson.pro)**
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.9.1) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Report an issue](https://github.com/redisson/redisson/issues/new) | **[Redisson PRO](https://redisson.pro)**
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.
| Stable <br/> Release Version | Release Date | JDK Version<br/> compatibility | `CompletionStage` <br/> support | `ProjectReactor` version<br/> compatibility |
| ------------- | ------------- | ------------| -----------| -----------|
| 3.9.1 | 21.11.2018 | 1.8 - 11, Android | Yes | 3.x.x |
| 2.14.1 | 21.11.2018 | 1.6 - 11, Android | No | 2.0.8 |
| 3.10.0 | 27.12.2018 | 1.8 - 11, Android | Yes | 3.x.x |
| 2.15.0 | 27.12.2018 | 1.6 - 11, Android | No | 2.0.8 |
Features
@ -110,23 +110,23 @@ Quick start
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.9.1</version>
<version>3.10.0</version>
</dependency>
<!-- JDK 1.6+ compatible -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.14.1</version>
<version>2.15.0</version>
</dependency>
#### Gradle
// JDK 1.8+ compatible
compile 'org.redisson:redisson:3.9.1'
compile 'org.redisson:redisson:3.10.0'
// JDK 1.6+ compatible
compile 'org.redisson:redisson:2.14.1'
compile 'org.redisson:redisson:2.15.0'
#### Java
@ -153,11 +153,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService");
Downloads
===============================
[Redisson 3.9.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.9.1&e=jar),
[Redisson node 3.9.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.9.1&e=jar)
[Redisson 3.10.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.10.0&e=jar),
[Redisson node 3.10.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.10.0&e=jar)
[Redisson 2.14.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.14.1&e=jar),
[Redisson node 2.14.1](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.14.1&e=jar)
[Redisson 2.15.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.15.0&e=jar),
[Redisson node 2.15.0](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.15.0&e=jar)
FAQs
===============================

@ -67,5 +67,5 @@ spring.redis.sentinel.nodes=
spring.redis.redisson.config=classpath:redisson.yaml
```
### 3. Get access to Redisson through spring bean with `RedissonClient` interface
### 3. Use Redisson through spring bean with `RedissonClient` interface or `RedisTemplate`/`ReactiveRedisTemplate` objects

@ -28,22 +28,22 @@ Usage
<artifactId>redisson-spring-data-20</artifactId>
<!-- for Spring Data Redis v.2.1.x -->
<artifactId>redisson-spring-data-21</artifactId>
<version>3.9.1</version>
<version>3.10.0</version>
</dependency>
```
Gradle
```java
// for Spring Data Redis v.1.6.x
compile 'org.redisson:redisson-spring-data-16:3.9.1'
compile 'org.redisson:redisson-spring-data-16:3.10.0'
// for Spring Data Redis v.1.7.x
compile 'org.redisson:redisson-spring-data-17:3.9.1'
compile 'org.redisson:redisson-spring-data-17:3.10.0'
// for Spring Data Redis v.1.8.x
compile 'org.redisson:redisson-spring-data-18:3.9.1'
compile 'org.redisson:redisson-spring-data-18:3.10.0'
// for Spring Data Redis v.2.0.x
compile 'org.redisson:redisson-spring-data-20:3.9.1'
compile 'org.redisson:redisson-spring-data-20:3.10.0'
// for Spring Data Redis v.2.1.x
compile 'org.redisson:redisson-spring-data-21:3.9.1'
compile 'org.redisson:redisson-spring-data-21:3.10.0'
```
2. __For JDK 1.6+__
@ -58,18 +58,18 @@ Usage
<artifactId>redisson-spring-data-17</artifactId>
<!-- for Spring Data Redis v.1.8.x -->
<artifactId>redisson-spring-data-18</artifactId>
<version>2.14.1</version>
<version>2.15.0</version>
</dependency>
```
Gradle
```java
// for Spring Data Redis v.1.6.x
compile 'org.redisson:redisson-spring-data-16:2.14.1'
compile 'org.redisson:redisson-spring-data-16:2.15.0'
// for Spring Data Redis v.1.7.x
compile 'org.redisson:redisson-spring-data-17:2.14.1'
compile 'org.redisson:redisson-spring-data-17:2.15.0'
// for Spring Data Redis v.1.8.x
compile 'org.redisson:redisson-spring-data-18:2.14.1'
compile 'org.redisson:redisson-spring-data-18:2.15.0'
```

@ -66,25 +66,25 @@ Each RedissonSessionManager created per Web Application and thus creates own Red
**2** Copy two jars into `TOMCAT_BASE/lib` directory:
1. __For JDK 1.8+__
[redisson-all-3.9.1.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.9.1&e=jar)
[redisson-all-3.10.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.10.0&e=jar)
for Tomcat 6.x
[redisson-tomcat-6-3.9.1.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.9.1&e=jar)
[redisson-tomcat-6-3.10.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.10.0&e=jar)
for Tomcat 7.x
[redisson-tomcat-7-3.9.1.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.9.1&e=jar)
[redisson-tomcat-7-3.10.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.10.0&e=jar)
for Tomcat 8.x
[redisson-tomcat-8-3.9.1.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.9.1&e=jar)
[redisson-tomcat-8-3.10.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.10.0&e=jar)
for Tomcat 9.x
[redisson-tomcat-9-3.9.1.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-9&v=3.9.1&e=jar)
[redisson-tomcat-9-3.10.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-9&v=3.10.0&e=jar)
2. __For JDK 1.6+__
[redisson-all-2.14.1.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.14.1&e=jar)
[redisson-all-2.15.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.15.0&e=jar)
for Tomcat 6.x
[redisson-tomcat-6-2.14.1.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.14.1&e=jar)
[redisson-tomcat-6-2.15.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.15.0&e=jar)
for Tomcat 7.x
[redisson-tomcat-7-2.14.1.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.14.1&e=jar)
[redisson-tomcat-7-2.15.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.15.0&e=jar)
for Tomcat 8.x
[redisson-tomcat-8-2.14.1.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.14.1&e=jar)
[redisson-tomcat-8-2.15.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.15.0&e=jar)

@ -85,7 +85,6 @@ import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.remote.ResponseEntry;
import org.redisson.transaction.RedissonTransaction;
@ -102,7 +101,6 @@ import io.netty.util.internal.PlatformDependent;
public class Redisson implements RedissonClient {
static {
RedissonObjectFactory.warmUp();
RedissonReference.warmUp();
}
@ -657,7 +655,7 @@ public class Redisson implements RedissonClient {
@Override
public RLiveObjectService getLiveObjectService() {
return new RedissonLiveObjectService(this, liveObjectClassCache);
return new RedissonLiveObjectService(this, liveObjectClassCache, connectionManager.getCommandExecutor());
}
@Override

@ -233,13 +233,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOP_VALUE, params.toArray());
return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BRPOP_VALUE, toSeconds(timeout, unit), queueNames);
}
@Override

@ -15,10 +15,8 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
@ -111,13 +109,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
*/
@Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, params.toArray());
return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BLPOP_VALUE, toSeconds(timeout, unit), queueNames);
}
@Override

@ -15,7 +15,6 @@
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -247,13 +246,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
*/
@Override
public RFuture<V> pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(unit.toSeconds(timeout));
RFuture<V> takeFuture = commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, params.toArray());
RFuture<V> takeFuture = commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BLPOP_VALUE, toSeconds(timeout, unit), queueNames);
return wrapTakeFuture(takeFuture);
}

@ -55,6 +55,10 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
private static final MultiDecoder<Map<Object, Object>> postitionDecoder = new ListMultiDecoder(new CodecDecoder(), new GeoPositionDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoMapReplayDecoder());
private static final MultiDecoder<Map<Object, Object>> distanceDecoder = new ListMultiDecoder(new GeoDistanceDecoder(), new GeoMapReplayDecoder());
private static final RedisCommand<Map<Object, Object>> GEORADIUS_RO_DISTANCE = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", distanceDecoder);
private static final RedisCommand<Map<Object, Object>> GEORADIUS_RO_POS = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", postitionDecoder);
private static final RedisCommand<Map<Object, Object>> GEORADIUSBYMEMBER_RO_DISTANCE = new RedisCommand<Map<Object, Object>>("GEORADIUSBYMEMBER_RO", distanceDecoder);
private static final RedisCommand<Map<Object, Object>> GEORADIUSBYMEMBER_RO_POS = new RedisCommand<Map<Object, Object>>("GEORADIUSBYMEMBER_RO", postitionDecoder);
public RedissonGeo(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(connectionManager, name, redisson);
@ -134,7 +138,9 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
params.add(encode(member));
}
MultiDecoder<Map<Object, Object>> decoder = new ListMultiDecoder(new GeoPositionDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoPositionMapDecoder((List<Object>)Arrays.asList(members)));
MultiDecoder<Map<Object, Object>> decoder = new ListMultiDecoder(0, new GeoPositionDecoder(),
// new ObjectListReplayDecoder(ListMultiDecoder.RESET),
new GeoPositionMapDecoder((List<Object>)Arrays.asList(members)));
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOPOS", decoder);
return commandExecutor.readAsync(getName(), StringCodec.INSTANCE, command, params.toArray());
}
@ -189,8 +195,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, Double>> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", distanceDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude),
return commandExecutor.readAsync(getName(), codec, GEORADIUS_RO_DISTANCE, getName(), convert(longitude), convert(latitude),
radius, geoUnit, "WITHDIST");
}
@ -201,8 +206,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, Double>> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit, int count) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", distanceDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude),
return commandExecutor.readAsync(getName(), codec, GEORADIUS_RO_DISTANCE, getName(), convert(longitude), convert(latitude),
radius, geoUnit, "WITHDIST", "COUNT", count);
}
@ -213,8 +217,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, Double>> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", distanceDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude),
return commandExecutor.readAsync(getName(), codec, GEORADIUS_RO_DISTANCE, getName(), convert(longitude), convert(latitude),
radius, geoUnit, "WITHDIST", geoOrder);
}
@ -225,8 +228,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, Double>> radiusWithDistanceAsync(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", distanceDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude),
return commandExecutor.readAsync(getName(), codec, GEORADIUS_RO_DISTANCE, getName(), convert(longitude), convert(latitude),
radius, geoUnit, "WITHDIST", "COUNT", count, geoOrder);
}
@ -237,8 +239,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, GeoPosition>> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", postitionDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHCOORD");
return commandExecutor.readAsync(getName(), codec, GEORADIUS_RO_POS, getName(), convert(longitude), convert(latitude), radius, geoUnit, "WITHCOORD");
}
@Override
@ -248,8 +249,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, GeoPosition>> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit, int count) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", postitionDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude),
return commandExecutor.readAsync(getName(), codec, GEORADIUS_RO_POS, getName(), convert(longitude), convert(latitude),
radius, geoUnit, "WITHCOORD", "COUNT", count);
}
@ -260,8 +260,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, GeoPosition>> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", postitionDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude),
return commandExecutor.readAsync(getName(), codec, GEORADIUS_RO_POS, getName(), convert(longitude), convert(latitude),
radius, geoUnit, "WITHCOORD", geoOrder);
}
@ -272,8 +271,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, GeoPosition>> radiusWithPositionAsync(double longitude, double latitude, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUS_RO", postitionDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), convert(longitude), convert(latitude),
return commandExecutor.readAsync(getName(), codec, GEORADIUS_RO_POS, getName(), convert(longitude), convert(latitude),
radius, geoUnit, "WITHCOORD", "COUNT", count, geoOrder);
}
@ -324,8 +322,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, Double>> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit) {
RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER_RO", distanceDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), encode(member), radius, geoUnit, "WITHDIST");
return commandExecutor.readAsync(getName(), codec, GEORADIUSBYMEMBER_RO_DISTANCE, getName(), encode(member), radius, geoUnit, "WITHDIST");
}
@Override
@ -335,8 +332,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, Double>> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit, int count) {
RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER_RO", distanceDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), encode(member), radius, geoUnit, "WITHDIST", "COUNT", count);
return commandExecutor.readAsync(getName(), codec, GEORADIUSBYMEMBER_RO_DISTANCE, getName(), encode(member), radius, geoUnit, "WITHDIST", "COUNT", count);
}
@Override
@ -346,8 +342,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, Double>> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder) {
RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER_RO", distanceDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), encode(member), radius, geoUnit, "WITHDIST", geoOrder);
return commandExecutor.readAsync(getName(), codec, GEORADIUSBYMEMBER_RO_DISTANCE, getName(), encode(member), radius, geoUnit, "WITHDIST", geoOrder);
}
@Override
@ -357,8 +352,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, Double>> radiusWithDistanceAsync(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count) {
RedisCommand command = new RedisCommand("GEORADIUSBYMEMBER_RO", distanceDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), encode(member), radius, geoUnit, "WITHDIST", "COUNT", count, geoOrder);
return commandExecutor.readAsync(getName(), codec, GEORADIUSBYMEMBER_RO_DISTANCE, getName(), encode(member), radius, geoUnit, "WITHDIST", "COUNT", count, geoOrder);
}
@Override
@ -368,8 +362,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, GeoPosition>> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUSBYMEMBER_RO", postitionDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), encode(member), radius, geoUnit, "WITHCOORD");
return commandExecutor.readAsync(getName(), codec, GEORADIUSBYMEMBER_RO_POS, getName(), encode(member), radius, geoUnit, "WITHCOORD");
}
@Override
@ -379,8 +372,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, GeoPosition>> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit, int count) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUSBYMEMBER_RO", postitionDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), encode(member), radius, geoUnit, "WITHCOORD", "COUNT", count);
return commandExecutor.readAsync(getName(), codec, GEORADIUSBYMEMBER_RO_POS, getName(), encode(member), radius, geoUnit, "WITHCOORD", "COUNT", count);
}
@Override
@ -390,8 +382,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, GeoPosition>> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUSBYMEMBER_RO", postitionDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), encode(member), radius, geoUnit, "WITHCOORD", geoOrder);
return commandExecutor.readAsync(getName(), codec, GEORADIUSBYMEMBER_RO_POS, getName(), encode(member), radius, geoUnit, "WITHCOORD", geoOrder);
}
@Override
@ -401,8 +392,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
@Override
public RFuture<Map<V, GeoPosition>> radiusWithPositionAsync(V member, double radius, GeoUnit geoUnit, GeoOrder geoOrder, int count) {
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEORADIUSBYMEMBER_RO", postitionDecoder);
return commandExecutor.readAsync(getName(), codec, command, getName(), encode(member), radius, geoUnit, "WITHCOORD", "COUNT", count, geoOrder);
return commandExecutor.readAsync(getName(), codec, GEORADIUSBYMEMBER_RO_POS, getName(), encode(member), radius, geoUnit, "WITHCOORD", "COUNT", count, geoOrder);
}
@Override

@ -54,6 +54,7 @@ import org.redisson.api.annotation.RCascade;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RFieldAccessor;
import org.redisson.api.annotation.RId;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.liveobject.LiveObjectTemplate;
import org.redisson.liveobject.core.AccessorInterceptor;
import org.redisson.liveobject.core.FieldAccessorInterceptor;
@ -61,7 +62,6 @@ import org.redisson.liveobject.core.LiveObjectInterceptor;
import org.redisson.liveobject.core.RExpirableInterceptor;
import org.redisson.liveobject.core.RMapInterceptor;
import org.redisson.liveobject.core.RObjectInterceptor;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.liveobject.misc.AdvBeanCopy;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
@ -84,12 +84,12 @@ public class RedissonLiveObjectService implements RLiveObjectService {
private static final ConcurrentMap<Class<? extends Resolver>, Resolver<?, ?, ?>> providerCache = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<Class<?>, Class<?>> classCache;
private final RedissonClient redisson;
private final RedissonObjectBuilder objectBuilder;
private final CommandAsyncExecutor commandExecutor;
public RedissonLiveObjectService(RedissonClient redisson, ConcurrentMap<Class<?>, Class<?>> classCache) {
public RedissonLiveObjectService(RedissonClient redisson, ConcurrentMap<Class<?>, Class<?>> classCache, CommandAsyncExecutor commandExecutor) {
this.redisson = redisson;
this.classCache = classCache;
this.objectBuilder = new RedissonObjectBuilder(redisson);
this.commandExecutor = commandExecutor;
}
//TODO: Add ttl renewal functionality
@ -204,9 +204,9 @@ public class RedissonLiveObjectService implements RLiveObjectService {
continue;
}
RObject rObject = objectBuilder.createObject(id, detachedObject.getClass(), object.getClass(), field.getName());
RObject rObject = commandExecutor.getObjectBuilder().createObject(id, detachedObject.getClass(), object.getClass(), field.getName(), redisson);
if (rObject != null) {
objectBuilder.store(rObject, field.getName(), liveMap);
commandExecutor.getObjectBuilder().store(rObject, field.getName(), liveMap);
if (rObject instanceof SortedSet) {
((RSortedSet)rObject).trySetComparator(((SortedSet)object).comparator());
}
@ -653,7 +653,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
.install(LiveObjectInterceptor.Getter.class,
LiveObjectInterceptor.Setter.class))
.to(new LiveObjectInterceptor(redisson, entityClass,
getRIdFieldName(entityClass))))
getRIdFieldName(entityClass), commandExecutor.getObjectBuilder())))
// .intercept(MethodDelegation.to(
// new LiveObjectInterceptor(redisson, codecProvider, entityClass,
// getRIdFieldName(entityClass)))
@ -696,8 +696,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
.and(ElementMatchers.isPublic()
.or(ElementMatchers.isProtected()))
)
.intercept(MethodDelegation.to(
new AccessorInterceptor(redisson, objectBuilder)))
.intercept(MethodDelegation.to(new AccessorInterceptor(redisson, commandExecutor.getObjectBuilder())))
.make().load(entityClass.getClassLoader(),
ClassLoadingStrategy.Default.WRAPPER)

@ -34,7 +34,6 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonObjectFactory;
import io.netty.buffer.ByteBuf;
@ -250,7 +249,7 @@ public abstract class RedissonObject implements RObject {
public ByteBuf encode(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
@ -265,7 +264,7 @@ public abstract class RedissonObject implements RObject {
public ByteBuf encodeMapKey(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}
@ -280,7 +279,7 @@ public abstract class RedissonObject implements RObject {
public ByteBuf encodeMapValue(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}

@ -60,7 +60,6 @@ import org.redisson.api.RTransactionReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
@ -98,7 +97,6 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final CommandReactiveService commandExecutor;
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ReferenceCodecProvider codecProvider;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
@ -109,7 +107,6 @@ public class RedissonReactive implements RedissonReactiveClient {
connectionManager = ConfigSupport.createConnectionManager(configCopy);
commandExecutor = new CommandReactiveService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
codecProvider = config.getReferenceCodecProvider();
}
public EvictionScheduler getEvictionScheduler() {
@ -437,11 +434,6 @@ public class RedissonReactive implements RedissonReactiveClient {
return config;
}
@Override
public ReferenceCodecProvider getCodecProvider() {
return codecProvider;
}
@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);

@ -56,7 +56,6 @@ import org.redisson.api.RTransactionRx;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
@ -95,7 +94,6 @@ public class RedissonRx implements RedissonRxClient {
protected final CommandRxExecutor commandExecutor;
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ReferenceCodecProvider codecProvider;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
@ -106,7 +104,6 @@ public class RedissonRx implements RedissonRxClient {
connectionManager = ConfigSupport.createConnectionManager(configCopy);
commandExecutor = new CommandRxService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
codecProvider = config.getReferenceCodecProvider();
}
@Override
@ -404,11 +401,6 @@ public class RedissonRx implements RedissonRxClient {
return config;
}
@Override
public ReferenceCodecProvider getCodecProvider() {
return codecProvider;
}
@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);

@ -156,13 +156,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<V> pollFirstFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, params.toArray());
return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BZPOPMIN_VALUE, toSeconds(timeout, unit), queueNames);
}
@Override
@ -172,13 +166,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
@Override
public RFuture<V> pollLastFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(toSeconds(timeout, unit));
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, params.toArray());
return commandExecutor.pollFromAnyAsync(getName(), codec, RedisCommands.BZPOPMAX_VALUE, toSeconds(timeout, unit), queueNames);
}
@Override

@ -29,7 +29,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import io.netty.buffer.ByteBuf;
@ -257,7 +257,7 @@ public class RedissonScript implements RScript {
private ByteBuf encode(Object value, Codec codec) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}

@ -33,7 +33,6 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.pubsub.AsyncSemaphore;
@ -87,7 +86,7 @@ public class RedissonTopic implements RTopic {
protected ByteBuf encode(Object value) {
if (commandExecutor.isRedissonReferenceSupportEnabled()) {
RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
RedissonReference reference = commandExecutor.getObjectBuilder().toReference(value);
if (reference != null) {
value = reference;
}

@ -18,7 +18,6 @@ package org.redisson.api;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
/**
@ -636,13 +635,6 @@ public interface RedissonReactiveClient {
*/
Config getConfig();
/**
* Returns the CodecProvider instance
*
* @return CodecProvider object
*/
ReferenceCodecProvider getCodecProvider();
/**
* Get Redis nodes group for server operations
*

@ -16,7 +16,6 @@
package org.redisson.api;
import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
/**
@ -618,13 +617,6 @@ public interface RedissonRxClient {
*/
Config getConfig();
/**
* Returns the CodecProvider instance
*
* @return CodecProvider object
*/
ReferenceCodecProvider getCodecProvider();
/**
* Get Redis nodes group for server operations
*

@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -36,7 +37,7 @@ public class LRUCacheMap<K, V> extends AbstractCacheMap<K, V> {
private final AtomicLong index = new AtomicLong();
private final List<Collection<CachedValue<K, V>>> queues =
new ArrayList<Collection<CachedValue<K, V>>>(Runtime.getRuntime().availableProcessors()*2);
new ArrayList<Collection<CachedValue<K, V>>>();
public LRUCacheMap(int size, long timeToLiveInMillis, long maxIdleInMillis) {
super(size, timeToLiveInMillis, maxIdleInMillis);
@ -54,7 +55,7 @@ public class LRUCacheMap<K, V> extends AbstractCacheMap<K, V> {
}
private Collection<CachedValue<K, V>> getQueue(CachedValue<K, V> value) {
return queues.get(value.hashCode() % queues.size());
return queues.get(Math.abs(value.hashCode() % queues.size()));
}
@Override

@ -340,8 +340,12 @@ public class CommandDecoder extends ReplayingDecoder<State> {
state().addLevel(lastLevel);
}
state().incLevel();
decodeList(in, data, parts, ctx, size, respParts, skipConvertor);
state().decLevel();
if (state().isMakeCheckpoint()) {
if (lastLevel == state().getLastLevel() && lastLevel.isFull()) {
state().removeLastLevel();

@ -152,7 +152,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
} else if (result instanceof PubSubPatternMessage) {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
}
@ -182,7 +182,7 @@ public class CommandPubSubDecoder extends CommandDecoder {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
} else if (result instanceof PubSubPatternMessage) {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
} else {
@ -242,11 +242,11 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (parts.size() == 2 && "message".equals(parts.get(0))) {
byte[] channelName = (byte[]) parts.get(1);
return entries.get(new ChannelName(channelName)).getDecoder().getDecoder(parts.size(), state());
return getDecoder(parts, channelName);
}
if (parts.size() == 3 && "pmessage".equals(parts.get(0))) {
byte[] patternName = (byte[]) parts.get(1);
return entries.get(new ChannelName(patternName)).getDecoder().getDecoder(parts.size(), state());
return getDecoder(parts, patternName);
}
}
@ -257,4 +257,12 @@ public class CommandPubSubDecoder extends CommandDecoder {
return super.selectDecoder(data, parts);
}
private Decoder<Object> getDecoder(List<Object> parts, byte[] name) {
PubSubEntry entry = entries.get(new ChannelName(name));
if (entry != null) {
return entry.getDecoder().getDecoder(parts.size(), state());
}
return ByteArrayCodec.INSTANCE.getValueDecoder();
}
}

@ -54,6 +54,14 @@ public class State {
return levels.get(level);
}
public void incLevel() {
level++;
}
public void decLevel() {
level--;
}
public void addLevel(StateLevel stateLevel) {
if (levels == null) {
levels = new ArrayList<StateLevel>(2);

@ -115,12 +115,23 @@ public class ListMultiDecoder<T> implements MultiDecoder<Object> {
public ListMultiDecoder(MultiDecoder<?> ... decoders) {
this.decoders = decoders;
}
private Integer fixedIndex;
public ListMultiDecoder(Integer fixedIndex, MultiDecoder<?> ... decoders) {
this.fixedIndex = fixedIndex;
this.decoders = decoders;
}
@Override
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum == 0) {
NestedDecoderState s = getDecoder(state);
s.incIndex();
if (fixedIndex != null) {
s.setIndex(fixedIndex);
} else {
s.incIndex();
}
s.resetPartsIndex();
}
@ -152,7 +163,11 @@ public class ListMultiDecoder<T> implements MultiDecoder<Object> {
int index = s.getIndex();
index += s.incPartsIndex();
if (index == -1) {
if (fixedIndex != null && parts.isEmpty()) {
s.resetPartsIndex();
}
if (index == -1 || (fixedIndex != null && state.getLevel() == 0)) {
return decoders[decoders.length-1].decode(parts, state);
}

@ -30,6 +30,7 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.liveobject.core.RedissonObjectBuilder;
/**
*
@ -38,6 +39,8 @@ import org.redisson.connection.MasterSlaveEntry;
*/
public interface CommandAsyncExecutor {
RedissonObjectBuilder getObjectBuilder();
ConnectionManager getConnectionManager();
CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson);
@ -109,5 +112,7 @@ public interface CommandAsyncExecutor {
<T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object ... params);
<T, R> RFuture<R> readRandomAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);
<V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String ... queueNames);
}

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@ -28,6 +29,8 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException;
@ -65,9 +68,9 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource;
import org.redisson.connection.NodeSource.Redirect;
import org.redisson.liveobject.core.RedissonObjectBuilder;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,7 +78,6 @@ import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.ReferenceCountUtil;
@ -94,6 +96,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
static final Logger log = LoggerFactory.getLogger(CommandAsyncService.class);
final ConnectionManager connectionManager;
private RedissonObjectBuilder objectBuilder;
protected RedissonClient redisson;
protected RedissonReactiveClient redissonReactive;
protected RedissonRxClient redissonRx;
@ -142,7 +145,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private void enableRedissonReferenceSupport(Config config) {
Codec codec = config.getCodec();
ReferenceCodecProvider codecProvider = config.getReferenceCodecProvider();
objectBuilder = new RedissonObjectBuilder(config);
ReferenceCodecProvider codecProvider = objectBuilder.getReferenceCodecProvider();
codecProvider.registerCodec((Class<Codec>) codec.getClass(), codec);
}
@ -1192,14 +1196,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private Object fromReference(Object res) {
if (objectBuilder == null) {
return res;
}
try {
if (redisson != null) {
return RedissonObjectFactory.fromReference(redisson, (RedissonReference) res);
return objectBuilder.fromReference(redisson, (RedissonReference) res);
}
if (redissonReactive != null) {
return RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) res);
return objectBuilder.fromReference(redissonReactive, (RedissonReference) res);
}
return RedissonObjectFactory.fromReference(redissonRx, (RedissonReference) res);
return objectBuilder.fromReference(redissonRx, (RedissonReference) res);
} catch (Exception exception) {
throw new IllegalStateException(exception);
}
@ -1223,4 +1231,62 @@ public class CommandAsyncService implements CommandAsyncExecutor {
details.setWriteFuture(future);
}
}
@Override
public RedissonObjectBuilder getObjectBuilder() {
return objectBuilder;
}
public <V> RFuture<V> pollFromAnyAsync(String name, Codec codec, RedisCommand<Object> command, long secondsTimeout, String ... queueNames) {
if (connectionManager.isClusterMode() && queueNames.length > 0) {
RPromise<V> result = new RedissonPromise<V>();
AtomicReference<Iterator<String>> ref = new AtomicReference<Iterator<String>>();
List<String> names = new ArrayList<String>();
names.add(name);
names.addAll(Arrays.asList(queueNames));
ref.set(names.iterator());
AtomicLong counter = new AtomicLong(secondsTimeout);
poll(name, codec, result, ref, names, counter, command);
return result;
} else {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(name);
for (Object queueName : queueNames) {
params.add(queueName);
}
params.add(secondsTimeout);
return writeAsync(name, codec, command, params.toArray());
}
}
private <V> void poll(final String name, final Codec codec, final RPromise<V> result, final AtomicReference<Iterator<String>> ref,
final List<String> names, final AtomicLong counter, final RedisCommand<Object> command) {
if (ref.get().hasNext()) {
String currentName = ref.get().next().toString();
RFuture<V> future = writeAsync(currentName, codec, command, currentName, 1);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
result.tryFailure(future.cause());
return;
}
if (future.getNow() != null) {
result.trySuccess(future.getNow());
} else {
if (counter.decrementAndGet() == 0) {
result.trySuccess(null);
return;
}
poll(name, codec, result, ref, names, counter, command);
}
}
});
} else {
ref.set(names.iterator());
poll(name, codec, result, ref, names, counter, command);
}
}
}

@ -66,11 +66,6 @@ public class Config {
*/
private Codec codec;
/**
* For codec registry and look up. DefaultCodecProvider used by default
*/
private ReferenceCodecProvider referenceCodecProvider = new DefaultReferenceCodecProvider();
private ExecutorService executor;
/**
@ -115,7 +110,6 @@ public class Config {
setNettyThreads(oldConf.getNettyThreads());
setThreads(oldConf.getThreads());
setCodec(oldConf.getCodec());
setReferenceCodecProvider(oldConf.getReferenceCodecProvider());
setReferenceEnabled(oldConf.isReferenceEnabled());
setEventLoopGroup(oldConf.getEventLoopGroup());
setTransportMode(oldConf.getTransportMode());
@ -159,28 +153,6 @@ public class Config {
return codec;
}
/**
* Reference objects codec provider used for codec registry and look up.
* <code>org.redisson.codec.DefaultReferenceCodecProvider</code> used by default.
*
* @param codecProvider object
* @return config
* @see org.redisson.codec.ReferenceCodecProvider
*/
public Config setReferenceCodecProvider(ReferenceCodecProvider codecProvider) {
this.referenceCodecProvider = codecProvider;
return this;
}
/**
* Returns the CodecProvider instance
*
* @return CodecProvider
*/
public ReferenceCodecProvider getReferenceCodecProvider() {
return referenceCodecProvider;
}
/**
* Config option indicate whether Redisson Reference feature is enabled.
* <p>

@ -32,7 +32,6 @@ import org.redisson.client.codec.Codec;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
import org.redisson.misc.RedissonObjectFactory;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.FieldValue;
@ -76,7 +75,7 @@ public class AccessorInterceptor {
if (isGetter(method, fieldName)) {
Object result = liveMap.get(fieldName);
if (result == null) {
RObject ar = objectBuilder.createObject(((RLiveObject) me).getLiveObjectId(), me.getClass().getSuperclass(), fieldType, fieldName);
RObject ar = objectBuilder.createObject(((RLiveObject) me).getLiveObjectId(), me.getClass().getSuperclass(), fieldType, fieldName, redisson);
if (ar != null) {
objectBuilder.store(ar, fieldName, liveMap);
return ar;
@ -90,7 +89,7 @@ public class AccessorInterceptor {
return result;
}
if (result instanceof RedissonReference) {
return RedissonObjectFactory.fromReference(redisson, (RedissonReference) result);
return objectBuilder.fromReference(redisson, (RedissonReference) result);
}
return result;
}
@ -105,9 +104,10 @@ public class AccessorInterceptor {
Class<? extends Object> rEntity = liveObject.getClass().getSuperclass();
REntity anno = ClassUtils.getAnnotation(rEntity, REntity.class);
Codec codec = objectBuilder.getReferenceCodecProvider().getCodec(anno, rEntity, redisson.getConfig());
NamingScheme ns = anno.namingScheme()
.getDeclaredConstructor(Codec.class)
.newInstance(redisson.getConfig().getReferenceCodecProvider().getCodec(anno, (Class) rEntity, redisson.getConfig()));
.newInstance(codec);
liveMap.fastPut(fieldName, new RedissonReference(rEntity,
ns.getName(rEntity, fieldType, getREntityIdFieldName(liveObject),
liveObject.getLiveObjectId())));
@ -119,7 +119,7 @@ public class AccessorInterceptor {
&& TransformationMode.ANNOTATION_BASED
.equals(ClassUtils.getAnnotation(me.getClass().getSuperclass(),
REntity.class).fieldTransformation())) {
RObject rObject = objectBuilder.createObject(((RLiveObject) me).getLiveObjectId(), me.getClass().getSuperclass(), arg.getClass(), fieldName);
RObject rObject = objectBuilder.createObject(((RLiveObject) me).getLiveObjectId(), me.getClass().getSuperclass(), arg.getClass(), fieldName, redisson);
if (arg != null) {
if (rObject instanceof Collection) {
Collection<?> c = (Collection<?>) rObject;

@ -57,9 +57,9 @@ public class LiveObjectInterceptor {
private final NamingScheme namingScheme;
private Codec codec;
public LiveObjectInterceptor(RedissonClient redisson, Class<?> entityClass, String idFieldName) {
public LiveObjectInterceptor(RedissonClient redisson, Class<?> entityClass, String idFieldName, RedissonObjectBuilder objectBuilder) {
this.redisson = redisson;
this.codecProvider = redisson.getConfig().getReferenceCodecProvider();
this.codecProvider = objectBuilder.getReferenceCodecProvider();
this.originalClass = entityClass;
this.idFieldName = idFieldName;

@ -16,7 +16,11 @@
package org.redisson.liveobject.core;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -32,21 +36,30 @@ import org.redisson.RedissonBlockingDeque;
import org.redisson.RedissonBlockingQueue;
import org.redisson.RedissonDeque;
import org.redisson.RedissonList;
import org.redisson.RedissonLiveObjectService;
import org.redisson.RedissonMap;
import org.redisson.RedissonQueue;
import org.redisson.RedissonReference;
import org.redisson.RedissonSet;
import org.redisson.RedissonSortedSet;
import org.redisson.api.RLiveObject;
import org.redisson.api.RMap;
import org.redisson.api.RObject;
import org.redisson.api.RObjectReactive;
import org.redisson.api.RObjectRx;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RId;
import org.redisson.api.annotation.RObjectField;
import org.redisson.client.codec.Codec;
import org.redisson.codec.DefaultReferenceCodecProvider;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
import org.redisson.misc.RedissonObjectFactory;
import io.netty.util.internal.PlatformDependent;
@ -73,15 +86,37 @@ public class RedissonObjectBuilder {
supportedClassMapping.put(List.class, RedissonList.class);
}
private final RedissonClient redisson;
private final ReferenceCodecProvider codecProvider;
private final Config config;
public RedissonObjectBuilder(RedissonClient redisson) {
public static class CodecMethodRef {
Method defaultCodecMethod;
Method customCodecMethod;
Method get(boolean value) {
if (value) {
return defaultCodecMethod;
}
return customCodecMethod;
}
}
private static final Map<Class<?>, CodecMethodRef> references = new HashMap<Class<?>, CodecMethodRef>();
private final ReferenceCodecProvider codecProvider = new DefaultReferenceCodecProvider();
public RedissonObjectBuilder(Config config) {
super();
this.redisson = redisson;
this.codecProvider = redisson.getConfig().getReferenceCodecProvider();
this.config = config;
fillCodecMethods(references, RedissonClient.class, RObject.class);
fillCodecMethods(references, RedissonReactiveClient.class, RObjectReactive.class);
fillCodecMethods(references, RedissonRxClient.class, RObjectRx.class);
}
public ReferenceCodecProvider getReferenceCodecProvider() {
return codecProvider;
}
public void store(RObject ar, String fieldName, RMap<String, Object> liveMap) {
Codec codec = ar.getCodec();
if (codec != null) {
@ -91,23 +126,15 @@ public class RedissonObjectBuilder {
new RedissonReference(ar.getClass(), ar.getName(), codec));
}
public RObject createObject(Object id, Class<?> clazz, Class<?> fieldType, String fieldName) {
public RObject createObject(Object id, Class<?> clazz, Class<?> fieldType, String fieldName, RedissonClient redisson) {
Class<? extends RObject> mappedClass = getMappedClass(fieldType);
try {
if (mappedClass != null) {
Codec fieldCodec = getFieldCodec(clazz, mappedClass, fieldName);
NamingScheme fieldNamingScheme = getFieldNamingScheme(clazz, fieldName, fieldCodec);
String referenceName = fieldNamingScheme.getFieldReferenceName(clazz, id, mappedClass, fieldName, null);
RObject obj = RedissonObjectFactory
.createRObject(redisson,
mappedClass,
fieldNamingScheme.getFieldReferenceName(clazz,
id,
mappedClass,
fieldName,
null),
fieldCodec);
return obj;
return createRObject(redisson, mappedClass, referenceName, fieldCodec);
}
} catch (Exception e) {
throw new IllegalArgumentException(e);
@ -122,10 +149,10 @@ public class RedissonObjectBuilder {
Field field = ClassUtils.getDeclaredField(rEntity, fieldName);
if (field.isAnnotationPresent(RObjectField.class)) {
RObjectField anno = field.getAnnotation(RObjectField.class);
return codecProvider.getCodec(anno, rEntity, rObjectClass, fieldName, redisson.getConfig());
return codecProvider.getCodec(anno, rEntity, rObjectClass, fieldName, config);
} else {
REntity anno = ClassUtils.getAnnotation(rEntity, REntity.class);
return codecProvider.getCodec(anno, (Class<?>) rEntity, redisson.getConfig());
return codecProvider.getCodec(anno, (Class<?>) rEntity, config);
}
}
@ -151,4 +178,138 @@ public class RedissonObjectBuilder {
return null;
}
private void fillCodecMethods(Map<Class<?>, CodecMethodRef> b, Class<?> clientClazz, Class<?> objectClazz) {
for (Method method : clientClazz.getDeclaredMethods()) {
if (!method.getReturnType().equals(Void.TYPE)
&& objectClazz.isAssignableFrom(method.getReturnType())
&& method.getName().startsWith("get")) {
Class<?> cls = method.getReturnType();
if (!b.containsKey(cls)) {
b.put(cls, new CodecMethodRef());
}
CodecMethodRef builder = b.get(cls);
if (method.getParameterTypes().length == 2 //first param is name, second param is codec.
&& Codec.class.isAssignableFrom(method.getParameterTypes()[1])) {
builder.customCodecMethod = method;
} else if (method.getParameterTypes().length == 1) {
builder.defaultCodecMethod = method;
}
}
}
}
public Object fromReference(RedissonClient redisson, RedissonReference rr) throws Exception {
Class<? extends Object> type = rr.getType();
if (type != null) {
if (ClassUtils.isAnnotationPresent(type, REntity.class)) {
RedissonLiveObjectService liveObjectService = (RedissonLiveObjectService) redisson.getLiveObjectService();
REntity anno = ClassUtils.getAnnotation(type, REntity.class);
NamingScheme ns = anno.namingScheme()
.getDeclaredConstructor(Codec.class)
.newInstance(codecProvider.getCodec(anno, type, redisson.getConfig()));
Object id = ns.resolveId(rr.getKeyName());
return liveObjectService.createLiveObject(type, id);
}
}
return getObject(redisson, rr, type, codecProvider);
}
private Object getObject(Object redisson, RedissonReference rr, Class<? extends Object> type,
ReferenceCodecProvider codecProvider)
throws IllegalAccessException, InvocationTargetException, Exception, ClassNotFoundException {
if (type != null) {
CodecMethodRef b = references.get(type);
if (b == null && type.getInterfaces().length > 0) {
type = type.getInterfaces()[0];
}
b = references.get(type);
if (b != null) {
Method builder = b.get(isDefaultCodec(rr));
return (isDefaultCodec(rr)
? builder.invoke(redisson, rr.getKeyName())
: builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType())));
}
}
throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodecName());
}
private boolean isDefaultCodec(RedissonReference rr) {
return rr.getCodec() == null;
}
public Object fromReference(RedissonRxClient redisson, RedissonReference rr) throws Exception {
Class<? extends Object> type = rr.getReactiveType();
/**
* Live Object from reference in reactive client is not supported yet.
*/
return getObject(redisson, rr, type, codecProvider);
}
public Object fromReference(RedissonReactiveClient redisson, RedissonReference rr) throws Exception {
Class<? extends Object> type = rr.getReactiveType();
/**
* Live Object from reference in reactive client is not supported yet.
*/
return getObject(redisson, rr, type, codecProvider);
}
public RedissonReference toReference(Object object) {
if (object != null && ClassUtils.isAnnotationPresent(object.getClass(), REntity.class)) {
throw new IllegalArgumentException("REntity should be attached to Redisson before save");
}
if (object instanceof RObject && !(object instanceof RLiveObject)) {
Class<?> clazz = object.getClass().getInterfaces()[0];
RObject rObject = ((RObject) object);
if (rObject.getCodec() != null) {
codecProvider.registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
}
return new RedissonReference(clazz, rObject.getName(), rObject.getCodec());
}
if (object instanceof RObjectReactive && !(object instanceof RLiveObject)) {
Class<?> clazz = object.getClass().getInterfaces()[0];
RObjectReactive rObject = ((RObjectReactive) object);
if (rObject.getCodec() != null) {
codecProvider.registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
}
return new RedissonReference(clazz, rObject.getName(), rObject.getCodec());
}
try {
if (object instanceof RLiveObject) {
Class<? extends Object> rEntity = object.getClass().getSuperclass();
REntity anno = ClassUtils.getAnnotation(rEntity, REntity.class);
NamingScheme ns = anno.namingScheme()
.getDeclaredConstructor(Codec.class)
.newInstance(codecProvider.getCodec(anno, (Class) rEntity, config));
String name = Introspectior
.getFieldsWithAnnotation(rEntity, RId.class)
.getOnly().getName();
Class<?> type = ClassUtils.getDeclaredField(rEntity, name).getType();
return new RedissonReference(rEntity,
ns.getName(rEntity, type, name, ((RLiveObject) object).getLiveObjectId()));
}
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
return null;
}
public <T extends RObject, K extends Codec> T createRObject(RedissonClient redisson, Class<T> expectedType, String name, K codec) throws Exception {
List<Class<?>> interfaces = Arrays.asList(expectedType.getInterfaces());
for (Class<?> iType : interfaces) {
if (references.containsKey(iType)) {// user cache to speed up things a little.
Method builder = references.get(iType).get(codec != null);
return (T) (codec != null
? builder.invoke(redisson, name)
: builder.invoke(redisson, name, codec));
}
}
throw new ClassNotFoundException("No RObject is found to match class type of " + (expectedType != null ? expectedType.getName() : "null") + " with codec type of " + (codec != null ? codec.getClass().getName() : "null"));
}
}

@ -1,224 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.RedissonLiveObjectService;
import org.redisson.RedissonReference;
import org.redisson.api.RLiveObject;
import org.redisson.api.RObject;
import org.redisson.api.RObjectReactive;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.annotation.REntity;
import org.redisson.api.annotation.RId;
import org.redisson.client.codec.Codec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.liveobject.misc.Introspectior;
import org.redisson.liveobject.resolver.NamingScheme;
/**
*
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedissonObjectFactory {
public static class RedissonObjectBuilder {
Method defaultCodecMethod;
Method customCodecMethod;
Method get(boolean value) {
if (value) {
return defaultCodecMethod;
}
return customCodecMethod;
}
}
private static final Map<Class<?>, RedissonObjectBuilder> builders;
static {
HashMap<Class<?>, RedissonObjectBuilder> b = new HashMap<Class<?>, RedissonObjectBuilder>();
for (Method method : RedissonClient.class.getDeclaredMethods()) {
if (!method.getReturnType().equals(Void.TYPE)
&& RObject.class.isAssignableFrom(method.getReturnType())
&& method.getName().startsWith("get")) {
Class<?> cls = method.getReturnType();
if (!b.containsKey(cls)) {
b.put(cls, new RedissonObjectBuilder());
}
RedissonObjectBuilder builder = b.get(cls);
if (method.getParameterTypes().length == 2 //first param is name, second param is codec.
&& Codec.class.isAssignableFrom(method.getParameterTypes()[1])) {
builder.customCodecMethod = method;
} else if (method.getParameterTypes().length == 1) {
builder.defaultCodecMethod = method;
}
}
}
for (Method method : RedissonReactiveClient.class.getDeclaredMethods()) {
if (!method.getReturnType().equals(Void.TYPE)
&& RObjectReactive.class.isAssignableFrom(method.getReturnType())
&& method.getName().startsWith("get")) {
Class<?> cls = method.getReturnType();
if (!b.containsKey(cls)) {
b.put(cls, new RedissonObjectBuilder());
}
RedissonObjectBuilder builder = b.get(cls);
if (method.getParameterTypes().length == 2 //first param is name, second param is codec.
&& Codec.class.isAssignableFrom(method.getParameterTypes()[1])) {
builder.customCodecMethod = method;
} else if (method.getParameterTypes().length == 1) {
builder.defaultCodecMethod = method;
}
}
}
builders = Collections.unmodifiableMap(b);
}
public static Object fromReference(RedissonClient redisson, RedissonReference rr) throws Exception {
Class<? extends Object> type = rr.getType();
ReferenceCodecProvider codecProvider = redisson.getConfig().getReferenceCodecProvider();
if (type != null) {
if (ClassUtils.isAnnotationPresent(type, REntity.class)) {
RedissonLiveObjectService liveObjectService = (RedissonLiveObjectService) redisson.getLiveObjectService();
REntity anno = ClassUtils.getAnnotation(type, REntity.class);
NamingScheme ns = anno.namingScheme()
.getDeclaredConstructor(Codec.class)
.newInstance(codecProvider.getCodec(anno, type, redisson.getConfig()));
Object id = ns.resolveId(rr.getKeyName());
return liveObjectService.createLiveObject(type, id);
}
}
return getObject(redisson, rr, type, codecProvider);
}
protected static Object getObject(Object redisson, RedissonReference rr, Class<? extends Object> type,
ReferenceCodecProvider codecProvider)
throws IllegalAccessException, InvocationTargetException, Exception, ClassNotFoundException {
if (type != null) {
RedissonObjectBuilder b = builders.get(type);
if (b == null && type.getInterfaces().length > 0) {
type = type.getInterfaces()[0];
}
b = builders.get(type);
if (b != null) {
Method builder = b.get(isDefaultCodec(rr));
return (isDefaultCodec(rr)
? builder.invoke(redisson, rr.getKeyName())
: builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType())));
}
}
throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodecName());
}
private static boolean isDefaultCodec(RedissonReference rr) {
return rr.getCodec() == null;
}
public static Object fromReference(RedissonRxClient redisson, RedissonReference rr) throws Exception {
Class<? extends Object> type = rr.getReactiveType();
ReferenceCodecProvider codecProvider = redisson.getConfig().getReferenceCodecProvider();
/**
* Live Object from reference in reactive client is not supported yet.
*/
return getObject(redisson, rr, type, codecProvider);
}
public static Object fromReference(RedissonReactiveClient redisson, RedissonReference rr) throws Exception {
Class<? extends Object> type = rr.getReactiveType();
ReferenceCodecProvider codecProvider = redisson.getConfig().getReferenceCodecProvider();
/**
* Live Object from reference in reactive client is not supported yet.
*/
return getObject(redisson, rr, type, codecProvider);
}
public static RedissonReference toReference(Config config, Object object) {
if (object != null && ClassUtils.isAnnotationPresent(object.getClass(), REntity.class)) {
throw new IllegalArgumentException("REntity should be attached to Redisson before save");
}
if (object instanceof RObject && !(object instanceof RLiveObject)) {
Class<?> clazz = object.getClass().getInterfaces()[0];
RObject rObject = ((RObject) object);
if (rObject.getCodec() != null) {
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
}
return new RedissonReference(clazz, rObject.getName(), rObject.getCodec());
}
if (object instanceof RObjectReactive && !(object instanceof RLiveObject)) {
Class<?> clazz = object.getClass().getInterfaces()[0];
RObjectReactive rObject = ((RObjectReactive) object);
if (rObject.getCodec() != null) {
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
}
return new RedissonReference(clazz, rObject.getName(), rObject.getCodec());
}
try {
if (object instanceof RLiveObject) {
Class<? extends Object> rEntity = object.getClass().getSuperclass();
REntity anno = ClassUtils.getAnnotation(rEntity, REntity.class);
NamingScheme ns = anno.namingScheme()
.getDeclaredConstructor(Codec.class)
.newInstance(config.getReferenceCodecProvider().getCodec(anno, (Class) rEntity, config));
String name = Introspectior
.getFieldsWithAnnotation(rEntity, RId.class)
.getOnly().getName();
Class<?> type = ClassUtils.getDeclaredField(rEntity, name).getType();
return new RedissonReference(rEntity,
ns.getName(rEntity, type, name, ((RLiveObject) object).getLiveObjectId()));
}
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
return null;
}
public static <T extends RObject, K extends Codec> T createRObject(RedissonClient redisson, Class<T> expectedType, String name, K codec) throws Exception {
List<Class<?>> interfaces = Arrays.asList(expectedType.getInterfaces());
for (Class<?> iType : interfaces) {
if (builders.containsKey(iType)) {// user cache to speed up things a little.
Method builder = builders.get(iType).get(codec != null);
return (T) (codec != null
? builder.invoke(redisson, name)
: builder.invoke(redisson, name, codec));
}
}
throw new ClassNotFoundException("No RObject is found to match class type of " + (expectedType != null ? expectedType.getName() : "null") + " with codec type of " + (codec != null ? codec.getClass().getName() : "null"));
}
public static void warmUp() {}
}

@ -70,10 +70,17 @@ public class URIBuilder {
throw new IOException(e);
}
}
private static String trimIpv6Brackets(String host) {
if (host.startsWith("[") && host.endsWith("]")) {
return host.substring(1, host.length() - 1);
}
return host;
}
public static boolean compare(InetSocketAddress entryAddr, URI addr) {
if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(addr.getHost()))
|| entryAddr.getAddress().getHostAddress().equals(addr.getHost()))
if (((entryAddr.getHostName() != null && entryAddr.getHostName().equals(trimIpv6Brackets(addr.getHost())))
|| entryAddr.getAddress().getHostAddress().equals(trimIpv6Brackets(addr.getHost())))
&& entryAddr.getPort() == addr.getPort()) {
return true;
}

@ -17,6 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
@ -314,6 +315,52 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
redisson.shutdown();
}
@Test
public void testPollFromAnyInCluster() throws Exception {
RedisRunner master1 = new RedisRunner().port(6890).randomDir().nosave();
RedisRunner master2 = new RedisRunner().port(6891).randomDir().nosave();
RedisRunner master3 = new RedisRunner().port(6892).randomDir().nosave();
RedisRunner slave1 = new RedisRunner().port(6900).randomDir().nosave();
RedisRunner slave2 = new RedisRunner().port(6901).randomDir().nosave();
RedisRunner slave3 = new RedisRunner().port(6902).randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterProcesses process = clusterRunner.run();
Thread.sleep(5000);
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
try {
queue3.put(2);
queue1.put(1);
queue2.put(3);
} catch (InterruptedException e) {
Assert.fail();
}
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = queue1.pollFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2");
Assert.assertEquals(2, l);
Assert.assertTrue(System.currentTimeMillis() - s > 2000);
redisson.shutdown();
process.shutdown();
}
@Test
public void testPollFromAny() throws InterruptedException {
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");

@ -74,8 +74,46 @@ public class RedissonGeoTest extends BaseTest {
assertThat(geo.hash("Palermo", "Catania")).isEmpty();
}
@Test
public void testPos4() {
RGeo<String> geo = redisson.getGeo("test");
geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"), new GeoEntry(15.087269, 37.502669, "Catania"));
Map<String, GeoPosition> expected = new LinkedHashMap<String, GeoPosition>();
expected.put("Palermo", new GeoPosition(13.361389338970184, 38.115556395496299));
expected.put("Catania", new GeoPosition(15.087267458438873, 37.50266842333162));
assertThat(geo.pos("Palermo", "Catania")).isEqualTo(expected);
}
@Test
public void testPos1() {
RGeo<String> geo = redisson.getGeo("test");
geo.add(0.123,0.893,"hi");
Map<String, GeoPosition> res = geo.pos("hi");
assertThat(res.get("hi").getLatitude()).isNotNull();
assertThat(res.get("hi").getLongitude()).isNotNull();
}
@Test
public void testPos3() {
RGeo<String> geo = redisson.getGeo("test");
geo.add(0.123,0.893,"hi");
Map<String, GeoPosition> res = geo.pos("hi", "123f", "sdfdsf");
assertThat(res.get("hi").getLatitude()).isNotNull();
assertThat(res.get("hi").getLongitude()).isNotNull();
}
@Test
public void testPos2() {
RGeo<String> geo = redisson.getGeo("test");
geo.add(new GeoEntry(13.361389, 38.115556, "Palermo"));
Map<String, GeoPosition> expected = new LinkedHashMap<String, GeoPosition>();
expected.put("Palermo", new GeoPosition(13.361389338970184, 38.115556395496299));
assertThat(geo.pos("test2", "Palermo", "test3", "Catania", "test1")).isEqualTo(expected);
}
@Test
public void testPos() {
RGeo<String> geo = redisson.getGeo("test");

@ -19,7 +19,6 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.context.ConfigurableApplicationContext;
@ -122,7 +121,6 @@ public class SpringNamespaceWikiTest {
assertEquals(2, config.getNettyThreads());
assertSame(context.getBean("myCodec", Codec.class), config.getCodec());
assertEquals(false, config.isReferenceEnabled());
assertSame(context.getBean("myCodecProvider", ReferenceCodecProvider.class), config.getReferenceCodecProvider());
assertSame(context.getBean("myExecutor", Executor.class), config.getExecutor());
assertSame(context.getBean("myEventLoopGroup", EventLoopGroup.class), config.getEventLoopGroup());
Method method = Config.class.getDeclaredMethod("getSingleServerConfig", (Class<?>[]) null);

Loading…
Cancel
Save