Merge branch 'master' into 3.0.0

pull/802/merge
Nikita 8 years ago
commit d703d759a9

@ -14,6 +14,7 @@ __NOTE__: Both version lines have same features except `CompletionStage` interfa
Please read [documentation](https://github.com/redisson/redisson/wiki) for more details.
Redisson [releases history](https://github.com/redisson/redisson/blob/master/CHANGELOG.md)
Checkout more [code examples](https://github.com/redisson/redisson-examples)
Browse [javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.2.2)
Licensed under the Apache License 2.0.
@ -61,6 +62,7 @@ Features
Remote service, Live Object service, Executor service, Scheduler service
* [Spring Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#141-spring-cache) implementation
* [Hibernate Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#142-hibernate-cache) implementation
* [JCache API (JSR-107)](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#143-jcache-api-jsr-107-implementation) implementation
* [Tomcat Session Manager](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks#144-tomcat-redis-session-manager) implementation
* [Spring Session](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#145-spring-session) implementation
* [Reactive Streams](https://github.com/redisson/redisson/wiki/3.-operations-execution#32-reactive-way)
@ -80,6 +82,7 @@ Articles
================================
[Java data structures powered by Redis. Introduction to Redisson (pdf)](http://redisson.org/Redisson.pdf)
[Moving from Hazelcast to Redis](https://engineering.datorama.com/moving-from-hazelcast-to-redis-b90a0769d1cb)
[A Look at the Java Distributed In-Memory Data Model (Powered by Redis)](https://dzone.com/articles/java-distributed-in-memory-data-model-powered-by-r)
[Distributed tasks Execution and Scheduling in Java, powered by Redis](https://dzone.com/articles/distributed-tasks-execution-and-scheduling-in-java)
[Introducing Redisson Live Objects (Object Hash Mapping)](https://dzone.com/articles/introducing-redisson-live-object-object-hash-mappi)

@ -83,6 +83,11 @@
<artifactId>redisson</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-benchmark</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>

@ -21,22 +21,22 @@ Usage
2. Copy two jars into `TOMCAT_BASE/lib` directory:
1. __For JDK 1.8+__
[redisson-all-3.2.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.2.0&e=jar)
[redisson-all-3.2.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.2.2&e=jar)
for Tomcat 6.x
[redisson-tomcat-6-3.2.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.2.0&e=jar)
[redisson-tomcat-6-3.2.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.2.2&e=jar)
for Tomcat 7.x
[redisson-tomcat-7-3.2.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.2.0&e=jar)
[redisson-tomcat-7-3.2.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.2.2&e=jar)
for Tomcat 8.x
[redisson-tomcat-8-3.2.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.2.0&e=jar)
[redisson-tomcat-8-3.2.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.2.2&e=jar)
1. __For JDK 1.6+__
[redisson-all-2.7.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.7.0&e=jar)
[redisson-all-2.7.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.7.2&e=jar)
for Tomcat 6.x
[redisson-tomcat-6-2.7.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.7.0&e=jar)
[redisson-tomcat-6-2.7.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.7.2&e=jar)
for Tomcat 7.x
[redisson-tomcat-7-2.7.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.7.0&e=jar)
[redisson-tomcat-7-2.7.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.7.2&e=jar)
for Tomcat 8.x
[redisson-tomcat-8-2.7.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.7.0&e=jar)
[redisson-tomcat-8-2.7.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.7.2&e=jar)

@ -15,6 +15,7 @@
*/
package org.redisson.tomcat;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@ -33,11 +34,19 @@ import org.redisson.api.RMap;
public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
public RedissonSession(RedissonSessionManager manager) {
super(manager);
this.redissonManager = manager;
try {
Field attr = StandardSession.class.getDeclaredField("attributes");
attrs = (Map<String, Object>) attr.get(this);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private static final long serialVersionUID = -2518607181636076487L;
@ -143,7 +152,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:isValid", isValid);
newMap.put("session:isNew", isNew);
for (Entry<String, Object> entry : attributes.entrySet()) {
for (Entry<String, Object> entry : attrs.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
}

@ -15,6 +15,7 @@
*/
package org.redisson.tomcat;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@ -33,11 +34,18 @@ import org.redisson.api.RMap;
public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
public RedissonSession(RedissonSessionManager manager) {
super(manager);
this.redissonManager = manager;
try {
Field attr = StandardSession.class.getDeclaredField("attributes");
attrs = (Map<String, Object>) attr.get(this);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private static final long serialVersionUID = -2518607181636076487L;
@ -143,7 +151,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:isValid", isValid);
newMap.put("session:isNew", isNew);
for (Entry<String, Object> entry : attributes.entrySet()) {
for (Entry<String, Object> entry : attrs.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
}

@ -15,6 +15,7 @@
*/
package org.redisson.tomcat;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@ -33,11 +34,19 @@ import org.redisson.api.RMap;
public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
public RedissonSession(RedissonSessionManager manager) {
super(manager);
this.redissonManager = manager;
try {
Field attr = StandardSession.class.getDeclaredField("attributes");
attrs = (Map<String, Object>) attr.get(this);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private static final long serialVersionUID = -2518607181636076487L;
@ -143,7 +152,7 @@ public class RedissonSession extends StandardSession {
newMap.put("session:isValid", isValid);
newMap.put("session:isNew", isNew);
for (Entry<String, Object> entry : attributes.entrySet()) {
for (Entry<String, Object> entry : attrs.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
}

@ -240,12 +240,12 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name) {
return new RedissonListMultimap<K, V>(connectionManager.getCommandExecutor(), name);
return new RedissonListMultimap<K, V>(this, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimap<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimap<K, V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimap<K, V>(this, codec, connectionManager.getCommandExecutor(), name);
}
@Override
@ -260,37 +260,37 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name);
return new RedissonMap<K, V>(this, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimap<K, V>(this, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimapCache<K, V>(this, evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name, Codec codec) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimapCache<K, V>(this, evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimapCache<K, V>(this, evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name, Codec codec) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
return new RedissonListMultimapCache<K, V>(this, evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimap<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimap<K, V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonSetMultimap<K, V>(this, codec, connectionManager.getCommandExecutor(), name);
}
@Override
@ -305,17 +305,17 @@ public class Redisson implements RedissonClient {
@Override
public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonMapCache<K, V>(this, evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name);
return new RedissonMapCache<K, V>(this, codec, evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name);
return new RedissonMap<K, V>(this, codec, connectionManager.getCommandExecutor(), name);
}
@Override
@ -538,7 +538,7 @@ public class Redisson implements RedissonClient {
@Override
public RBatch createBatch() {
RedissonBatch batch = new RedissonBatch(evictionScheduler, connectionManager);
RedissonBatch batch = new RedissonBatch(this, evictionScheduler, connectionManager);
if (config.isRedissonReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}

@ -22,12 +22,15 @@ import java.util.List;
import java.util.NoSuchElementException;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
abstract class RedissonBaseIterator<V> implements Iterator<V> {
private List<V> firstValues;
private List<V> lastValues;
private Iterator<V> lastIter;
private List<ByteBuf> firstValues;
private List<ByteBuf> lastValues;
private Iterator<ScanObjectEntry> lastIter;
protected long nextIterPos;
protected InetSocketAddress client;
@ -40,6 +43,8 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
public boolean hasNext() {
if (lastIter == null || !lastIter.hasNext()) {
if (finished) {
free(firstValues);
free(lastValues);
currentElementRemoved = false;
removeExecuted = false;
@ -56,8 +61,12 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
long prevIterPos;
do {
prevIterPos = nextIterPos;
ListScanResult<V> res = iterator(client, nextIterPos);
lastValues = new ArrayList<V>(res.getValues());
ListScanResult<ScanObjectEntry> res = iterator(client, nextIterPos);
if (lastValues != null) {
free(lastValues);
}
lastValues = convert(res.getValues());
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
@ -87,6 +96,9 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
}
}
} else if (lastValues.removeAll(firstValues)) {
free(firstValues);
free(lastValues);
currentElementRemoved = false;
removeExecuted = false;
client = null;
@ -111,11 +123,28 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
return lastIter.hasNext();
}
private List<ByteBuf> convert(List<ScanObjectEntry> list) {
List<ByteBuf> result = new ArrayList<ByteBuf>(list.size());
for (ScanObjectEntry entry : list) {
result.add(entry.getBuf());
}
return result;
}
private void free(List<ByteBuf> list) {
if (list == null) {
return;
}
for (ByteBuf byteBuf : list) {
byteBuf.release();
}
}
protected boolean tryAgain() {
return false;
}
abstract ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos);
abstract ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos);
@Override
public V next() {
@ -123,7 +152,7 @@ abstract class RedissonBaseIterator<V> implements Iterator<V> {
throw new NoSuchElementException("No such element");
}
value = lastIter.next();
value = (V) lastIter.next().getObj();
currentElementRemoved = false;
return value;
}

@ -41,6 +41,7 @@ import org.redisson.api.RScriptAsync;
import org.redisson.api.RSetAsync;
import org.redisson.api.RSetCacheAsync;
import org.redisson.api.RTopicAsync;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
@ -55,10 +56,12 @@ public class RedissonBatch implements RBatch {
private final EvictionScheduler evictionScheduler;
private final CommandBatchService executorService;
private final RedissonClient client;
protected RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
protected RedissonBatch(RedissonClient client, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
this.executorService = new CommandBatchService(connectionManager);
this.evictionScheduler = evictionScheduler;
this.client = client;
}
@Override
@ -93,12 +96,12 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMapAsync<K, V> getMap(String name) {
return new RedissonMap<K, V>(executorService, name);
return new RedissonMap<K, V>(client, executorService, name);
}
@Override
public <K, V> RMapAsync<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, executorService, name);
return new RedissonMap<K, V>(client, codec, executorService, name);
}
@Override
@ -193,12 +196,12 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, executorService, name);
return new RedissonMapCache<K, V>(client, codec, evictionScheduler, executorService, name);
}
@Override
public <K, V> RMapCacheAsync<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(evictionScheduler, executorService, name);
return new RedissonMapCache<K, V>(client, evictionScheduler, executorService, name);
}
@Override
@ -243,22 +246,22 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMultimapAsync<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(executorService, name);
return new RedissonSetMultimap<K, V>(client, executorService, name);
}
@Override
public <K, V> RMultimapAsync<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimap<K, V>(codec, executorService, name);
return new RedissonSetMultimap<K, V>(client, codec, executorService, name);
}
@Override
public <K, V> RMultimapAsync<K, V> getListMultimap(String name) {
return new RedissonListMultimap<K, V>(executorService, name);
return new RedissonListMultimap<K, V>(client, executorService, name);
}
@Override
public <K, V> RMultimapAsync<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimap<K, V>(codec, executorService, name);
return new RedissonListMultimap<K, V>(client, codec, executorService, name);
}
@Override
@ -273,22 +276,22 @@ public class RedissonBatch implements RBatch {
@Override
public <K, V> RMultimapCacheAsync<K, V> getSetMultimapCache(String name) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, executorService, name);
return new RedissonSetMultimapCache<K, V>(client, evictionScheduler, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getSetMultimapCache(String name, Codec codec) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, executorService, name);
return new RedissonSetMultimapCache<K, V>(client, evictionScheduler, codec, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getListMultimapCache(String name) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, executorService, name);
return new RedissonListMultimapCache<K, V>(client, evictionScheduler, executorService, name);
}
@Override
public <K, V> RMultimapCacheAsync<K, V> getListMultimapCache(String name, Codec codec) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, executorService, name);
return new RedissonListMultimapCache<K, V>(client, evictionScheduler, codec, executorService, name);
}
protected void enableRedissonReferenceSupport(Redisson redisson) {

@ -33,9 +33,11 @@ import org.redisson.api.RFuture;
import org.redisson.api.RKeys;
import org.redisson.api.RType;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
@ -104,12 +106,12 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null);
}
private ListScanResult<String> scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<String>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count);
return commandExecutor.get(f);
}
RFuture<ListScanResult<String>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
return commandExecutor.get(f);
}
@ -117,7 +119,7 @@ public class RedissonKeys implements RKeys {
return new RedissonBaseIterator<String>() {
@Override
ListScanResult<String> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count);
}

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RListMultimap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
@ -44,12 +45,12 @@ public class RedissonListMultimap<K, V> extends RedissonMultimap<K, V> implement
private static final RedisStrictCommand<Boolean> LLEN_VALUE = new RedisStrictCommand<Boolean>("LLEN", new BooleanAmountReplayConvertor());
RedissonListMultimap(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
RedissonListMultimap(RedissonClient client, CommandAsyncExecutor connectionManager, String name) {
super(client, connectionManager, name);
}
RedissonListMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
RedissonListMultimap(RedissonClient client, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(client, codec, connectionManager, name);
}
@Override

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RListMultimapCache;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
@ -36,14 +37,14 @@ public class RedissonListMultimapCache<K, V> extends RedissonListMultimap<K, V>
private final RedissonMultimapCache<K> baseCache;
RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
RedissonListMultimapCache(RedissonClient client, EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(client, connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
}
RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
RedissonListMultimapCache(RedissonClient client, EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(client, codec, connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
}

@ -190,12 +190,12 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
private int invalidationListenerId;
protected RedissonLocalCachedMap(RedissonClient redisson, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options) {
super(commandExecutor, name);
super(redisson, commandExecutor, name);
init(redisson, name, options);
}
protected RedissonLocalCachedMap(RedissonClient redisson, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options) {
super(codec, connectionManager, name);
super(redisson, codec, connectionManager, name);
init(redisson, name, options);
}

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.AbstractCollection;
@ -29,9 +30,11 @@ import java.util.Map;
import java.util.Set;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -42,6 +45,7 @@ import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.misc.Hash;
/**
* Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap}
@ -60,14 +64,33 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
static final RedisCommand<Boolean> EVAL_REMOVE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP);
static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
protected RedissonMap(CommandAsyncExecutor commandExecutor, String name) {
private final RedissonClient client;
protected RedissonMap(RedissonClient client, CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.client = client;
}
public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonMap(RedissonClient client, Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
this.client = client;
}
@Override
public RLock getLock(K key) {
String lockName = getLockName(key);
return client.getLock(lockName);
}
private String getLockName(Object key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
return "{" + getName() + "}:" + Hash.hashToBase64(keyState) + ":key";
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public int size() {
return get(sizeAsync());
@ -431,7 +454,7 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.HSCAN, name, startPos);
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
return get(f);
}

@ -27,9 +27,10 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -84,13 +85,13 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
private static final RedisCommand<Boolean> EVAL_CONTAINS_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY);
protected RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
protected RedissonMapCache(RedissonClient redisson, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(redisson, commandExecutor, name);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName());
}
public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
public RedissonMapCache(RedissonClient redisson, Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(redisson, codec, commandExecutor, name);
evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName());
}
@ -530,7 +531,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new ScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new MapScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
RFuture<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN,
"local result = {}; "
+ "local idleKeys = {}; "

@ -15,6 +15,7 @@
*/
package org.redisson;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractCollection;
import java.util.AbstractSet;
@ -29,10 +30,12 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMultimap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
@ -49,14 +52,33 @@ import org.redisson.misc.Hash;
*/
public abstract class RedissonMultimap<K, V> extends RedissonExpirable implements RMultimap<K, V> {
RedissonMultimap(CommandAsyncExecutor connectionManager, String name) {
private final RedissonClient client;
RedissonMultimap(RedissonClient client, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
this.client = client;
}
RedissonMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) {
RedissonMultimap(RedissonClient client, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
this.client = client;
}
@Override
public RLock getLock(K key) {
String lockName = getLockName(key);
return client.getLock(lockName);
}
private String getLockName(Object key) {
try {
byte[] keyState = codec.getMapKeyEncoder().encode(key);
return "{" + getName() + "}:" + Hash.hashToBase64(keyState) + ":key";
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
protected String hash(byte[] objectState) {
return Hash.hashToBase64(objectState);
}
@ -249,7 +271,7 @@ public abstract class RedissonMultimap<K, V> extends RedissonExpirable implement
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos);
return get(f);
}

@ -147,7 +147,7 @@ public class RedissonNode {
private void retrieveAdresses() {
ConnectionManager connectionManager = ((Redisson)redisson).getConnectionManager();
for (MasterSlaveEntry entry : connectionManager.getEntrySet()) {
RFuture<RedisConnection> readFuture = entry.connectionReadOp(RedisCommands.PUBLISH);
RFuture<RedisConnection> readFuture = entry.connectionReadOp(null);
if (readFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout())
&& readFuture.isSuccess()) {
RedisConnection connection = readFuture.getNow();
@ -156,7 +156,7 @@ public class RedissonNode {
localAddress = (InetSocketAddress) connection.getChannel().localAddress();
return;
}
RFuture<RedisConnection> writeFuture = entry.connectionWriteOp(RedisCommands.PUBLISH);
RFuture<RedisConnection> writeFuture = entry.connectionWriteOp(null);
if (writeFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout())
&& writeFuture.isSuccess()) {
RedisConnection connection = writeFuture.getNow();

@ -34,6 +34,7 @@ import org.redisson.api.SortOrder;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.ScoredCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -41,6 +42,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -259,8 +261,8 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), o);
}
private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<V>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
return get(f);
}
@ -269,7 +271,7 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}

@ -28,11 +28,13 @@ import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -81,8 +83,8 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return getName();
}
ListScanResult<V> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<ListScanResult<V>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos);
ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos);
return get(f);
}
@ -91,7 +93,7 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos);
}

@ -28,12 +28,14 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RSetCache;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -102,13 +104,13 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
Arrays.<Object>asList(getName()), System.currentTimeMillis(), o);
}
ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<V>> f = scanIteratorAsync(client, startPos);
ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(client, startPos);
return get(f);
}
public RFuture<ListScanResult<V>> scanIteratorAsync(InetSocketAddress client, long startPos) {
return commandExecutor.evalReadAsync(client, getName(), codec, RedisCommands.EVAL_ZSCAN,
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(InetSocketAddress client, long startPos) {
return commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), RedisCommands.EVAL_ZSCAN,
"local result = {}; "
+ "local res = redis.call('zscan', KEYS[1], ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "
@ -127,7 +129,7 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}

@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.RSetMultimap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
@ -47,12 +48,12 @@ public class RedissonSetMultimap<K, V> extends RedissonMultimap<K, V> implements
private static final RedisStrictCommand<Boolean> SCARD_VALUE = new RedisStrictCommand<Boolean>("SCARD", new BooleanAmountReplayConvertor());
private static final RedisCommand<Boolean> SISMEMBER_VALUE = new RedisCommand<Boolean>("SISMEMBER", new BooleanReplayConvertor());
RedissonSetMultimap(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
RedissonSetMultimap(RedissonClient client, CommandAsyncExecutor connectionManager, String name) {
super(client, connectionManager, name);
}
RedissonSetMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
RedissonSetMultimap(RedissonClient client, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(client, codec, connectionManager, name);
}
@Override

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.RSetMultimapCache;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
@ -36,14 +37,14 @@ public class RedissonSetMultimapCache<K, V> extends RedissonSetMultimap<K, V> im
private final RedissonMultimapCache<K> baseCache;
RedissonSetMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
RedissonSetMultimapCache(RedissonClient client, EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) {
super(client, connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
}
RedissonSetMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(codec, connectionManager, name);
RedissonSetMultimapCache(RedissonClient client, EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) {
super(client, codec, connectionManager, name);
evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName());
baseCache = new RedissonMultimapCache<K>(connectionManager, name, codec, getTimeoutSetName());
}

@ -29,6 +29,7 @@ import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -39,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
/**
@ -161,8 +163,8 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
Arrays.<Object>asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o);
}
private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<V>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN,
private ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), EVAL_SSCAN,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
+ "if expireDateScore ~= false then "
@ -182,7 +184,7 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
return new RedissonBaseIterator<V>() {
@Override
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
}

@ -33,6 +33,14 @@ import java.util.concurrent.ConcurrentMap;
*/
public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K, V> {
/**
* Returns <code>RLock</code> instance associated with key
*
* @param key - map key
* @return lock
*/
RLock getLock(K key);
/**
* Returns size of value mapped by key in bytes
*

@ -29,6 +29,14 @@ import java.util.Set;
*/
public interface RMultimap<K, V> extends RExpirable, RMultimapAsync<K, V> {
/**
* Returns <code>RLock</code> instance associated with key
*
* @param key - map key
* @return lock
*/
RLock getLock(K key);
/**
* Returns the number of key-value pairs in this multimap.
*

@ -52,15 +52,14 @@ public class RedisConnection implements RedisCommands {
private long lastUsageTime;
public RedisConnection(RedisClient redisClient, Channel channel) {
super();
this.redisClient = redisClient;
this(redisClient);
updateChannel(channel);
lastUsageTime = System.currentTimeMillis();
}
protected RedisConnection() {
redisClient = null;
protected RedisConnection(RedisClient redisClient) {
this.redisClient = redisClient;
}
public static <C extends RedisConnection> C getFrom(Channel channel) {

@ -0,0 +1,100 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.codec;
import java.io.IOException;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
*
* @author Nikita Koksharov
*
*/
public class MapScanCodec implements Codec {
private final Codec delegate;
private final Codec mapValueCodec;
public MapScanCodec(Codec delegate) {
this(delegate, null);
}
public MapScanCodec(Codec delegate, Codec mapValueCodec) {
this.delegate = delegate;
this.mapValueCodec = mapValueCodec;
}
@Override
public Decoder<Object> getValueDecoder() {
return delegate.getValueDecoder();
}
@Override
public Encoder getValueEncoder() {
return delegate.getValueEncoder();
}
@Override
public Decoder<Object> getMapValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
Object val = c.getMapValueDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
}
@Override
public Encoder getMapValueEncoder() {
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
return c.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Object val = delegate.getMapKeyDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
}
@Override
public Encoder getMapKeyEncoder() {
return delegate.getMapKeyEncoder();
}
}

@ -33,20 +33,21 @@ import io.netty.buffer.Unpooled;
public class ScanCodec implements Codec {
private final Codec delegate;
private final Codec mapValueCodec;
public ScanCodec(Codec delegate) {
this(delegate, null);
}
public ScanCodec(Codec delegate, Codec mapValueCodec) {
this.delegate = delegate;
this.mapValueCodec = mapValueCodec;
}
@Override
public Decoder<Object> getValueDecoder() {
return delegate.getValueDecoder();
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Object val = delegate.getValueDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
}
@Override
@ -56,40 +57,17 @@ public class ScanCodec implements Codec {
@Override
public Decoder<Object> getMapValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
Object val = c.getMapValueDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
return delegate.getMapValueDecoder();
}
@Override
public Encoder getMapValueEncoder() {
Codec c = delegate;
if (mapValueCodec != null) {
c = mapValueCodec;
}
return c.getMapValueEncoder();
return delegate.getMapValueEncoder();
}
@Override
public Decoder<Object> getMapKeyDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf, State state) throws IOException {
ByteBuf b = Unpooled.copiedBuffer(buf);
Object val = delegate.getMapKeyDecoder().decode(buf, state);
return new ScanObjectEntry(b, val);
}
};
return delegate.getMapKeyDecoder();
}
@Override

@ -804,22 +804,47 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private <R, V> void handleReference(RPromise<R> mainPromise, R res) {
if (res instanceof List || res instanceof ListScanResult) {
List r = res instanceof ListScanResult ? ((ListScanResult)res).getValues() : (List) res;
if (res instanceof List) {
List<Object> r = (List<Object>)res;
for (int i = 0; i < r.size(); i++) {
if (r.get(i) instanceof RedissonReference) {
try {
r.set(i ,(redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) r.get(i))
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) r.get(i))));
r.set(i, redisson != null
? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i))
: RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i)));
} catch (Exception exception) {//skip and carry on to next one.
}
} else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) {
try {
ScoredEntry se = ((ScoredEntry) r.get(i));
r.set(i ,new ScoredEntry(se.getScore(), redisson != null
ScoredEntry<?> se = ((ScoredEntry<?>) r.get(i));
se = new ScoredEntry(se.getScore(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) se.getValue())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue())));
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue()));
r.set(i, se);
} catch (Exception exception) {//skip and carry on to next one.
}
}
}
mainPromise.trySuccess(res);
} else if (res instanceof ListScanResult) {
List<ScanObjectEntry> r = ((ListScanResult)res).getValues();
for (int i = 0; i < r.size(); i++) {
ScanObjectEntry e = r.get(i);
if (e.getObj() instanceof RedissonReference) {
try {
r.set(i , new ScanObjectEntry(e.getBuf(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) e.getObj())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) e.getObj())));
} catch (Exception exception) {//skip and carry on to next one.
}
} else if (e.getObj() instanceof ScoredEntry && ((ScoredEntry<?>) e.getObj()).getValue() instanceof RedissonReference) {
try {
ScoredEntry<?> se = ((ScoredEntry<?>) e.getObj());
se = new ScoredEntry(se.getScore(), redisson != null
? RedissonObjectFactory.<R>fromReference(redisson, (RedissonReference) se.getValue())
: RedissonObjectFactory.<R>fromReference(redissonReactive, (RedissonReference) se.getValue()));
r.set(i, new ScanObjectEntry(e.getBuf(), se));
} catch (Exception exception) {//skip and carry on to next one.
}
}

