Merge branch 'master' into 3.0.0

# Conflicts:
#	pom.xml
#	redisson-all/pom.xml
#	redisson-tomcat/pom.xml
#	redisson-tomcat/redisson-tomcat-6/pom.xml
#	redisson-tomcat/redisson-tomcat-7/pom.xml
#	redisson-tomcat/redisson-tomcat-8/pom.xml
#	redisson/pom.xml
pull/1303/head
Nikita 7 years ago
commit 9ccdb318a5

@ -4,6 +4,25 @@ Redisson Releases History
Try __[Redisson PRO](https://redisson.pro)__ version.
### 31-Oct-2017 - versions 2.10.5 and 3.5.5 released
`ProjectReactor` dependency for `3.5.5` version was updated to `3.1.1` version
Feature - Added pingConnection, keepAlive, tcpNoDelay settings
Feature - Slaves synchronization support for `RBatch`/`RBatchReactive` objects
Improvement - Data encoding should be executed on client thread only
Improvement - Handling Redis redirect optimization
Improvement - Better collection handling for RedissonReference (thanks to Rui Gu)
Fixed - `RedisLoadingException` handling during re-connection process
Fixed - `RedisClient` can't be shutdown properly
Fixed - timeout drift for `RFairLock`
Fixed - expiration handling of reentrant write lock
Fixed - `RReadWriteLock` doesn't work in cluster
Fixed - Blocking queues are't rethrow exceptions
Fixed - out of connections problem on high load during `RemoteExecutorService`/`ExecutorService` usage
Fixed - NPE during `RemoteService` object usage
Fixed - Getting memory leak warnings when gracefully shutting down tomcat
Fixed - `RMapCache.getAll` doesn't support large keySet
### 28-Sep-2017 - versions 2.10.4 and 3.5.4 released
Feature - added `maxSize` setting for `SpringCacheManager`
Feature - allow `LiveObjectService` to work with classes that inherit from REntities (thanks to @sdjacobs)

@ -4,10 +4,10 @@ Redisson: Redis based In-Memory Data Grid for Java.
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.
| Stable <br/> Release Version | JDK Version<br/> compatibility | Release Date | `CompletionStage` <br/> support |
| ------------- | ------------- | ------------| -----------|
| 3.5.4 | 1.8+ | 28.09.2017 | Yes |
| 2.10.4 | 1.6, 1.7, 1.8 and Android | 28.09.2017 | No |
| Stable <br/> Release Version | JDK Version<br/> compatibility | Release Date | `CompletionStage` <br/> support | `ProjectReactor` version<br/> compatibility |
| ------------- | ------------- | ------------| -----------| -----------|
| 3.5.5 | 1.8+ | 31.10.2017 | Yes | 3.1.x |
| 2.10.5 | 1.6, 1.7, 1.8 and Android | 31.10.2017 | No | 2.0.8 |
Features
@ -87,23 +87,23 @@ Quick start
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.5.4</version>
<version>3.5.5</version>
</dependency>
<!-- JDK 1.6+ compatible -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.10.4</version>
<version>2.10.5</version>
</dependency>
#### Gradle
// JDK 1.8+ compatible
compile 'org.redisson:redisson:3.5.4'
compile 'org.redisson:redisson:3.5.5'
// JDK 1.6+ compatible
compile 'org.redisson:redisson:2.10.4'
compile 'org.redisson:redisson:2.10.5'
#### Java
@ -128,11 +128,11 @@ RExecutorService executor = redisson.getExecutorService("myExecutorService");
Downloads
===============================
[Redisson 3.5.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.5.4&e=jar),
[Redisson node 3.5.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.5.4&e=jar)
[Redisson 3.5.5](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.5.5&e=jar),
[Redisson node 3.5.5](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.5.5&e=jar)
[Redisson 2.10.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.10.4&e=jar),
[Redisson node 2.10.4](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.10.4&e=jar)
[Redisson 2.10.5](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=2.10.5&e=jar),
[Redisson node 2.10.5](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.10.5&e=jar)
### Supported by

@ -30,22 +30,21 @@ Usage
**2** Copy two jars into `TOMCAT_BASE/lib` directory:
1. __For JDK 1.8+__
[redisson-all-3.5.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.5.4&e=jar)
[redisson-all-3.5.5.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.5.5&e=jar)
for Tomcat 6.x
[redisson-tomcat-6-3.5.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.5.4&e=jar)
[redisson-tomcat-6-3.5.5.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.5.5&e=jar)
for Tomcat 7.x
[redisson-tomcat-7-3.5.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.5.4&e=jar)
[redisson-tomcat-7-3.5.5.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.5.5&e=jar)
for Tomcat 8.x
[redisson-tomcat-8-3.5.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.5.4&e=jar)
[redisson-tomcat-8-3.5.5.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.5.5&e=jar)
2. __For JDK 1.6+__
[redisson-all-2.10.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.10.4&e=jar)
[redisson-all-2.10.5.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.10.5&e=jar)
for Tomcat 6.x
[redisson-tomcat-6-2.10.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.10.4&e=jar)
[redisson-tomcat-6-2.10.5.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.10.5&e=jar)
for Tomcat 7.x
[redisson-tomcat-7-2.10.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.10.4&e=jar)
for Tomcat 8.x
[redisson-tomcat-8-2.10.4.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.10.4&e=jar)
[redisson-tomcat-7-2.10.5.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.10.5&e=jar)

@ -167,8 +167,10 @@ public class RedissonSession extends StandardSession {
newMap.put("session:isValid", isValid);
newMap.put("session:isNew", isNew);
for (Entry<String, Object> entry : attrs.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
if (attrs != null) {
for (Entry<String, Object> entry : attrs.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
}
}
map.putAll(newMap);

@ -171,8 +171,10 @@ public class RedissonSession extends StandardSession {
newMap.put("session:isValid", isValid);
newMap.put("session:isNew", isNew);
for (Entry<String, Object> entry : attrs.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
if (attrs != null) {
for (Entry<String, Object> entry : attrs.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
}
}
map.putAll(newMap);

@ -172,8 +172,10 @@ public class RedissonSession extends StandardSession {
newMap.put("session:isValid", isValid);
newMap.put("session:isNew", isNew);
for (Entry<String, Object> entry : attrs.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
if (attrs != null) {
for (Entry<String, Object> entry : attrs.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
}
}
map.putAll(newMap);

@ -57,6 +57,11 @@
<artifactId>netty-transport</artifactId>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>

@ -107,7 +107,7 @@ public abstract class BaseRemoteService {
return "{remote_response}:" + executorId;
}
protected String getRequestQueueName(Class<?> remoteInterface) {
public String getRequestQueueName(Class<?> remoteInterface) {
return "{" + name + ":" + remoteInterface.getName() + "}";
}
@ -278,11 +278,8 @@ public abstract class BaseRemoteService {
RemoteServiceAck ack = future.getNow();
if (ack == null) {
RPromise<RemoteServiceAck> ackFuture = new RedissonPromise<RemoteServiceAck>();
responses.get(executorId).getResponses().put(request.getId(), ackFuture);
RFuture<RemoteServiceAck> ackFutureAttempt =
tryPollAckAgainAsync(optionsCopy, ackFuture, ackName);
tryPollAckAgainAsync(optionsCopy, ackName, request.getId(), responseName);
ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>() {
@Override
@ -428,16 +425,15 @@ public abstract class BaseRemoteService {
RPromise<RRemoteServiceResponse> promise = (RPromise<RRemoteServiceResponse>) entry.getResponses().remove(response.getId());
if (promise != null) {
promise.trySuccess(response);
}
if (!entry.getResponses().isEmpty()) {
responseQueue.takeAsync().addListener(this);
} else {
entry.getStarted().set(false);
if (!entry.getResponses().isEmpty()) {
responseQueue.takeAsync().addListener(this);
} else {
entry.getStarted().set(false);
if (!entry.getResponses().isEmpty()) {
pollTasks(entry, responseName);
}
pollTasks(entry, responseName);
}
}
}
});
@ -532,8 +528,8 @@ public abstract class BaseRemoteService {
return null;
}
private RFuture<RemoteServiceAck> tryPollAckAgainAsync(RemoteInvocationOptions optionsCopy,
final RPromise<RemoteServiceAck> pollFuture, String ackName)
private RFuture<RemoteServiceAck> tryPollAckAgainAsync(final RemoteInvocationOptions optionsCopy,
String ackName, final String requestId, final String responseName)
throws InterruptedException {
final RPromise<RemoteServiceAck> promise = new RedissonPromise<RemoteServiceAck>();
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(ackName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
@ -553,7 +549,8 @@ public abstract class BaseRemoteService {
}
if (future.getNow()) {
pollFuture.addListener(new FutureListener<RemoteServiceAck>() {
RPromise<RemoteServiceAck> ackFuture = poll(optionsCopy.getAckTimeoutInMillis(), requestId, responseName);
ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override
public void operationComplete(Future<RemoteServiceAck> future) throws Exception {
if (!future.isSuccess()) {

@ -39,8 +39,29 @@ public abstract class QueueTransferTask {
private static final Logger log = LoggerFactory.getLogger(QueueTransferTask.class);
public static class TimeoutTask {
private final long startTime;
private final Timeout task;
public TimeoutTask(long startTime, Timeout task) {
super();
this.startTime = startTime;
this.task = task;
}
public long getStartTime() {
return startTime;
}
public Timeout getTask() {
return task;
}
}
private int usage = 1;
private final AtomicReference<Timeout> timeoutReference = new AtomicReference<Timeout>();
private final AtomicReference<TimeoutTask> lastTimeout = new AtomicReference<TimeoutTask>();
private final ConnectionManager connectionManager;
public QueueTransferTask(ConnectionManager connectionManager) {
@ -84,14 +105,13 @@ public abstract class QueueTransferTask {
}
private void scheduleTask(final Long startTime) {
if (startTime == null) {
TimeoutTask oldTimeout = lastTimeout.get();
if (startTime == null || (oldTimeout != null && oldTimeout.getStartTime() < startTime)) {
return;
}
Timeout oldTimeout = timeoutReference.get();
if (oldTimeout != null) {
oldTimeout.cancel();
timeoutReference.compareAndSet(oldTimeout, null);
oldTimeout.getTask().cancel();
}
long delay = startTime - System.currentTimeMillis();
@ -100,9 +120,16 @@ public abstract class QueueTransferTask {
@Override
public void run(Timeout timeout) throws Exception {
pushTask();
TimeoutTask currentTimeout = lastTimeout.get();
if (currentTimeout.getTask() == timeout) {
lastTimeout.compareAndSet(currentTimeout, null);
}
}
}, delay, TimeUnit.MILLISECONDS);
timeoutReference.set(timeout);
if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
timeout.cancel();
}
} else {
pushTask();
}
@ -114,10 +141,6 @@ public abstract class QueueTransferTask {
private void pushTask() {
RFuture<Long> startTimeFuture = pushTaskAsync();
addListener(startTimeFuture);
}
private void addListener(RFuture<Long> startTimeFuture) {
startTimeFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {
@ -137,5 +160,4 @@ public abstract class QueueTransferTask {
});
}
}

@ -142,7 +142,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
this.executorId = redissonId + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
}
requestQueueName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}";
remoteService = redisson.getRemoteService(name, codec);
requestQueueName = ((RedissonRemoteService)remoteService).getRequestQueueName(RemoteExecutorService.class);
String objectName = requestQueueName;
tasksCounterName = objectName + ":counter";
tasksName = objectName + ":tasks";
@ -157,7 +158,6 @@ public class RedissonExecutorService implements RScheduledExecutorService {
workersSemaphoreName = objectName + ":workers-semaphore";
workersCounterName = objectName + ":workers-counter";
remoteService = redisson.getRemoteService(name, codec);
workersTopic = redisson.getTopic(workersChannelName);
TasksService executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses);

@ -33,15 +33,15 @@ import org.redisson.client.codec.GeoEntryCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScoredCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.FlatNestedMultiDecoder;
import org.redisson.client.protocol.decoder.CodecDecoder;
import org.redisson.client.protocol.decoder.GeoDistanceDecoder;
import org.redisson.client.protocol.decoder.GeoMapReplayDecoder;
import org.redisson.client.protocol.decoder.GeoPositionDecoder;
import org.redisson.client.protocol.decoder.GeoPositionMapDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
@ -54,19 +54,15 @@ import org.redisson.connection.decoder.MapGetAllDecoder;
*/
public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V> {
MultiDecoder<Map<Object, Object>> postitionDecoder;
MultiDecoder<Map<Object, Object>> distanceDecoder;
private final MultiDecoder<Map<Object, Object>> postitionDecoder = new ListMultiDecoder(new CodecDecoder(), new GeoPositionDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoMapReplayDecoder());
private final MultiDecoder<Map<Object, Object>> distanceDecoder = new ListMultiDecoder(new GeoDistanceDecoder(), new GeoMapReplayDecoder());
public RedissonGeo(CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(connectionManager, name, redisson);
postitionDecoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
distanceDecoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
}
public RedissonGeo(Codec codec, CommandAsyncExecutor connectionManager, String name, RedissonClient redisson) {
super(codec, connectionManager, name, redisson);
postitionDecoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
distanceDecoder = new FlatNestedMultiDecoder(new GeoDistanceDecoder(codec), new GeoMapReplayDecoder(), true);
}
@Override
@ -139,7 +135,7 @@ public class RedissonGeo<V> extends RedissonScoredSortedSet<V> implements RGeo<V
params.add(encode(member));
}
MultiDecoder<Map<Object, Object>> decoder = new NestedMultiDecoder(new GeoPositionDecoder(), new GeoPositionMapDecoder((List<Object>)Arrays.asList(members)), true);
MultiDecoder<Map<Object, Object>> decoder = new ListMultiDecoder(new GeoPositionDecoder(), new ObjectListReplayDecoder(ListMultiDecoder.RESET), new GeoPositionMapDecoder((List<Object>)Arrays.asList(members)));
RedisCommand<Map<Object, Object>> command = new RedisCommand<Map<Object, Object>>("GEOPOS", decoder);
return commandExecutor.readAsync(getName(), new ScoredCodec(codec), command, params.toArray());
}

@ -47,11 +47,11 @@ public class RedissonListMultimap<K, V> extends RedissonMultimap<K, V> implement
private static final RedisStrictCommand<Boolean> LLEN_VALUE = new RedisStrictCommand<Boolean>("LLEN", new BooleanAmountReplayConvertor());
RedissonListMultimap(UUID id, CommandAsyncExecutor connectionManager, String name) {
public RedissonListMultimap(UUID id, CommandAsyncExecutor connectionManager, String name) {
super(id, connectionManager, name);
}
RedissonListMultimap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name) {
public RedissonListMultimap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(id, codec, connectionManager, name);
}

@ -34,6 +34,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RHyperLogLogReactive;
import org.redisson.api.RKeysReactive;
import org.redisson.api.RLexSortedSetReactive;
import org.redisson.api.RListMultimapReactive;
import org.redisson.api.RListReactive;
import org.redisson.api.RLockReactive;
import org.redisson.api.RMapCacheReactive;
@ -45,6 +46,7 @@ import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
@ -66,6 +68,7 @@ import org.redisson.reactive.RedissonDequeReactive;
import org.redisson.reactive.RedissonHyperLogLogReactive;
import org.redisson.reactive.RedissonKeysReactive;
import org.redisson.reactive.RedissonLexSortedSetReactive;
import org.redisson.reactive.RedissonListMultimapReactive;
import org.redisson.reactive.RedissonListReactive;
import org.redisson.reactive.RedissonLockReactive;
import org.redisson.reactive.RedissonMapCacheReactive;
@ -77,6 +80,7 @@ import org.redisson.reactive.RedissonScoredSortedSetReactive;
import org.redisson.reactive.RedissonScriptReactive;
import org.redisson.reactive.RedissonSemaphoreReactive;
import org.redisson.reactive.RedissonSetCacheReactive;
import org.redisson.reactive.RedissonSetMultimapReactive;
import org.redisson.reactive.RedissonSetReactive;
import org.redisson.reactive.RedissonTopicReactive;
@ -179,6 +183,26 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonListReactive<V>(codec, commandExecutor, name);
}
@Override
public <K, V> RListMultimapReactive<K, V> getListMultimap(String name) {
return new RedissonListMultimapReactive<K, V>(id, commandExecutor, name);
}
@Override
public <K, V> RListMultimapReactive<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimapReactive<K, V>(id, codec, commandExecutor, name);
}
@Override
public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name) {
return new RedissonSetMultimapReactive<K, V>(id, commandExecutor, name);
}
@Override
public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimapReactive<K, V>(id, codec, commandExecutor, name);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name) {
return new RedissonMapReactive<K, V>(commandExecutor, name, null);

@ -50,11 +50,11 @@ public class RedissonSetMultimap<K, V> extends RedissonMultimap<K, V> implements
private static final RedisStrictCommand<Boolean> SCARD_VALUE = new RedisStrictCommand<Boolean>("SCARD", new BooleanAmountReplayConvertor());
private static final RedisCommand<Boolean> SISMEMBER_VALUE = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor());
RedissonSetMultimap(UUID id, CommandAsyncExecutor connectionManager, String name) {
public RedissonSetMultimap(UUID id, CommandAsyncExecutor connectionManager, String name) {
super(id, connectionManager, name);
}
RedissonSetMultimap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name) {
public RedissonSetMultimap(UUID id, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(id, codec, connectionManager, name);
}

@ -33,9 +33,10 @@ import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
@ -49,7 +50,7 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class RedissonSetMultimapValues<V> extends RedissonExpirable implements RSet<V> {
private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE);
private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE);
private final RSet<V> set;
private final Object key;

@ -0,0 +1,82 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.List;
import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface RListMultimapReactive<K, V> extends RMultimapReactive<K, V> {
/**
* Returns a view List of the values associated with {@code key} in this
* multimap, if any. Note that when {@code containsKey(key)} is false, this
* returns an empty collection, not {@code null}.
*
* <p>Changes to the returned collection will update the underlying multimap,
* and vice versa.
*
* @param key - map key
* @return list of values
*/
RListReactive<V> get(K key);
/**
* Returns all elements at once. Result Set is <b>NOT</b> backed by map,
* so changes are not reflected in map.
*
* @param key - map key
* @return list of values
*/
Publisher<List<V>> getAll(K key);
/**
* Removes all values associated with the key {@code key}.
*
* <p>Once this method returns, {@code key} will not be mapped to any values
* <p>Use {@link RMultimapReactive#fastRemove} if values are not needed.</p>
*
* @param key - map key
* @return the values that were removed (possibly empty). The returned
* list <i>may</i> be modifiable, but updating it will have no
* effect on the multimap.
*/
Publisher<List<V>> removeAll(Object key);
/**
* Stores a collection of values with the same key, replacing any existing
* values for that key.
*
* <p>If {@code values} is empty, this is equivalent to
* {@link #removeAll(Object)}.
*
* @param key - map key
* @param values - map values
* @return list of replaced values, or an empty collection if no
* values were previously associated with the key. List
* <i>may</i> be modifiable, but updating it will have no effect on the
* multimap.
*/
Publisher<List<V>> replaceValues(K key, Iterable<? extends V> values);
}

@ -0,0 +1,139 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Set;
import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface RMultimapReactive<K, V> {
/**
* Returns the number of key-value pairs in this multimap.
*
* @return size of multimap
*/
Publisher<Integer> size();
/**
* Returns {@code true} if this multimap contains at least one key-value pair
* with the key {@code key}.
*
* @param key - map key
* @return <code>true</code> if contains a key
*/
Publisher<Boolean> containsKey(Object key);
/**
* Returns {@code true} if this multimap contains at least one key-value pair
* with the value {@code value}.
*
* @param value - map value
* @return <code>true</code> if contains a value
*/
Publisher<Boolean> containsValue(Object value);
/**
* Returns {@code true} if this multimap contains at least one key-value pair
* with the key {@code key} and the value {@code value}.
*
* @param key - map key
* @param value - map value
* @return <code>true</code> if contains an entry
*/
Publisher<Boolean> containsEntry(Object key, Object value);
/**
* Stores a key-value pair in this multimap.
*
* <p>Some multimap implementations allow duplicate key-value pairs, in which
* case {@code put} always adds a new key-value pair and increases the
* multimap size by 1. Other implementations prohibit duplicates, and storing
* a key-value pair that's already in the multimap has no effect.
*
* @param key - map key
* @param value - map value
* @return {@code true} if the method increased the size of the multimap, or
* {@code false} if the multimap already contained the key-value pair and
* doesn't allow duplicates
*/
Publisher<Boolean> put(K key, V value);
/**
* Removes a single key-value pair with the key {@code key} and the value
* {@code value} from this multimap, if such exists. If multiple key-value
* pairs in the multimap fit this description, which one is removed is
* unspecified.
*
* @param key - map key
* @param value - map value
* @return {@code true} if the multimap changed
*/
Publisher<Boolean> remove(Object key, Object value);
// Bulk Operations
/**
* Stores a key-value pair in this multimap for each of {@code values}, all
* using the same key, {@code key}. Equivalent to (but expected to be more
* efficient than): <pre> {@code
*
* for (V value : values) {
* put(key, value);
* }}</pre>
*
* <p>In particular, this is a no-op if {@code values} is empty.
*
* @param key - map key
* @param values - map values
* @return {@code true} if the multimap changed
*/
Publisher<Boolean> putAll(K key, Iterable<? extends V> values);
/**
* Returns the number of key-value pairs in this multimap.
*
* @return keys amount
*/
Publisher<Integer> keySize();
/**
* Removes <code>keys</code> from map by one operation
*
* Works faster than <code>RMultimap.remove</code> but not returning
* the value associated with <code>key</code>
*
* @param keys - map keys
* @return the number of keys that were removed from the hash, not including specified but non existing keys
*/
Publisher<Long> fastRemove(K ... keys);
/**
* Read all keys at once
*
* @return keys
*/
Publisher<Set<K>> readAllKeySet();
}

@ -0,0 +1,82 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.Set;
import org.reactivestreams.Publisher;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public interface RSetMultimapReactive<K, V> extends RMultimapReactive<K, V> {
/**
* Returns a view Set of the values associated with {@code key} in this
* multimap, if any. Note that when {@code containsKey(key)} is false, this
* returns an empty collection, not {@code null}.
*
* <p>Changes to the returned collection will update the underlying multimap,
* and vice versa.
*
* @param key - map key
* @return set of values
*/
RSetReactive<V> get(K key);
/**
* Returns all elements at once. Result Set is <b>NOT</b> backed by map,
* so changes are not reflected in map.
*
* @param key - map key
* @return set of values
*/
Publisher<Set<V>> getAll(K key);
/**
* Removes all values associated with the key {@code key}.
*
* <p>Once this method returns, {@code key} will not be mapped to any values
* <p>Use {@link RMultimapReactive#fastRemove} if values are not needed.</p>
*
* @param key - map key
* @return the values that were removed (possibly empty). The returned
* set <i>may</i> be modifiable, but updating it will have no
* effect on the multimap.
*/
Publisher<Set<V>> removeAll(Object key);
/**
* Stores a collection of values with the same key, replacing any existing
* values for that key.
*
* <p>If {@code values} is empty, this is equivalent to
* {@link #removeAll(Object)}.
*
* @param key - map key
* @param values - map values
* @return set of replaced values, or an empty collection if no
* values were previously associated with the key. Set
* <i>may</i> be modifiable, but updating it will have no effect on the
* multimap.
*/
Publisher<Set<V>> replaceValues(K key, Iterable<? extends V> values);
}

@ -208,6 +208,51 @@ public interface RedissonReactiveClient {
*/
<V> RListReactive<V> getList(String name, Codec codec);
/**
* Returns List based Multimap instance by name.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @return ListMultimap object
*/
<K, V> RListMultimapReactive<K, V> getListMultimap(String name);
/**
* Returns List based Multimap instance by name
* using provided codec for both map keys and values.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param codec - codec for keys and values
* @return RListMultimapReactive object
*/
<K, V> RListMultimapReactive<K, V> getListMultimap(String name, Codec codec);
/**
* Returns Set based Multimap instance by name.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @return SetMultimap object
*/
<K, V> RSetMultimapReactive<K, V> getSetMultimap(String name);
/**
* Returns Set based Multimap instance by name
* using provided codec for both map keys and values.
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of object
* @param codec - codec for keys and values
* @return SetMultimap object
*/
<K, V> RSetMultimapReactive<K, V> getSetMultimap(String name, Codec codec);
/**
* Returns map instance by name.
*

@ -34,7 +34,6 @@ import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.SlotsDecoder;
import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise;
@ -78,8 +77,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} else {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
if (cmd.getCommand().getReplayMultiDecoder() != null
&& (NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())
|| SlotsDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())
&& (SlotsDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())
|| ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) {
makeCheckpoint = false;
}
@ -92,7 +90,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (data == null) {
while (in.writerIndex() > in.readerIndex()) {
decode(in, null, null, ctx.channel());
decode(in, null, null, ctx.channel());
}
} else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
@ -330,7 +328,7 @@ public class CommandDecoder extends ReplayingDecoder<State> {
if (parts.isEmpty()) {
return null;
}
}
}
return data.getCommand().getReplayMultiDecoder();
}
@ -342,8 +340,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (parts != null) {
MultiDecoder<Object> multiDecoder = data.getCommand().getReplayMultiDecoder();
if (multiDecoder.isApplicable(parts.size(), state())) {
decoder = multiDecoder;
Decoder<Object> mDecoder = multiDecoder.getDecoder(parts.size(), state());
if (mDecoder != null) {
decoder = mDecoder;
}
}
if (decoder == null) {

@ -176,11 +176,11 @@ public class CommandPubSubDecoder extends CommandDecoder {
if (data == null && parts != null) {
if (parts.size() == 2 && "message".equals(parts.get(0))) {
String channelName = (String) parts.get(1);
return entries.get(channelName).getDecoder();
return entries.get(channelName).getDecoder().getDecoder(parts.size(), state());
}
if (parts.size() == 3 && "pmessage".equals(parts.get(0))) {
String patternName = (String) parts.get(1);
return entries.get(patternName).getDecoder();
return entries.get(patternName).getDecoder().getDecoder(parts.size(), state());
}
}
return super.selectDecoder(data, parts);

@ -42,13 +42,14 @@ import org.redisson.client.protocol.convertor.TypeConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ClusterNodesDecoder;
import org.redisson.client.protocol.decoder.KeyValueObjectDecoder;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.ListResultReplayDecoder;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.Long2MultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectFirstScoreReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
@ -130,10 +131,10 @@ public interface RedisCommands {
RedisCommand<List<ScoredEntry<Object>>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new NestedMultiDecoder(new ScoredSortedSetScanDecoder<Object>(), new ScoredSortedSetScanReplayDecoder()), ValueType.OBJECT);
RedisCommand<ListScanResult<Object>> ZSCAN = new RedisCommand<ListScanResult<Object>>("ZSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ScoredSortedSetScanDecoder<Object>(), new ScoredSortedSetScanReplayDecoder()));
RedisStrictCommand<Double> ZINCRBY = new RedisStrictCommand<Double>("ZINCRBY", new DoubleReplayConvertor());
RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new NestedMultiDecoder(new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder()), ValueType.OBJECT);
RedisCommand<ListScanResult<String>> SCAN = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<String>(), new ListScanResultReplayDecoder()));
RedisStrictCommand<String> RANDOM_KEY = new RedisStrictCommand<String>("RANDOMKEY", new StringDataDecoder());
RedisStrictCommand<String> PING = new RedisStrictCommand<String>("PING");
RedisStrictCommand<Boolean> PING_BOOL = new RedisStrictCommand<Boolean>("PING", new BooleanNotNullReplayConvertor());
@ -152,9 +153,9 @@ public interface RedisCommands {
RedisCommand<Boolean> SMOVE = new RedisCommand<Boolean>("SMOVE", new BooleanReplayConvertor());
RedisCommand<Set<Object>> SMEMBERS = new RedisCommand<Set<Object>>("SMEMBERS", new ObjectSetReplayDecoder<Object>());
RedisCommand<Object> SRANDMEMBER_SINGLE = new RedisCommand<Object>("SRANDMEMBER");
RedisCommand<ListScanResult<Object>> SSCAN = new RedisCommand<ListScanResult<Object>>("SSCAN", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()), ValueType.OBJECT);
RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()), ValueType.OBJECT);
RedisCommand<ListScanResult<Object>> EVAL_ZSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new NestedMultiDecoder(new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()), ValueType.OBJECT);
RedisCommand<ListScanResult<Object>> SSCAN = new RedisCommand<ListScanResult<Object>>("SSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()));
RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()));
RedisCommand<ListScanResult<Object>> EVAL_ZSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder<Object>(), new ListScanResultReplayDecoder()));
RedisCommand<Boolean> SISMEMBER = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor());
RedisStrictCommand<Integer> SCARD_INT = new RedisStrictCommand<Integer>("SCARD", new IntegerReplayConvertor());
RedisStrictCommand<Long> SCARD = new RedisStrictCommand<Long>("SCARD");
@ -254,7 +255,7 @@ public interface RedisCommands {
RedisStrictCommand<Boolean> HSETNX = new RedisStrictCommand<Boolean>("HSETNX", new BooleanReplayConvertor());
RedisStrictCommand<Boolean> HSET = new RedisStrictCommand<Boolean>("HSET", new BooleanReplayConvertor());
RedisStrictCommand<Void> HSET_VOID = new RedisStrictCommand<Void>("HSET", new VoidReplayConvertor());
RedisCommand<MapScanResult<Object, Object>> HSCAN = new RedisCommand<MapScanResult<Object, Object>>("HSCAN", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
RedisCommand<MapScanResult<Object, Object>> HSCAN = new RedisCommand<MapScanResult<Object, Object>>("HSCAN", new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
RedisCommand<Map<Object, Object>> HGETALL = new RedisCommand<Map<Object, Object>>("HGETALL", new ObjectMapReplayDecoder(), ValueType.MAP);
RedisCommand<Set<Entry<Object, Object>>> HGETALL_ENTRY = new RedisCommand<Set<Entry<Object, Object>>>("HGETALL", new ObjectMapEntryReplayDecoder(), ValueType.MAP);
RedisCommand<List<Object>> HVALS = new RedisCommand<List<Object>>("HVALS", new ObjectListReplayDecoder<Object>(), ValueType.MAP_VALUE);
@ -309,13 +310,14 @@ public interface RedisCommands {
Set<String> PUBSUB_COMMANDS = new HashSet<String>(
Arrays.asList(PSUBSCRIBE.getName(), SUBSCRIBE.getName(), PUNSUBSCRIBE.getName(), UNSUBSCRIBE.getName()));
RedisStrictCommand<List<ClusterNodeInfo>> CLUSTER_NODES = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", new ClusterNodesDecoder());
RedisStrictCommand<List<ClusterNodeInfo>> CLUSTER_NODES = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", new ClusterNodesDecoder(false));
RedisStrictCommand<List<ClusterNodeInfo>> CLUSTER_NODES_SSL = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", new ClusterNodesDecoder(true));
RedisCommand<Object> TIME = new RedisCommand<Object>("TIME", new LongListObjectDecoder());
RedisStrictCommand<Map<String, String>> CLUSTER_INFO = new RedisStrictCommand<Map<String, String>>("CLUSTER", "INFO", new StringMapDataDecoder());
RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
RedisCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisCommand<List<Map<String, String>>>("SENTINEL", "SLAVES",
new NestedMultiDecoder(new ObjectMapReplayDecoder(), new ListResultReplayDecoder(), true));
new ListMultiDecoder(new ObjectMapReplayDecoder(), new ObjectListReplayDecoder<String>(ListMultiDecoder.RESET), new ListResultReplayDecoder()));
RedisStrictCommand<Void> CLUSTER_ADDSLOTS = new RedisStrictCommand<Void>("CLUSTER", "ADDSLOTS");
RedisStrictCommand<Void> CLUSTER_REPLICATE = new RedisStrictCommand<Void>("CLUSTER", "REPLICATE");

@ -34,6 +34,13 @@ import io.netty.util.CharsetUtil;
*/
public class ClusterNodesDecoder implements Decoder<List<ClusterNodeInfo>> {
private final boolean ssl;
public ClusterNodesDecoder(boolean ssl) {
super();
this.ssl = ssl;
}
@Override
public List<ClusterNodeInfo> decode(ByteBuf buf, State state) throws IOException {
String response = buf.toString(CharsetUtil.UTF_8);
@ -46,7 +53,11 @@ public class ClusterNodesDecoder implements Decoder<List<ClusterNodeInfo>> {
String nodeId = params[0];
node.setNodeId(nodeId);
String addr = "redis://" + params[1].split("@")[0];
String protocol = "redis://";
if (ssl) {
protocol = "rediss://";
}
String addr = protocol + params[1].split("@")[0];
node.setAddress(addr);
String flags = params[2];

@ -15,36 +15,26 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
* @author Nikita Koksharov
*
* @param <T> object type
*/
public class FlatNestedMultiDecoder<T> extends NestedMultiDecoder {
public FlatNestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder secondDecoder, boolean handleEmpty) {
super(firstDecoder, secondDecoder, handleEmpty);
}
public class CodecDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return firstDecoder.decode(buf, state);
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public boolean isApplicable(int paramNum, State state) {
NestedDecoderState ds = getDecoder(state);
if (paramNum == 0) {
ds.resetDecoderIndex();
}
return firstDecoder.isApplicable(paramNum, state);
public Object decode(List<Object> parts, State state) {
return null;
}
}

@ -15,14 +15,11 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -31,29 +28,14 @@ import io.netty.buffer.ByteBuf;
*/
public class GeoDistanceDecoder implements MultiDecoder<List<Object>> {
private final ThreadLocal<Integer> pos = new ThreadLocal<Integer>();
private final Codec codec;
public GeoDistanceDecoder(Codec codec) {
super();
this.codec = codec;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
if (pos.get() % 2 == 0) {
return codec.getValueDecoder().decode(buf, state);
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 != 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return DoubleCodec.INSTANCE.getValueDecoder().decode(buf, state);
return null;
}
@Override
public boolean isApplicable(int paramNum, State state) {
pos.set(paramNum);
return true;
}
@Override
public List<Object> decode(List<Object> parts, State state) {
return parts;

@ -15,7 +15,6 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -23,8 +22,7 @@ import java.util.Map;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -33,27 +31,19 @@ import io.netty.buffer.ByteBuf;
*/
public class GeoDistanceMapDecoder implements MultiDecoder<Map<Object, Object>> {
private final ThreadLocal<Integer> pos = new ThreadLocal<Integer>();
private final Codec codec;
public GeoDistanceMapDecoder(Codec codec) {
super();
this.codec = codec;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
if (pos.get() % 2 == 0) {
return codec.getValueDecoder().decode(buf, state);
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 == 0) {
return codec.getValueDecoder();
}
return DoubleCodec.INSTANCE.getValueDecoder().decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
pos.set(paramNum);
return true;
return DoubleCodec.INSTANCE.getValueDecoder();
}
@Override

@ -20,8 +20,7 @@ import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -31,10 +30,10 @@ import io.netty.buffer.ByteBuf;
public class GeoMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
public Decoder<Object> getDecoder(int paramNum, State state) {
return ListMultiDecoder.RESET;
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size());
@ -45,9 +44,4 @@ public class GeoMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
return result;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -15,14 +15,12 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.api.GeoPosition;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -32,23 +30,18 @@ import io.netty.buffer.ByteBuf;
public class GeoPositionDecoder implements MultiDecoder<GeoPosition> {
@Override
public Double decode(ByteBuf buf, State state) throws IOException {
return (Double) DoubleCodec.INSTANCE.getValueDecoder().decode(buf, state);
public Decoder<Object> getDecoder(int paramNum, State state) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
@Override
public GeoPosition decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
Double longitude = Double.valueOf(parts.get(0).toString());
Double latitude = Double.valueOf(parts.get(1).toString());
Double longitude = (Double)parts.get(0);
Double latitude = (Double)parts.get(1);
return new GeoPosition(longitude, latitude);
}

@ -15,15 +15,14 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -39,15 +38,10 @@ public class GeoPositionMapDecoder implements MultiDecoder<Map<Object, Object>>
}
@Override
public Double decode(ByteBuf buf, State state) throws IOException {
throw new UnsupportedOperationException();
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
@ -59,6 +53,10 @@ public class GeoPositionMapDecoder implements MultiDecoder<Map<Object, Object>>
if (value == null || value == Collections.emptyMap()) {
continue;
}
if (value instanceof List && ((List) value).isEmpty()) {
continue;
}
result.put(args.get(index), value);
}
return result;

@ -15,9 +15,11 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
@ -30,12 +32,20 @@ import io.netty.util.CharsetUtil;
public class KeyValueObjectDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(1);
return status;
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum == 0) {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(1);
return status;
}
};
}
return null;
}
@Override
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
@ -44,9 +54,4 @@ public class KeyValueObjectDecoder implements MultiDecoder<Object> {
return new KeyValueMessage(parts.get(0), parts.get(1));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}
}

@ -18,8 +18,7 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -28,11 +27,6 @@ import io.netty.buffer.ByteBuf;
*/
public class ListFirstObjectDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public Object decode(List<Object> parts, State state) {
if (!parts.isEmpty()) {
@ -42,8 +36,8 @@ public class ListFirstObjectDecoder implements MultiDecoder<Object> {
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -18,8 +18,7 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -28,19 +27,14 @@ import io.netty.buffer.ByteBuf;
*/
public class ListIteratorReplayDecoder implements MultiDecoder<ListIteratorResult<Object>> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public ListIteratorResult<Object> decode(List<Object> parts, State state) {
return new ListIteratorResult<Object>(parts.get(0), Long.valueOf(parts.get(1).toString()));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -19,6 +19,7 @@ import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import io.netty.buffer.ByteBuf;
@ -30,6 +31,13 @@ import io.netty.buffer.ByteBuf;
*/
public class ListMultiDecoder<T> implements MultiDecoder<Object> {
public static final Decoder<Object> RESET = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return null;
}
};
private final MultiDecoder<?>[] decoders;
public static class NestedDecoderState implements DecoderState {
@ -45,6 +53,10 @@ public class ListMultiDecoder<T> implements MultiDecoder<Object> {
this.index = index;
}
public void resetIndex() {
index = 0;
}
public void resetPartsIndex() {
partsIndex = -1;
}
@ -90,26 +102,35 @@ public class ListMultiDecoder<T> implements MultiDecoder<Object> {
this.decoders = decoders;
}
public Object decode(ByteBuf buf, State state) throws IOException {
int index = getDecoder(state).getIndex();
return decoders[index].decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum == 0) {
NestedDecoderState s = getDecoder(state);
s.incIndex();
s.resetPartsIndex();
}
return true;
}
int index = getDecoder(state).getIndex();
Decoder<Object> decoder = decoders[index].getDecoder(paramNum, state);
if (decoder == RESET) {
NestedDecoderState s = getDecoder(state);
s.resetIndex();
int ind = s.getIndex();
return decoders[ind].getDecoder(paramNum, state);
}
return decoder;
}
@Override
public Object decode(List<Object> parts, State state) {
NestedDecoderState s = getDecoder(state);
int index = s.getIndex();
index += s.incPartsIndex();
if (index == -1) {
return decoders[decoders.length-1].decode(parts, state);
}
Object res = decoders[index].decode(parts, state);
if (res == null) {
index = s.incIndex() + s.getPartsIndex();

@ -19,10 +19,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.redisson.client.protocol.Decoder;
/**
*
@ -32,10 +31,10 @@ import io.netty.util.CharsetUtil;
public class ListResultReplayDecoder implements MultiDecoder<List<Map<Object, Object>>> {
@Override
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
public Decoder<Object> getDecoder(int paramNum, State state) {
return StringCodec.INSTANCE.getValueDecoder();
}
@Override
@SuppressWarnings("unchecked")
public List<Map<Object, Object>> decode(List<Object> parts, State state) {
@ -43,9 +42,4 @@ public class ListResultReplayDecoder implements MultiDecoder<List<Map<Object, Ob
return Arrays.asList(res);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}

@ -17,10 +17,9 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.redisson.client.protocol.Decoder;
/**
*
@ -30,18 +29,13 @@ import io.netty.util.CharsetUtil;
public class ListScanResultReplayDecoder implements MultiDecoder<ListScanResult<Object>> {
@Override
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public ListScanResult<Object> decode(List<Object> parts, State state) {
return new ListScanResult<Object>((Long)parts.get(0), (List<Object>)parts.get(1));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -31,15 +29,10 @@ import io.netty.buffer.ByteBuf;
public class LongMultiDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return LongCodec.INSTANCE.getValueDecoder().decode(buf, state);
public Decoder<Object> getDecoder(int paramNum, State state) {
return LongCodec.INSTANCE.getValueDecoder();
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
@Override
public Object decode(List<Object> parts, State state) {
return null;

@ -15,13 +15,11 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -30,11 +28,6 @@ import io.netty.buffer.ByteBuf;
*/
public class MapCacheScanResultReplayDecoder implements MultiDecoder<MapCacheScanResult<Object, Object>> {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public MapCacheScanResult<Object, Object> decode(List<Object> parts, State state) {
Long pos = (Long)parts.get(0);
@ -44,8 +37,8 @@ public class MapCacheScanResultReplayDecoder implements MultiDecoder<MapCacheSca
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -19,9 +19,7 @@ import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.redisson.client.protocol.Decoder;
/**
*
@ -31,18 +29,13 @@ import io.netty.util.CharsetUtil;
public class MapScanResultReplayDecoder implements MultiDecoder<MapScanResult<Object, Object>> {
@Override
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public MapScanResult<Object, Object> decode(List<Object> parts, State state) {
return new MapScanResult<Object, Object>((Long)parts.get(0), (Map<Object, Object>)parts.get(1));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}
}

@ -26,10 +26,10 @@ import org.redisson.client.protocol.Decoder;
*
* @param <T> type
*/
public interface MultiDecoder<T> extends Decoder<Object> {
boolean isApplicable(int paramNum, State state);
public interface MultiDecoder<T> {
Decoder<Object> getDecoder(int paramNum, State state);
T decode(List<Object> parts, State state);
}

@ -1,182 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
* @param <T> object type
*/
public class NestedMultiDecoder<T> implements MultiDecoder<Object> {
public static class NestedDecoderState implements DecoderState {
int decoderIndex;
int flipDecoderIndex;
public NestedDecoderState() {
}
public NestedDecoderState(int decoderIndex, int flipDecoderIndex) {
super();
this.decoderIndex = decoderIndex;
this.flipDecoderIndex = flipDecoderIndex;
}
public int getDecoderIndex() {
return decoderIndex;
}
public void resetDecoderIndex() {
decoderIndex = 0;
}
public void incDecoderIndex() {
decoderIndex++;
}
public int getFlipDecoderIndex() {
return flipDecoderIndex;
}
public void resetFlipDecoderIndex() {
flipDecoderIndex = 0;
}
public void incFlipDecoderIndex() {
flipDecoderIndex++;
}
@Override
public DecoderState copy() {
return new NestedDecoderState(decoderIndex, flipDecoderIndex);
}
@Override
public String toString() {
return "NestedDecoderState [decoderIndex=" + decoderIndex + ", flipDecoderIndex=" + flipDecoderIndex + "]";
}
}
protected final MultiDecoder<Object> firstDecoder;
protected final MultiDecoder<Object> secondDecoder;
private MultiDecoder<Object> thirdDecoder;
private boolean handleEmpty;
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
this(firstDecoder, secondDecoder, false);
}
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder, boolean handleEmpty) {
this(firstDecoder, secondDecoder, null, handleEmpty);
}
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder, MultiDecoder<Object> thirdDecoder) {
this(firstDecoder, secondDecoder, thirdDecoder, false);
}
public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder, MultiDecoder<Object> thirdDecoder, boolean handleEmpty) {
this.firstDecoder = firstDecoder;
this.secondDecoder = secondDecoder;
this.thirdDecoder = thirdDecoder;
this.handleEmpty = handleEmpty;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
NestedDecoderState ds = getDecoder(state);
MultiDecoder<?> decoder = null;
if (ds.getFlipDecoderIndex() == 2) {
decoder = firstDecoder;
}
if (ds.getFlipDecoderIndex() == 1) {
decoder = secondDecoder;
}
return decoder.decode(buf, state);
}
@Override
public boolean isApplicable(int paramNum, State state) {
NestedDecoderState ds = getDecoder(state);
if (paramNum == 0) {
ds.incFlipDecoderIndex();
ds.resetDecoderIndex();
}
// used only with thirdDecoder
if (ds.getFlipDecoderIndex() == 3) {
ds.resetFlipDecoderIndex();
ds.incFlipDecoderIndex();
}
MultiDecoder<?> decoder = null;
if (ds.getFlipDecoderIndex() == 2) {
decoder = firstDecoder;
}
if (ds.getFlipDecoderIndex() == 1) {
decoder = secondDecoder;
}
return decoder.isApplicable(paramNum, state);
}
protected final NestedDecoderState getDecoder(State state) {
NestedDecoderState ds = state.getDecoderState();
if (ds == null) {
ds = new NestedDecoderState();
state.setDecoderState(ds);
}
return ds;
}
@Override
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty() && state.getDecoderState() == null && handleEmpty) {
MultiDecoder<?> decoder = secondDecoder;
if (thirdDecoder != null) {
decoder = thirdDecoder;
}
return decoder.decode(parts, state);
}
NestedDecoderState ds = getDecoder(state);
if (parts.isEmpty()) {
ds.resetDecoderIndex();
}
ds.incDecoderIndex();
MultiDecoder<?> decoder = null;
if (ds.getDecoderIndex() == 1) {
decoder = firstDecoder;
}
if (ds.getDecoderIndex() == 2) {
decoder = secondDecoder;
}
if (ds.getDecoderIndex() == 3) {
decoder = thirdDecoder;
}
return decoder.decode(parts, state);
}
}

@ -18,8 +18,7 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -29,19 +28,14 @@ import io.netty.buffer.ByteBuf;
*/
public class ObjectFirstResultReplayDecoder<T> implements MultiDecoder<T> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public T decode(List<Object> parts, State state) {
return (T) parts.get(0);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.client.protocol.decoder;
import java.math.BigDecimal;
import java.util.List;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.redisson.client.protocol.Decoder;
/**
*
@ -32,18 +30,16 @@ import io.netty.util.CharsetUtil;
public class ObjectFirstScoreReplayDecoder implements MultiDecoder<Double> {
@Override
public Object decode(ByteBuf buf, State state) {
return new BigDecimal(buf.toString(CharsetUtil.UTF_8)).doubleValue();
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 != 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null;
}
@Override
public Double decode(List<Object> parts, State state) {
return (Double) parts.get(1);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum % 2 != 0;
}
}

@ -15,13 +15,11 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -37,10 +35,10 @@ public class ObjectListDecoder<T> implements MultiDecoder<List<T>> {
super();
this.codec = codec;
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return codec.getMapKeyDecoder().decode(buf, state);
public Decoder<Object> getDecoder(int paramNum, State state) {
return codec.getMapKeyDecoder();
}
@Override
@ -48,9 +46,4 @@ public class ObjectListDecoder<T> implements MultiDecoder<List<T>> {
return (List<T>) parts;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -18,8 +18,7 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -29,9 +28,15 @@ import io.netty.buffer.ByteBuf;
*/
public class ObjectListReplayDecoder<T> implements MultiDecoder<List<T>> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
private final Decoder<Object> decoder;
public ObjectListReplayDecoder() {
this(null);
}
public ObjectListReplayDecoder(Decoder<Object> decoder) {
super();
this.decoder = decoder;
}
@Override
@ -40,8 +45,7 @@ public class ObjectListReplayDecoder<T> implements MultiDecoder<List<T>> {
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
public Decoder<Object> getDecoder(int paramNum, State state) {
return decoder;
}
}

@ -15,15 +15,13 @@
*/
package org.redisson.client.protocol.decoder;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.codec.Codec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -42,13 +40,13 @@ public class ObjectMapDecoder implements MultiDecoder<Map<Object, Object>> {
private int pos;
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
public Decoder<Object> getDecoder(int paramNum, State state) {
if (pos++ % 2 == 0) {
return codec.getMapKeyDecoder().decode(buf, state);
return codec.getMapKeyDecoder();
}
return codec.getMapValueDecoder().decode(buf, state);
return codec.getMapValueDecoder();
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2);
@ -60,9 +58,4 @@ public class ObjectMapDecoder implements MultiDecoder<Map<Object, Object>> {
return result;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}

@ -22,8 +22,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -33,8 +32,8 @@ import io.netty.buffer.ByteBuf;
public class ObjectMapEntryReplayDecoder implements MultiDecoder<Set<Entry<Object, Object>>> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
@ -48,9 +47,4 @@ public class ObjectMapEntryReplayDecoder implements MultiDecoder<Set<Entry<Objec
return result.entrySet();
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
}

@ -20,8 +20,7 @@ import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -30,11 +29,6 @@ import io.netty.buffer.ByteBuf;
*/
public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public Map<Object, Object> decode(List<Object> parts, State state) {
Map<Object, Object> result = new LinkedHashMap<Object, Object>(parts.size()/2);
@ -47,8 +41,8 @@ public class ObjectMapReplayDecoder implements MultiDecoder<Map<Object, Object>>
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -20,8 +20,7 @@ import java.util.List;
import java.util.Set;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import org.redisson.client.protocol.Decoder;
/**
*
@ -31,19 +30,14 @@ import io.netty.buffer.ByteBuf;
*/
public class ObjectSetReplayDecoder<T> implements MultiDecoder<Set<T>> {
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public Set<T> decode(List<Object> parts, State state) {
return new LinkedHashSet(parts);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -15,16 +15,14 @@
*/
package org.redisson.client.protocol.decoder;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.ScoredEntry;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
@ -34,10 +32,13 @@ import io.netty.util.CharsetUtil;
public class ScoredSortedSetReplayDecoder<T> implements MultiDecoder<List<ScoredEntry<T>>> {
@Override
public Object decode(ByteBuf buf, State state) {
return new BigDecimal(buf.toString(CharsetUtil.UTF_8));
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 != 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return null;
}
@Override
public List<ScoredEntry<T>> decode(List<Object> parts, State state) {
List<ScoredEntry<T>> result = new ArrayList<ScoredEntry<T>>();
@ -47,9 +48,4 @@ public class ScoredSortedSetReplayDecoder<T> implements MultiDecoder<List<Scored
return result;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum % 2 != 0;
}
}

@ -15,12 +15,9 @@
*/
package org.redisson.client.protocol.decoder;
import java.math.BigDecimal;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.redisson.client.protocol.Decoder;
/**
*
@ -31,13 +28,11 @@ import io.netty.util.CharsetUtil;
public class ScoredSortedSetScanDecoder<T> extends ObjectListReplayDecoder<T> {
@Override
public Object decode(ByteBuf buf, State state) {
return new BigDecimal(buf.toString(CharsetUtil.UTF_8));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum % 2 != 0;
public Decoder<Object> getDecoder(int paramNum, State state) {
if (paramNum % 2 != 0) {
return DoubleCodec.INSTANCE.getValueDecoder();
}
return super.getDecoder(paramNum, state);
}
}

@ -18,9 +18,7 @@ package org.redisson.client.protocol.decoder;
import java.util.List;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.redisson.client.protocol.Decoder;
/**
*
@ -30,10 +28,10 @@ import io.netty.util.CharsetUtil;
public class ScoredSortedSetScanReplayDecoder implements MultiDecoder<ListScanResult<Object>> {
@Override
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public ListScanResult<Object> decode(List<Object> parts, State state) {
List<Object> values = (List<Object>)parts.get(1);
@ -43,9 +41,4 @@ public class ScoredSortedSetScanReplayDecoder implements MultiDecoder<ListScanRe
return new ListScanResult<Object>((Long)parts.get(0), values);
}
@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}
}

@ -22,12 +22,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.cluster.ClusterSlotRange;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
/**
*
* @author Nikita Koksharov
@ -36,10 +35,10 @@ import io.netty.util.CharsetUtil;
public class SlotsDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
public Decoder<Object> getDecoder(int paramNum, State state) {
return StringCodec.INSTANCE.getValueDecoder();
}
@Override
public Object decode(List<Object> parts, State state) {
if (parts.size() > 2 && parts.get(0) instanceof List) {
@ -62,9 +61,4 @@ public class SlotsDecoder implements MultiDecoder<Object> {
return parts;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}

@ -18,10 +18,9 @@ package org.redisson.client.protocol.decoder;
import java.util.Arrays;
import java.util.List;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.redisson.client.protocol.Decoder;
/**
*
@ -31,18 +30,13 @@ import io.netty.util.CharsetUtil;
public class StringListReplayDecoder implements MultiDecoder<List<String>> {
@Override
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
public Decoder<Object> getDecoder(int paramNum, State state) {
return StringCodec.INSTANCE.getValueDecoder();
}
@Override
public List<String> decode(List<Object> parts, State state) {
return Arrays.asList(Arrays.copyOf(parts.toArray(), parts.size(), String[].class));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}

@ -15,15 +15,12 @@
*/
package org.redisson.client.protocol.pubsub;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
public class PubSubMessageDecoder implements MultiDecoder<Object> {
private final Decoder<Object> decoder;
@ -34,18 +31,13 @@ public class PubSubMessageDecoder implements MultiDecoder<Object> {
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return decoder.decode(buf, null);
public Decoder<Object> getDecoder(int paramNum, State state) {
return decoder;
}
@Override
public PubSubMessage decode(List<Object> parts, State state) {
return new PubSubMessage(parts.get(1).toString(), parts.get(2));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}

@ -15,15 +15,12 @@
*/
package org.redisson.client.protocol.pubsub;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
public class PubSubPatternMessageDecoder implements MultiDecoder<Object> {
private final Decoder<Object> decoder;
@ -34,18 +31,13 @@ public class PubSubPatternMessageDecoder implements MultiDecoder<Object> {
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return decoder.decode(buf, null);
public Decoder<Object> getDecoder(int paramNum, State state) {
return decoder;
}
@Override
public PubSubPatternMessage decode(List<Object> parts, State state) {
return new PubSubPatternMessage(parts.get(1).toString(), parts.get(2).toString(), parts.get(3));
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}

@ -15,9 +15,11 @@
*/
package org.redisson.client.protocol.pubsub;
import java.io.IOException;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
@ -26,20 +28,21 @@ import io.netty.util.CharsetUtil;
public class PubSubStatusDecoder implements MultiDecoder<Object> {
@Override
public Object decode(ByteBuf buf, State state) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
public Decoder<Object> getDecoder(int paramNum, State state) {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
}
};
}
@Override
public PubSubStatusMessage decode(List<Object> parts, State state) {
return new PubSubStatusMessage(PubSubType.valueOf(parts.get(0).toString().toUpperCase()), parts.get(1).toString());
}
@Override
public boolean isApplicable(int paramNum, State state) {
return true;
}
}

@ -39,6 +39,7 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.cluster.ClusterPartition.Type;
import org.redisson.config.ClusterServersConfig;
@ -76,6 +77,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private volatile URI lastClusterNode;
private RedisStrictCommand<List<ClusterNodeInfo>> clusterNodesCommand;
public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
super(config);
@ -88,7 +91,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RFuture<RedisConnection> connectionFuture = connect(cfg, addr);
try {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
List<ClusterNodeInfo> nodes = connection.sync(RedisCommands.CLUSTER_NODES);
clusterNodesCommand = RedisCommands.CLUSTER_NODES;
if ("rediss".equals(addr.getScheme())) {
clusterNodesCommand = RedisCommands.CLUSTER_NODES_SSL;
}
List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);
StringBuilder nodesValue = new StringBuilder();
for (ClusterNodeInfo clusterNodeInfo : nodes) {
@ -355,7 +364,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator, final URI uri) {
RFuture<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES);
RFuture<List<ClusterNodeInfo>> future = connection.async(clusterNodesCommand);
future.addListener(new FutureListener<List<ClusterNodeInfo>>() {
@Override
public void operationComplete(Future<List<ClusterNodeInfo>> future) throws Exception {

@ -70,7 +70,12 @@ public class SnappyCodec implements Codec {
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
snappyDecoder.get().decode(buf, out);
while (buf.isReadable()) {
int chunkSize = buf.readInt();
ByteBuf chunk = buf.readSlice(chunkSize);
snappyDecoder.get().decode(chunk, out);
snappyDecoder.get().reset();
}
return innerCodec.getValueDecoder().decode(out, state);
} finally {
snappyDecoder.get().reset();
@ -84,9 +89,19 @@ public class SnappyCodec implements Codec {
@Override
public ByteBuf encode(Object in) throws IOException {
ByteBuf buf = innerCodec.getValueEncoder().encode(in);
ByteBuf out = ByteBufAllocator.DEFAULT.buffer(buf.readableBytes() + 128);
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
snappyEncoder.get().encode(buf, out, buf.readableBytes());
int chunksAmount = (int)Math.ceil(buf.readableBytes() / (double)Short.MAX_VALUE);
for (int i = 1; i <= chunksAmount; i++) {
int chunkSize = Math.min(Short.MAX_VALUE, buf.readableBytes());
ByteBuf chunk = buf.readSlice(chunkSize);
int lenIndex = out.writerIndex();
out.writeInt(0);
snappyEncoder.get().encode(chunk, out, chunk.readableBytes());
int compressedDataLength = out.writerIndex() - 4 - lenIndex;
out.setInt(lenIndex, compressedDataLength);
}
return out;
} finally {
buf.release();
@ -124,5 +139,5 @@ public class SnappyCodec implements Codec {
public Encoder getValueEncoder() {
return encoder;
}
}

@ -16,20 +16,26 @@
package org.redisson.connection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.client.RedisConnectionException;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.dns.DefaultDnsServerAddressStreamProvider;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
/**
@ -42,6 +48,8 @@ public class DNSMonitor {
private static final Logger log = LoggerFactory.getLogger(DNSMonitor.class);
private DnsAddressResolverGroup resolverGroup = new DnsAddressResolverGroup(NioDatagramChannel.class, DefaultDnsServerAddressStreamProvider.INSTANCE);
private ScheduledFuture<?> dnsMonitorFuture;
private ConnectionManager connectionManager;
@ -52,19 +60,16 @@ public class DNSMonitor {
private long dnsMonitoringInterval;
public DNSMonitor(ConnectionManager connectionManager, Set<URI> masterHosts, Set<URI> slaveHosts, long dnsMonitoringInterval) {
AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
for (URI host : masterHosts) {
try {
masters.put(host, InetAddress.getByName(host.getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host: " + host.getHost(), e);
}
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0));
resolveFuture.syncUninterruptibly();
masters.put(host, resolveFuture.getNow().getAddress());
}
for (URI host : slaveHosts) {
try {
slaves.put(host, InetAddress.getByName(host.getHost()));
} catch (UnknownHostException e) {
throw new RedisConnectionException("Unknown host: " + host.getHost(), e);
}
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0));
resolveFuture.syncUninterruptibly();
slaves.put(host, resolveFuture.getNow().getAddress());
}
this.connectionManager = connectionManager;
this.dnsMonitoringInterval = dnsMonitoringInterval;
@ -85,50 +90,69 @@ public class DNSMonitor {
dnsMonitorFuture = connectionManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
// As InetAddress.getByName call is blocking. Method should be run in dedicated thread
connectionManager.getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
for (Entry<URI, InetAddress> entry : masters.entrySet()) {
InetAddress master = entry.getValue();
InetAddress now = InetAddress.getByName(entry.getKey().getHost());
if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), master.getHostAddress(), now.getHostAddress());
for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) {
if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost())
&& entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) {
entrySet.changeMaster(entry.getKey());
}
final AddressResolver<InetSocketAddress> resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size());
for (final Entry<URI, InetAddress> entry : masters.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (counter.decrementAndGet() == 0) {
monitorDnsChange();
}
if (!future.isSuccess()) {
log.error("Unable to resolve " + entry.getKey().getHost(), future.cause());
return;
}
InetAddress master = entry.getValue();
InetAddress now = future.get().getAddress();
if (!now.getHostAddress().equals(master.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), master.getHostAddress(), now.getHostAddress());
for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) {
if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost())
&& entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) {
entrySet.changeMaster(entry.getKey());
}
masters.put(entry.getKey(), now);
log.info("Master {} has been changed", entry.getKey().getHost());
}
masters.put(entry.getKey(), now);
log.info("Master {} has been changed", entry.getKey().getHost());
}
}
});
}
for (final Entry<URI, InetAddress> entry : slaves.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (counter.decrementAndGet() == 0) {
monitorDnsChange();
}
if (!future.isSuccess()) {
log.error("Unable to resolve " + entry.getKey().getHost(), future.cause());
return;
}
for (Entry<URI, InetAddress> entry : slaves.entrySet()) {
InetAddress slave = entry.getValue();
InetAddress updatedSlave = InetAddress.getByName(entry.getKey().getHost());
if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress());
for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
URI uri = URIBuilder.create("redis://" + slave.getHostAddress() + ":" + entry.getKey().getPort());
if (masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER)) {
masterSlaveEntry.slaveUp(entry.getKey(), FreezeReason.MANAGER);
}
InetAddress slave = entry.getValue();
InetAddress updatedSlave = future.get().getAddress();
if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) {
log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress());
for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
URI uri = URIBuilder.create(slave.getHostAddress() + ":" + entry.getKey().getPort());
if (masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER)) {
masterSlaveEntry.slaveUp(entry.getKey(), FreezeReason.MANAGER);
}
slaves.put(entry.getKey(), updatedSlave);
log.info("Slave {} has been changed", entry.getKey().getHost());
}
slaves.put(entry.getKey(), updatedSlave);
log.info("Slave {} has been changed", entry.getKey().getHost());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
monitorDnsChange();
}
}
});
});
}
}
}, dnsMonitoringInterval, TimeUnit.MILLISECONDS);

@ -133,7 +133,7 @@ public class MasterSlaveEntry {
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
}
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available
if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
@ -317,7 +317,7 @@ public class MasterSlaveEntry {
}
public RFuture<Void> addSlave(URI address) {
return addSlave(address, true, NodeType.SLAVE);
return addSlave(address, false, NodeType.SLAVE);
}
private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) {
@ -350,7 +350,7 @@ public class MasterSlaveEntry {
// exclude master from slaves
if (!config.checkSkipSlavesInit()
&& (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) {
slaveDown(address, FreezeReason.SYSTEM);
slaveDown(masterEntry.getClient().getConfig().getAddress(), FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr);
}
return true;

@ -15,12 +15,12 @@
*/
package org.redisson.connection.balancer;
import java.security.SecureRandom;
import java.util.List;
import java.util.Random;
import org.redisson.connection.ClientConnectionsEntry;
import io.netty.util.internal.ThreadLocalRandom;
/**
*
* @author Nikita Koksharov
@ -28,10 +28,8 @@ import org.redisson.connection.ClientConnectionsEntry;
*/
public class RandomLoadBalancer implements LoadBalancer {
private final Random random = new SecureRandom();
public ClientConnectionsEntry getEntry(List<ClientConnectionsEntry> clientsCopy) {
int ind = random.nextInt(clientsCopy.size());
int ind = ThreadLocalRandom.current().nextInt(clientsCopy.size());
return clientsCopy.get(ind);
}

@ -19,10 +19,9 @@ import java.util.Collection;
import java.util.List;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
public class ListDrainToDecoder<V> implements MultiDecoder<Integer> {
private Collection<Object> list;
@ -31,11 +30,6 @@ public class ListDrainToDecoder<V> implements MultiDecoder<Integer> {
this.list = list;
}
@Override
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}
@Override
public Integer decode(List<Object> parts, State state) {
list.addAll(parts);
@ -43,8 +37,8 @@ public class ListDrainToDecoder<V> implements MultiDecoder<Integer> {
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
}

@ -22,6 +22,7 @@ import java.util.List;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
@ -48,15 +49,10 @@ public class MapCacheGetAllDecoder implements MultiDecoder<List<Object>> {
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
return LongCodec.INSTANCE.getValueDecoder().decode(buf, state);
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
}
@Override
public List<Object> decode(List<Object> parts, State state) {
if (parts.isEmpty()) {

@ -15,17 +15,15 @@
*/
package org.redisson.connection.decoder;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
@ -48,13 +46,8 @@ public class MapGetAllDecoder implements MultiDecoder<Map<Object, Object>> {
}
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean isApplicable(int paramNum, State state) {
return false;
public Decoder<Object> getDecoder(int paramNum, State state) {
return null;
}
@Override

@ -29,6 +29,7 @@ import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager;
@ -169,8 +170,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
public RFuture<T> get(RedisCommand<?> command) {
for (int j = entries.size() - 1; j >= 0; j--) {
final ClientConnectionsEntry entry = getEntry();
if (!entry.isFreezed()
&& tryAcquireConnection(entry)) {
if ((!entry.isFreezed() ||
(entry.getFreezeReason() == FreezeReason.SYSTEM && config.getReadMode() == ReadMode.MASTER_SLAVE)) &&
tryAcquireConnection(entry)) {
return acquireConnection(command, entry);
}
}

@ -0,0 +1,180 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.List;
import java.util.Set;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import org.redisson.api.RMultimap;
import org.redisson.api.RMultimapReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.misc.Hash;
import io.netty.buffer.ByteBuf;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
abstract class RedissonBaseMultimapReactive<K, V> extends RedissonExpirableReactive implements RMultimapReactive<K, V> {
private final RMultimap<K, V> instance;
public RedissonBaseMultimapReactive(RMultimap<K, V> instance, CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.instance = instance;
}
public RedissonBaseMultimapReactive(RMultimap<K, V> instance, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
this.instance = instance;
}
@Override
public Publisher<Integer> size() {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sizeAsync();
}
});
}
@Override
public Publisher<Boolean> containsKey(final Object key) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.containsKeyAsync(key);
}
});
}
@Override
public Publisher<Boolean> containsValue(final Object value) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.containsValueAsync(value);
}
});
}
@Override
public Publisher<Boolean> containsEntry(final Object key, final Object value) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.containsEntryAsync(key, value);
}
});
}
@Override
public Publisher<Boolean> put(final K key, final V value) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.putAsync(key, value);
}
});
}
@Override
public Publisher<Boolean> remove(final Object key, final Object value) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.removeAsync(key, value);
}
});
}
@Override
public Publisher<Boolean> putAll(final K key, final Iterable<? extends V> values) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.putAllAsync(key, values);
}
});
}
@Override
public Publisher<Integer> keySize() {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.keySizeAsync();
}
});
}
@Override
public Publisher<Long> fastRemove(final K... keys) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.fastRemoveAsync(keys);
}
});
}
@Override
public Publisher<Set<K>> readAllKeySet() {
return reactive(new Supplier<RFuture<Set<K>>>() {
@Override
public RFuture<Set<K>> get() {
return instance.readAllKeySetAsync();
}
});
}
protected String hash(ByteBuf objectState) {
return Hash.hashToBase64(objectState);
}
protected String hashAndRelease(ByteBuf objectState) {
try {
return Hash.hashToBase64(objectState);
} finally {
objectState.release();
}
}
String getValuesName(String hash) {
return "{" + getName() + "}:" + hash;
}
protected <T> Publisher<T> fastRemove(List<Object> mapKeys, List<Object> listKeys, RedisCommand<T> evalCommandType) {
return commandExecutor.evalWriteReactive(getName(), codec, evalCommandType,
"local res = redis.call('hdel', KEYS[1], unpack(ARGV)); " +
"if res > 0 then " +
"redis.call('del', unpack(KEYS, 2, #KEYS)); " +
"end; " +
"return res; ",
listKeys, mapKeys.toArray());
}
}