@ -201,7 +201,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
return connectionManager.newFailedFuture(exception);
}
private RFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) {
private RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) {
final RPromise<T> result = connectionManager.newPromise();
acquireConnection(entry, new Runnable() {
@Override

@ -58,7 +58,7 @@ import org.redisson.api.RLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -2089,7 +2089,7 @@ public class JCache<K, V> extends RedissonObject implements Cache<K, V> {
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f
= commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.HSCAN, name, startPos);
= commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos);
return get(f);
}

@ -19,7 +19,6 @@ import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.EvictionScheduler;
import org.redisson.Redisson;
import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RBatchReactive;
import org.redisson.api.RBitSetReactive;

@ -30,7 +30,7 @@ import org.redisson.EvictionScheduler;
import org.redisson.api.RMapCacheReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
@ -342,7 +342,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonMapReactive<K, V> im
@Override
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.evalReadReactive(client, getName(), new ScanCodec(codec), EVAL_HSCAN,
return commandExecutor.evalReadReactive(client, getName(), new MapScanCodec(codec), EVAL_HSCAN,
"local result = {}; "
+ "local res = redis.call('hscan', KEYS[1], ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "

@ -24,7 +24,7 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
import org.redisson.api.RMapReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.MapScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
@ -50,12 +50,12 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
instance = new RedissonMap<K, V>(codec, commandExecutor, name);
instance = new RedissonMap<K, V>(null, codec, commandExecutor, name);
}
public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
instance = new RedissonMap<K, V>(codec, commandExecutor, name);
instance = new RedissonMap<K, V>(null, codec, commandExecutor, name);
}
@Override
@ -130,7 +130,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
}
Publisher<MapScanResult<ScanObjectEntry, ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos);
}
@Override