@ -0,0 +1,143 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonListMultimap;
import org.redisson.api.RListMultimapReactive;
import org.redisson.api.RListReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public class RedissonListMultimapReactive<K, V> extends RedissonBaseMultimapReactive<K, V> implements RListMultimapReactive<K, V> {
public RedissonListMultimapReactive(UUID id, CommandReactiveExecutor commandExecutor, String name) {
super(new RedissonListMultimap<K, V>(id, commandExecutor, name), commandExecutor, name);
}
public RedissonListMultimapReactive(UUID id, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(new RedissonListMultimap<K, V>(id, codec, commandExecutor, name), codec, commandExecutor, name);
}
@Override
public RListReactive<V> get(final K key) {
final ByteBuf keyState = encodeMapKey(key);
final String keyHash = hashAndRelease(keyState);
final String setName = getValuesName(keyHash);
return new RedissonListReactive<V>(codec, commandExecutor, setName) {
@Override
public Publisher<Boolean> delete() {
ByteBuf keyState = encodeMapKey(key);
return RedissonListMultimapReactive.this.fastRemove(Arrays.<Object>asList(keyState), Arrays.<Object>asList(setName), RedisCommands.EVAL_BOOLEAN_AMOUNT);
}
@Override
public Publisher<Boolean> clearExpire() {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> expireAt(long timestamp) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Long> remainTimeToLive() {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Void> rename(String newName) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> renamenx(String newName) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
};
}
@Override
public Publisher<List<V>> getAll(K key) {
ByteBuf keyState = encodeMapKey(key);
String keyHash = hashAndRelease(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.readReactive(getName(), codec, RedisCommands.LRANGE, setName, 0, -1);
}
@Override
public Publisher<List<V>> removeAll(Object key) {
ByteBuf keyState = encodeMapKey(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LIST,
"redis.call('hdel', KEYS[1], ARGV[1]); " +
"local members = redis.call('lrange', KEYS[2], 0, -1); " +
"redis.call('del', KEYS[2]); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), keyState);
}
@Override
public Publisher<List<V>> replaceValues(K key, Iterable<? extends V> values) {
List<Object> params = new ArrayList<Object>();
ByteBuf keyState = encodeMapKey(key);
params.add(keyState);
String keyHash = hash(keyState);
params.add(keyHash);
for (Object value : values) {
ByteBuf valueState = encodeMapValue(value);
params.add(valueState);
}
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_LIST,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"local members = redis.call('lrange', KEYS[2], 0, -1); " +
"redis.call('del', KEYS[2]); " +
"redis.call('rpush', KEYS[2], unpack(ARGV, 3, #ARGV)); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), params.toArray());
}
}

@ -0,0 +1,144 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSetMultimap;
import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import io.netty.buffer.ByteBuf;
/**
*
* @author Nikita Koksharov
*
* @param <K> key type
* @param <V> value type
*/
public class RedissonSetMultimapReactive<K, V> extends RedissonBaseMultimapReactive<K, V> implements RSetMultimapReactive<K, V> {
public RedissonSetMultimapReactive(UUID id, CommandReactiveExecutor commandExecutor, String name) {
super(new RedissonSetMultimap<K, V>(id, commandExecutor, name), commandExecutor, name);
}
public RedissonSetMultimapReactive(UUID id, Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(new RedissonSetMultimap<K, V>(id, codec, commandExecutor, name), codec, commandExecutor, name);
}
@Override
public RSetReactive<V> get(final K key) {
final ByteBuf keyState = encodeMapKey(key);
final String keyHash = hashAndRelease(keyState);
final String setName = getValuesName(keyHash);
return new RedissonSetReactive<V>(codec, commandExecutor, setName) {
@Override
public Publisher<Boolean> delete() {
ByteBuf keyState = encodeMapKey(key);
return RedissonSetMultimapReactive.this.fastRemove(Arrays.<Object>asList(keyState), Arrays.<Object>asList(setName), RedisCommands.EVAL_BOOLEAN_AMOUNT);
}
@Override
public Publisher<Boolean> clearExpire() {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> expireAt(long timestamp) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Long> remainTimeToLive() {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Void> rename(String newName) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
@Override
public Publisher<Boolean> renamenx(String newName) {
throw new UnsupportedOperationException("This operation is not supported for SetMultimap values Set");
}
};
}
@Override
public Publisher<Set<V>> getAll(K key) {
ByteBuf keyState = encodeMapKey(key);
String keyHash = hashAndRelease(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.readReactive(getName(), codec, RedisCommands.SMEMBERS, setName);
}
@Override
public Publisher<Set<V>> removeAll(Object key) {
ByteBuf keyState = encodeMapKey(key);
String keyHash = hash(keyState);
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_SET,
"redis.call('hdel', KEYS[1], ARGV[1]); " +
"local members = redis.call('smembers', KEYS[2]); " +
"redis.call('del', KEYS[2]); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), keyState);
}
@Override
public Publisher<Set<V>> replaceValues(K key, Iterable<? extends V> values) {
List<Object> params = new ArrayList<Object>();
ByteBuf keyState = encodeMapKey(key);
params.add(keyState);
String keyHash = hash(keyState);
params.add(keyHash);
for (Object value : values) {
ByteBuf valueState = encodeMapValue(value);
params.add(valueState);
}
String setName = getValuesName(keyHash);
return commandExecutor.evalWriteReactive(getName(), codec, RedisCommands.EVAL_SET,
"redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); " +
"local members = redis.call('smembers', KEYS[2]); " +
"redis.call('del', KEYS[2]); " +
"redis.call('sadd', KEYS[2], unpack(ARGV, 3, #ARGV)); " +
"return members; ",
Arrays.<Object>asList(getName(), setName), params.toArray());
}
}

@ -33,6 +33,9 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import io.netty.buffer.ByteBuf;
import net.bytebuddy.utility.RandomString;
public class RedissonCodecTest extends BaseTest {
private Codec smileCodec = new SmileJacksonCodec();
private Codec codec = new SerializationCodec();
@ -107,6 +110,15 @@ public class RedissonCodecTest extends BaseTest {
test(redisson);
}
@Test
public void testSnappyBig() throws IOException {
SnappyCodec sc = new SnappyCodec();
String randomData = RandomString.make(Short.MAX_VALUE*2 + 142);
ByteBuf g = sc.getValueEncoder().encode(randomData);
String decompressedData = (String) sc.getValueDecoder().decode(g, null);
assertThat(decompressedData).isEqualTo(randomData);
}
@Test
public void testSnappy() {
Config config = createConfig();

@ -14,7 +14,7 @@ public class ClusterNodesDecoderTest {
@Test
public void test() throws IOException {
ClusterNodesDecoder decoder = new ClusterNodesDecoder();
ClusterNodesDecoder decoder = new ClusterNodesDecoder(false);
ByteBuf buf = Unpooled.buffer();
String info = "7af253f8c20a3b3fbd481801bd361ec6643c6f0b 192.168.234.129:7001@17001 master - 0 1478865073260 8 connected 5461-10922\n" +

@ -97,6 +97,17 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(redisson.getAtomicLong("executed").get()).isEqualTo(2);
}
@Test
public void testCronExpressionMultipleTasks() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test");
executor.schedule(new ScheduledRunnableTask("executed1"), CronSchedule.of("0/5 * * * * ?"));
executor.schedule(new ScheduledRunnableTask("executed2"), CronSchedule.of("0/1 * * * * ?"));
Thread.sleep(30000);
assertThat(redisson.getAtomicLong("executed1").get()).isEqualTo(6);
assertThat(redisson.getAtomicLong("executed2").get()).isEqualTo(30);
}
@Test
public void testCancel() throws InterruptedException, ExecutionException {
RScheduledExecutorService executor = redisson.getExecutorService("test");

Loading…
Cancel
Save