@ -23,6 +23,7 @@ import java.util.Collections;
import org.reactivestreams.Publisher;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
@ -30,6 +31,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactive implements RScoredSortedSetReactive<V> {
@ -122,15 +124,15 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
return commandExecutor.readReactive(getName(), codec, RedisCommands.ZRANK, getName(), o);
}
private Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos);
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos);
}
@Override
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};

@ -30,6 +30,7 @@ import org.redisson.api.RSetCacheReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
/**
@ -76,7 +77,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
return reactive(instance.containsAsync(o));
}
Publisher<ListScanResult<V>> scanIterator(InetSocketAddress client, long startPos) {
Publisher<ListScanResult<ScanObjectEntry>> scanIterator(InetSocketAddress client, long startPos) {
return reactive(instance.scanIteratorAsync(client, startPos));
}
@ -84,7 +85,7 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos);
}
};

@ -26,8 +26,10 @@ import org.reactivestreams.Publisher;
import org.redisson.RedissonSet;
import org.redisson.api.RSetReactive;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.ScanCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
/**
@ -66,8 +68,8 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
return reactive(instance.containsAsync(o));
}
private Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
private Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos);
}
@Override
@ -156,7 +158,7 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};

@ -16,13 +16,16 @@
package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
@ -32,28 +35,27 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
private List<V> firstValues;
private List<ByteBuf> firstValues;
private List<ByteBuf> lastValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
private boolean finished;
@Override
protected void onRequest(long n) {
currentIndex = n;
nextValues();
}
private void handle(List<V> vals) {
for (V val : vals) {
onNext(val);
private void handle(List<ScanObjectEntry> vals) {
for (ScanObjectEntry val : vals) {
onNext((V)val.getObj());
}
}
protected void nextValues() {
final ReactiveSubscription<V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<ScanObjectEntry>>() {
@Override
public void onSubscribe(Subscription s) {
@ -61,32 +63,68 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
}
@Override
public void onNext(ListScanResult<V> res) {
client = res.getRedisClient();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
m.onComplete();
currentIndex = 0;
public void onNext(ListScanResult<ScanObjectEntry> res) {
if (finished) {
free(firstValues);
free(lastValues);
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
return;
}
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
long prevIterPos = nextIterPos;
if (lastValues != null) {
free(lastValues);
}
lastValues = convert(res.getValues());
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
}
} else {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty()) {
if (res.getPos() == 0) {
finished = true;
m.onComplete();
return;
}
}
} else if (lastValues.removeAll(firstValues)) {
free(firstValues);
free(lastValues);
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
prevIterPos = -1;
finished = true;
m.onComplete();
return;
}
}
handle(res.getValues());
if (currentIndex == 0) {
return;
}
if (nextIterPos == -1) {
nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
finished = true;
m.onComplete();
currentIndex = 0;
}
}
@ -97,7 +135,7 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
@Override
public void onComplete() {
if (currentIndex == 0) {
if (finished) {
return;
}
nextValues();
@ -106,7 +144,24 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
}
});
}
private void free(List<ByteBuf> list) {
if (list == null) {
return;
}
for (ByteBuf byteBuf : list) {
byteBuf.release();
}
}
private List<ByteBuf> convert(List<ScanObjectEntry> list) {
List<ByteBuf> result = new ArrayList<ByteBuf>(list.size());
for (ScanObjectEntry entry : list) {
result.add(entry.getBuf());
}
return result;
}
protected abstract Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long nextIterPos);
protected abstract Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos);
}

@ -15,7 +15,6 @@
*/
package org.redisson.spring.cache;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@ -24,7 +23,6 @@ import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.redisson.misc.Hash;
import org.springframework.cache.Cache;
import org.springframework.cache.support.SimpleValueWrapper;
@ -126,8 +124,7 @@ public class RedissonCache implements Cache {
public <T> T get(Object key, Callable<T> valueLoader) {
Object value = map.get(key);
if (value == null) {
String lockName = getLockName(key);
RLock lock = redisson.getLock(lockName);
RLock lock = map.getLock(key);
lock.lock();
try {
value = map.get(key);
@ -154,15 +151,6 @@ public class RedissonCache implements Cache {
return (T) fromStoreValue(value);
}
private String getLockName(Object key) {
try {
byte[] keyState = redisson.getConfig().getCodec().getMapKeyEncoder().encode(key);
return "{" + map.getName() + "}:" + Hash.hashToBase64(keyState) + ":key";
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
protected Object fromStoreValue(Object storeValue) {
if (storeValue == NullValue.INSTANCE) {
return null;

@ -86,9 +86,9 @@ public class RedissonScoredSortedSetReactiveTest extends BaseReactiveTest {
@Test
public void testRemoveAsync() throws InterruptedException, ExecutionException {
RScoredSortedSetReactive<Integer> set = redisson.getScoredSortedSet("simple");
set.add(0.11, 1);
set.add(0.22, 3);
set.add(0.33, 7);
sync(set.add(0.11, 1));
sync(set.add(0.22, 3));
sync(set.add(0.33, 7));
Assert.assertTrue(sync(set.remove(1)));
Assert.assertFalse(sync(set.contains(1)));

@ -75,10 +75,10 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest {
assertThat(sync(set.add("123", 1, TimeUnit.SECONDS))).isFalse();
Thread.sleep(50);
Thread.sleep(800);
assertThat(sync(set.contains("123"))).isTrue();
Thread.sleep(150);
Thread.sleep(250);
assertThat(sync(set.contains("123"))).isFalse();
}
@ -104,12 +104,15 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest {
}
@Test
public void testIteratorSequence() {
public void testIteratorSequence() throws InterruptedException {
RSetCacheReactive<Long> set = redisson.getSetCache("set");
for (int i = 0; i < 1000; i++) {
sync(set.add(Long.valueOf(i)));
set.add(Long.valueOf(i));
}
Thread.sleep(1000);
assertThat(sync(set.size())).isEqualTo(1000);
Set<Long> setCopy = new HashSet<Long>();
for (int i = 0; i < 1000; i++) {
setCopy.add(Long.valueOf(i));

Loading…
Cancel
Save