diff --git a/README.md b/README.md index f01263216..f84444263 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ __NOTE__: Both version lines have same features except `CompletionStage` interfa Please read [documentation](https://github.com/redisson/redisson/wiki) for more details. Redisson [releases history](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) Checkout more [code examples](https://github.com/redisson/redisson-examples) +Browse [javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.2.2) Licensed under the Apache License 2.0. @@ -61,6 +62,7 @@ Features Remote service, Live Object service, Executor service, Scheduler service * [Spring Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#141-spring-cache) implementation * [Hibernate Cache](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#142-hibernate-cache) implementation +* [JCache API (JSR-107)](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#143-jcache-api-jsr-107-implementation) implementation * [Tomcat Session Manager](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks#144-tomcat-redis-session-manager) implementation * [Spring Session](https://github.com/redisson/redisson/wiki/14.-Integration%20with%20frameworks/#145-spring-session) implementation * [Reactive Streams](https://github.com/redisson/redisson/wiki/3.-operations-execution#32-reactive-way) @@ -80,6 +82,7 @@ Articles ================================ [Java data structures powered by Redis. Introduction to Redisson (pdf)](http://redisson.org/Redisson.pdf) +[Moving from Hazelcast to Redis](https://engineering.datorama.com/moving-from-hazelcast-to-redis-b90a0769d1cb) [A Look at the Java Distributed In-Memory Data Model (Powered by Redis)](https://dzone.com/articles/java-distributed-in-memory-data-model-powered-by-r) [Distributed tasks Execution and Scheduling in Java, powered by Redis](https://dzone.com/articles/distributed-tasks-execution-and-scheduling-in-java) [Introducing Redisson Live Objects (Object Hash Mapping)](https://dzone.com/articles/introducing-redisson-live-object-object-hash-mappi) diff --git a/redisson-all/pom.xml b/redisson-all/pom.xml index 56743bae4..1a5d7140a 100644 --- a/redisson-all/pom.xml +++ b/redisson-all/pom.xml @@ -83,6 +83,11 @@ redisson ${project.version} + + org.redisson + redisson-benchmark + ${project.version} + io.netty netty-transport-native-epoll diff --git a/redisson-tomcat/README.md b/redisson-tomcat/README.md index 4285e20cb..b4642bcd2 100644 --- a/redisson-tomcat/README.md +++ b/redisson-tomcat/README.md @@ -21,22 +21,22 @@ Usage 2. Copy two jars into `TOMCAT_BASE/lib` directory: 1. __For JDK 1.8+__ - [redisson-all-3.2.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.2.0&e=jar) + [redisson-all-3.2.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.2.2&e=jar) for Tomcat 6.x - [redisson-tomcat-6-3.2.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.2.0&e=jar) + [redisson-tomcat-6-3.2.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.2.2&e=jar) for Tomcat 7.x - [redisson-tomcat-7-3.2.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.2.0&e=jar) + [redisson-tomcat-7-3.2.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.2.2&e=jar) for Tomcat 8.x - [redisson-tomcat-8-3.2.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.2.0&e=jar) + [redisson-tomcat-8-3.2.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.2.2&e=jar) 1. __For JDK 1.6+__ - [redisson-all-2.7.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.7.0&e=jar) + [redisson-all-2.7.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=2.7.2&e=jar) for Tomcat 6.x - [redisson-tomcat-6-2.7.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.7.0&e=jar) + [redisson-tomcat-6-2.7.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=2.7.2&e=jar) for Tomcat 7.x - [redisson-tomcat-7-2.7.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.7.0&e=jar) + [redisson-tomcat-7-2.7.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=2.7.2&e=jar) for Tomcat 8.x - [redisson-tomcat-8-2.7.0.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.7.0&e=jar) + [redisson-tomcat-8-2.7.2.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=2.7.2&e=jar) diff --git a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSession.java b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSession.java index b1a9abac2..a8981b95b 100644 --- a/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSession.java +++ b/redisson-tomcat/redisson-tomcat-6/src/main/java/org/redisson/tomcat/RedissonSession.java @@ -15,6 +15,7 @@ */ package org.redisson.tomcat; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -33,11 +34,19 @@ import org.redisson.api.RMap; public class RedissonSession extends StandardSession { private final RedissonSessionManager redissonManager; + private final Map attrs; private RMap map; public RedissonSession(RedissonSessionManager manager) { super(manager); this.redissonManager = manager; + + try { + Field attr = StandardSession.class.getDeclaredField("attributes"); + attrs = (Map) attr.get(this); + } catch (Exception e) { + throw new IllegalStateException(e); + } } private static final long serialVersionUID = -2518607181636076487L; @@ -143,7 +152,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:isValid", isValid); newMap.put("session:isNew", isNew); - for (Entry entry : attributes.entrySet()) { + for (Entry entry : attrs.entrySet()) { newMap.put(entry.getKey(), entry.getValue()); } diff --git a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSession.java b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSession.java index b1a9abac2..31db2671f 100644 --- a/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSession.java +++ b/redisson-tomcat/redisson-tomcat-7/src/main/java/org/redisson/tomcat/RedissonSession.java @@ -15,6 +15,7 @@ */ package org.redisson.tomcat; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -33,11 +34,18 @@ import org.redisson.api.RMap; public class RedissonSession extends StandardSession { private final RedissonSessionManager redissonManager; + private final Map attrs; private RMap map; public RedissonSession(RedissonSessionManager manager) { super(manager); this.redissonManager = manager; + try { + Field attr = StandardSession.class.getDeclaredField("attributes"); + attrs = (Map) attr.get(this); + } catch (Exception e) { + throw new IllegalStateException(e); + } } private static final long serialVersionUID = -2518607181636076487L; @@ -143,7 +151,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:isValid", isValid); newMap.put("session:isNew", isNew); - for (Entry entry : attributes.entrySet()) { + for (Entry entry : attrs.entrySet()) { newMap.put(entry.getKey(), entry.getValue()); } diff --git a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSession.java b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSession.java index b1a9abac2..a8981b95b 100644 --- a/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSession.java +++ b/redisson-tomcat/redisson-tomcat-8/src/main/java/org/redisson/tomcat/RedissonSession.java @@ -15,6 +15,7 @@ */ package org.redisson.tomcat; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -33,11 +34,19 @@ import org.redisson.api.RMap; public class RedissonSession extends StandardSession { private final RedissonSessionManager redissonManager; + private final Map attrs; private RMap map; public RedissonSession(RedissonSessionManager manager) { super(manager); this.redissonManager = manager; + + try { + Field attr = StandardSession.class.getDeclaredField("attributes"); + attrs = (Map) attr.get(this); + } catch (Exception e) { + throw new IllegalStateException(e); + } } private static final long serialVersionUID = -2518607181636076487L; @@ -143,7 +152,7 @@ public class RedissonSession extends StandardSession { newMap.put("session:isValid", isValid); newMap.put("session:isNew", isNew); - for (Entry entry : attributes.entrySet()) { + for (Entry entry : attrs.entrySet()) { newMap.put(entry.getKey(), entry.getValue()); } diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index 4d929bbee..801e55f79 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -240,12 +240,12 @@ public class Redisson implements RedissonClient { @Override public RListMultimap getListMultimap(String name) { - return new RedissonListMultimap(connectionManager.getCommandExecutor(), name); + return new RedissonListMultimap(this, connectionManager.getCommandExecutor(), name); } @Override public RListMultimap getListMultimap(String name, Codec codec) { - return new RedissonListMultimap(codec, connectionManager.getCommandExecutor(), name); + return new RedissonListMultimap(this, codec, connectionManager.getCommandExecutor(), name); } @Override @@ -260,37 +260,37 @@ public class Redisson implements RedissonClient { @Override public RMap getMap(String name) { - return new RedissonMap(connectionManager.getCommandExecutor(), name); + return new RedissonMap(this, connectionManager.getCommandExecutor(), name); } @Override public RSetMultimap getSetMultimap(String name) { - return new RedissonSetMultimap(connectionManager.getCommandExecutor(), name); + return new RedissonSetMultimap(this, connectionManager.getCommandExecutor(), name); } @Override public RSetMultimapCache getSetMultimapCache(String name) { - return new RedissonSetMultimapCache(evictionScheduler, connectionManager.getCommandExecutor(), name); + return new RedissonSetMultimapCache(this, evictionScheduler, connectionManager.getCommandExecutor(), name); } @Override public RSetMultimapCache getSetMultimapCache(String name, Codec codec) { - return new RedissonSetMultimapCache(evictionScheduler, codec, connectionManager.getCommandExecutor(), name); + return new RedissonSetMultimapCache(this, evictionScheduler, codec, connectionManager.getCommandExecutor(), name); } @Override public RListMultimapCache getListMultimapCache(String name) { - return new RedissonListMultimapCache(evictionScheduler, connectionManager.getCommandExecutor(), name); + return new RedissonListMultimapCache(this, evictionScheduler, connectionManager.getCommandExecutor(), name); } @Override public RListMultimapCache getListMultimapCache(String name, Codec codec) { - return new RedissonListMultimapCache(evictionScheduler, codec, connectionManager.getCommandExecutor(), name); + return new RedissonListMultimapCache(this, evictionScheduler, codec, connectionManager.getCommandExecutor(), name); } @Override public RSetMultimap getSetMultimap(String name, Codec codec) { - return new RedissonSetMultimap(codec, connectionManager.getCommandExecutor(), name); + return new RedissonSetMultimap(this, codec, connectionManager.getCommandExecutor(), name); } @Override @@ -305,17 +305,17 @@ public class Redisson implements RedissonClient { @Override public RMapCache getMapCache(String name) { - return new RedissonMapCache(evictionScheduler, connectionManager.getCommandExecutor(), name); + return new RedissonMapCache(this, evictionScheduler, connectionManager.getCommandExecutor(), name); } @Override public RMapCache getMapCache(String name, Codec codec) { - return new RedissonMapCache(codec, evictionScheduler, connectionManager.getCommandExecutor(), name); + return new RedissonMapCache(this, codec, evictionScheduler, connectionManager.getCommandExecutor(), name); } @Override public RMap getMap(String name, Codec codec) { - return new RedissonMap(codec, connectionManager.getCommandExecutor(), name); + return new RedissonMap(this, codec, connectionManager.getCommandExecutor(), name); } @Override @@ -538,7 +538,7 @@ public class Redisson implements RedissonClient { @Override public RBatch createBatch() { - RedissonBatch batch = new RedissonBatch(evictionScheduler, connectionManager); + RedissonBatch batch = new RedissonBatch(this, evictionScheduler, connectionManager); if (config.isRedissonReferenceEnabled()) { batch.enableRedissonReferenceSupport(this); } diff --git a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java index 0f85a46d9..fc9151f8d 100644 --- a/redisson/src/main/java/org/redisson/RedissonBaseIterator.java +++ b/redisson/src/main/java/org/redisson/RedissonBaseIterator.java @@ -22,12 +22,15 @@ import java.util.List; import java.util.NoSuchElementException; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; + +import io.netty.buffer.ByteBuf; abstract class RedissonBaseIterator implements Iterator { - private List firstValues; - private List lastValues; - private Iterator lastIter; + private List firstValues; + private List lastValues; + private Iterator lastIter; protected long nextIterPos; protected InetSocketAddress client; @@ -40,6 +43,8 @@ abstract class RedissonBaseIterator implements Iterator { public boolean hasNext() { if (lastIter == null || !lastIter.hasNext()) { if (finished) { + free(firstValues); + free(lastValues); currentElementRemoved = false; removeExecuted = false; @@ -56,8 +61,12 @@ abstract class RedissonBaseIterator implements Iterator { long prevIterPos; do { prevIterPos = nextIterPos; - ListScanResult res = iterator(client, nextIterPos); - lastValues = new ArrayList(res.getValues()); + ListScanResult res = iterator(client, nextIterPos); + if (lastValues != null) { + free(lastValues); + } + + lastValues = convert(res.getValues()); client = res.getRedisClient(); if (nextIterPos == 0 && firstValues == null) { @@ -87,6 +96,9 @@ abstract class RedissonBaseIterator implements Iterator { } } } else if (lastValues.removeAll(firstValues)) { + free(firstValues); + free(lastValues); + currentElementRemoved = false; removeExecuted = false; client = null; @@ -111,11 +123,28 @@ abstract class RedissonBaseIterator implements Iterator { return lastIter.hasNext(); } + private List convert(List list) { + List result = new ArrayList(list.size()); + for (ScanObjectEntry entry : list) { + result.add(entry.getBuf()); + } + return result; + } + + private void free(List list) { + if (list == null) { + return; + } + for (ByteBuf byteBuf : list) { + byteBuf.release(); + } + } + protected boolean tryAgain() { return false; } - abstract ListScanResult iterator(InetSocketAddress client, long nextIterPos); + abstract ListScanResult iterator(InetSocketAddress client, long nextIterPos); @Override public V next() { @@ -123,7 +152,7 @@ abstract class RedissonBaseIterator implements Iterator { throw new NoSuchElementException("No such element"); } - value = lastIter.next(); + value = (V) lastIter.next().getObj(); currentElementRemoved = false; return value; } diff --git a/redisson/src/main/java/org/redisson/RedissonBatch.java b/redisson/src/main/java/org/redisson/RedissonBatch.java index fc7f9c774..dfd4181a2 100644 --- a/redisson/src/main/java/org/redisson/RedissonBatch.java +++ b/redisson/src/main/java/org/redisson/RedissonBatch.java @@ -41,6 +41,7 @@ import org.redisson.api.RScriptAsync; import org.redisson.api.RSetAsync; import org.redisson.api.RSetCacheAsync; import org.redisson.api.RTopicAsync; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.command.CommandBatchService; import org.redisson.connection.ConnectionManager; @@ -55,10 +56,12 @@ public class RedissonBatch implements RBatch { private final EvictionScheduler evictionScheduler; private final CommandBatchService executorService; + private final RedissonClient client; - protected RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { + protected RedissonBatch(RedissonClient client, EvictionScheduler evictionScheduler, ConnectionManager connectionManager) { this.executorService = new CommandBatchService(connectionManager); this.evictionScheduler = evictionScheduler; + this.client = client; } @Override @@ -93,12 +96,12 @@ public class RedissonBatch implements RBatch { @Override public RMapAsync getMap(String name) { - return new RedissonMap(executorService, name); + return new RedissonMap(client, executorService, name); } @Override public RMapAsync getMap(String name, Codec codec) { - return new RedissonMap(codec, executorService, name); + return new RedissonMap(client, codec, executorService, name); } @Override @@ -193,12 +196,12 @@ public class RedissonBatch implements RBatch { @Override public RMapCacheAsync getMapCache(String name, Codec codec) { - return new RedissonMapCache(codec, evictionScheduler, executorService, name); + return new RedissonMapCache(client, codec, evictionScheduler, executorService, name); } @Override public RMapCacheAsync getMapCache(String name) { - return new RedissonMapCache(evictionScheduler, executorService, name); + return new RedissonMapCache(client, evictionScheduler, executorService, name); } @Override @@ -243,22 +246,22 @@ public class RedissonBatch implements RBatch { @Override public RMultimapAsync getSetMultimap(String name) { - return new RedissonSetMultimap(executorService, name); + return new RedissonSetMultimap(client, executorService, name); } @Override public RMultimapAsync getSetMultimap(String name, Codec codec) { - return new RedissonSetMultimap(codec, executorService, name); + return new RedissonSetMultimap(client, codec, executorService, name); } @Override public RMultimapAsync getListMultimap(String name) { - return new RedissonListMultimap(executorService, name); + return new RedissonListMultimap(client, executorService, name); } @Override public RMultimapAsync getListMultimap(String name, Codec codec) { - return new RedissonListMultimap(codec, executorService, name); + return new RedissonListMultimap(client, codec, executorService, name); } @Override @@ -273,22 +276,22 @@ public class RedissonBatch implements RBatch { @Override public RMultimapCacheAsync getSetMultimapCache(String name) { - return new RedissonSetMultimapCache(evictionScheduler, executorService, name); + return new RedissonSetMultimapCache(client, evictionScheduler, executorService, name); } @Override public RMultimapCacheAsync getSetMultimapCache(String name, Codec codec) { - return new RedissonSetMultimapCache(evictionScheduler, codec, executorService, name); + return new RedissonSetMultimapCache(client, evictionScheduler, codec, executorService, name); } @Override public RMultimapCacheAsync getListMultimapCache(String name) { - return new RedissonListMultimapCache(evictionScheduler, executorService, name); + return new RedissonListMultimapCache(client, evictionScheduler, executorService, name); } @Override public RMultimapCacheAsync getListMultimapCache(String name, Codec codec) { - return new RedissonListMultimapCache(evictionScheduler, codec, executorService, name); + return new RedissonListMultimapCache(client, evictionScheduler, codec, executorService, name); } protected void enableRedissonReferenceSupport(Redisson redisson) { diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 84e994b7e..449cbdec4 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -33,9 +33,11 @@ import org.redisson.api.RFuture; import org.redisson.api.RKeys; import org.redisson.api.RType; import org.redisson.client.RedisException; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; import org.redisson.connection.MasterSlaveEntry; @@ -104,12 +106,12 @@ public class RedissonKeys implements RKeys { return getKeysByPattern(null); } - private ListScanResult scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) { + private ListScanResult scanIterator(InetSocketAddress client, MasterSlaveEntry entry, long startPos, String pattern, int count) { if (pattern == null) { - RFuture> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count); + RFuture> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "COUNT", count); return commandExecutor.get(f); } - RFuture> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); + RFuture> f = commandExecutor.readAsync(client, entry, new ScanCodec(StringCodec.INSTANCE), RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); return commandExecutor.get(f); } @@ -117,7 +119,7 @@ public class RedissonKeys implements RKeys { return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count); } diff --git a/redisson/src/main/java/org/redisson/RedissonListMultimap.java b/redisson/src/main/java/org/redisson/RedissonListMultimap.java index 766377f7f..ef1703d15 100644 --- a/redisson/src/main/java/org/redisson/RedissonListMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonListMultimap.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RList; import org.redisson.api.RListMultimap; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; @@ -44,12 +45,12 @@ public class RedissonListMultimap extends RedissonMultimap implement private static final RedisStrictCommand LLEN_VALUE = new RedisStrictCommand("LLEN", new BooleanAmountReplayConvertor()); - RedissonListMultimap(CommandAsyncExecutor connectionManager, String name) { - super(connectionManager, name); + RedissonListMultimap(RedissonClient client, CommandAsyncExecutor connectionManager, String name) { + super(client, connectionManager, name); } - RedissonListMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) { - super(codec, connectionManager, name); + RedissonListMultimap(RedissonClient client, Codec codec, CommandAsyncExecutor connectionManager, String name) { + super(client, codec, connectionManager, name); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonListMultimapCache.java b/redisson/src/main/java/org/redisson/RedissonListMultimapCache.java index 3f2184d66..a30628f26 100644 --- a/redisson/src/main/java/org/redisson/RedissonListMultimapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonListMultimapCache.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RList; import org.redisson.api.RListMultimapCache; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; @@ -36,14 +37,14 @@ public class RedissonListMultimapCache extends RedissonListMultimap private final RedissonMultimapCache baseCache; - RedissonListMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { - super(connectionManager, name); + RedissonListMultimapCache(RedissonClient client, EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { + super(client, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } - RedissonListMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { - super(codec, connectionManager, name); + RedissonListMultimapCache(RedissonClient client, EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { + super(client, codec, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } diff --git a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java index b452837d4..36f90fdea 100644 --- a/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java +++ b/redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java @@ -190,12 +190,12 @@ public class RedissonLocalCachedMap extends RedissonMap implements R private int invalidationListenerId; protected RedissonLocalCachedMap(RedissonClient redisson, CommandAsyncExecutor commandExecutor, String name, LocalCachedMapOptions options) { - super(commandExecutor, name); + super(redisson, commandExecutor, name); init(redisson, name, options); } protected RedissonLocalCachedMap(RedissonClient redisson, Codec codec, CommandAsyncExecutor connectionManager, String name, LocalCachedMapOptions options) { - super(codec, connectionManager, name); + super(redisson, codec, connectionManager, name); init(redisson, name, options); } diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 490c71632..b8dc79afc 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -15,6 +15,7 @@ */ package org.redisson; +import java.io.IOException; import java.math.BigDecimal; import java.net.InetSocketAddress; import java.util.AbstractCollection; @@ -29,9 +30,11 @@ import java.util.Map; import java.util.Set; import org.redisson.api.RFuture; +import org.redisson.api.RLock; import org.redisson.api.RMap; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -42,6 +45,7 @@ import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; import org.redisson.connection.decoder.MapGetAllDecoder; +import org.redisson.misc.Hash; /** * Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap} @@ -60,14 +64,33 @@ public class RedissonMap extends RedissonExpirable implements RMap { static final RedisCommand EVAL_REMOVE_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4, ValueType.MAP); static final RedisCommand EVAL_PUT = EVAL_REPLACE; - protected RedissonMap(CommandAsyncExecutor commandExecutor, String name) { + private final RedissonClient client; + + protected RedissonMap(RedissonClient client, CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); + this.client = client; } - public RedissonMap(Codec codec, CommandAsyncExecutor commandExecutor, String name) { + public RedissonMap(RedissonClient client, Codec codec, CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); + this.client = client; } + @Override + public RLock getLock(K key) { + String lockName = getLockName(key); + return client.getLock(lockName); + } + + private String getLockName(Object key) { + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + return "{" + getName() + "}:" + Hash.hashToBase64(keyState) + ":key"; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + @Override public int size() { return get(sizeAsync()); @@ -431,7 +454,7 @@ public class RedissonMap extends RedissonExpirable implements RMap { MapScanResult scanIterator(String name, InetSocketAddress client, long startPos) { RFuture> f - = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.HSCAN, name, startPos); + = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); return get(f); } diff --git a/redisson/src/main/java/org/redisson/RedissonMapCache.java b/redisson/src/main/java/org/redisson/RedissonMapCache.java index 0c1aee5a7..e260ee7f1 100644 --- a/redisson/src/main/java/org/redisson/RedissonMapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonMapCache.java @@ -27,9 +27,10 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RMapCache; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -84,13 +85,13 @@ public class RedissonMapCache extends RedissonMap implements RMapCac private static final RedisCommand EVAL_CONTAINS_VALUE = new RedisCommand("EVAL", new BooleanReplayConvertor(), 7, ValueType.MAP_VALUE); private static final RedisCommand EVAL_FAST_REMOVE = new RedisCommand("EVAL", 5, ValueType.MAP_KEY); - protected RedissonMapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) { - super(commandExecutor, name); + protected RedissonMapCache(RedissonClient redisson, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) { + super(redisson, commandExecutor, name); evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName()); } - public RedissonMapCache(Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) { - super(codec, commandExecutor, name); + public RedissonMapCache(RedissonClient redisson, Codec codec, EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) { + super(redisson, codec, commandExecutor, name); evictionScheduler.schedule(getName(), getTimeoutSetName(), getIdleSetName()); } @@ -530,7 +531,7 @@ public class RedissonMapCache extends RedissonMap implements RMapCac @Override MapScanResult scanIterator(String name, InetSocketAddress client, long startPos) { RedisCommand> EVAL_HSCAN = new RedisCommand>("EVAL", - new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new ScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); + new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new MapScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); RFuture> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN, "local result = {}; " + "local idleKeys = {}; " diff --git a/redisson/src/main/java/org/redisson/RedissonMultimap.java b/redisson/src/main/java/org/redisson/RedissonMultimap.java index 45afa3e46..7bd673874 100644 --- a/redisson/src/main/java/org/redisson/RedissonMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonMultimap.java @@ -15,6 +15,7 @@ */ package org.redisson; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.AbstractCollection; import java.util.AbstractSet; @@ -29,10 +30,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; +import org.redisson.api.RLock; import org.redisson.api.RMultimap; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -49,14 +52,33 @@ import org.redisson.misc.Hash; */ public abstract class RedissonMultimap extends RedissonExpirable implements RMultimap { - RedissonMultimap(CommandAsyncExecutor connectionManager, String name) { + private final RedissonClient client; + + RedissonMultimap(RedissonClient client, CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); + this.client = client; } - RedissonMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) { + RedissonMultimap(RedissonClient client, Codec codec, CommandAsyncExecutor connectionManager, String name) { super(codec, connectionManager, name); + this.client = client; } + @Override + public RLock getLock(K key) { + String lockName = getLockName(key); + return client.getLock(lockName); + } + + private String getLockName(Object key) { + try { + byte[] keyState = codec.getMapKeyEncoder().encode(key); + return "{" + getName() + "}:" + Hash.hashToBase64(keyState) + ":key"; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + protected String hash(byte[] objectState) { return Hash.hashToBase64(objectState); } @@ -249,7 +271,7 @@ public abstract class RedissonMultimap extends RedissonExpirable implement MapScanResult scanIterator(InetSocketAddress client, long startPos) { - RFuture> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); + RFuture> f = commandExecutor.readAsync(client, getName(), new MapScanCodec(codec, StringCodec.INSTANCE), RedisCommands.HSCAN, getName(), startPos); return get(f); } diff --git a/redisson/src/main/java/org/redisson/RedissonNode.java b/redisson/src/main/java/org/redisson/RedissonNode.java index 2ed91c3c8..52e30a141 100644 --- a/redisson/src/main/java/org/redisson/RedissonNode.java +++ b/redisson/src/main/java/org/redisson/RedissonNode.java @@ -147,7 +147,7 @@ public class RedissonNode { private void retrieveAdresses() { ConnectionManager connectionManager = ((Redisson)redisson).getConnectionManager(); for (MasterSlaveEntry entry : connectionManager.getEntrySet()) { - RFuture readFuture = entry.connectionReadOp(RedisCommands.PUBLISH); + RFuture readFuture = entry.connectionReadOp(null); if (readFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout()) && readFuture.isSuccess()) { RedisConnection connection = readFuture.getNow(); @@ -156,7 +156,7 @@ public class RedissonNode { localAddress = (InetSocketAddress) connection.getChannel().localAddress(); return; } - RFuture writeFuture = entry.connectionWriteOp(RedisCommands.PUBLISH); + RFuture writeFuture = entry.connectionWriteOp(null); if (writeFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout()) && writeFuture.isSuccess()) { RedisConnection connection = writeFuture.getNow(); diff --git a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java index 771483f56..a4d7cb95f 100644 --- a/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java +++ b/redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java @@ -34,6 +34,7 @@ import org.redisson.api.SortOrder; import org.redisson.client.codec.Codec; import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.LongCodec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.ScoredCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -41,6 +42,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -259,8 +261,8 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), o); } - private ListScanResult scanIterator(InetSocketAddress client, long startPos) { - RFuture> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); + private ListScanResult scanIterator(InetSocketAddress client, long startPos) { + RFuture> f = commandExecutor.readAsync(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); return get(f); } @@ -269,7 +271,7 @@ public class RedissonScoredSortedSet extends RedissonExpirable implements RSc return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return scanIterator(client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index 3d88107e9..524cae608 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -28,11 +28,13 @@ import org.redisson.api.RFuture; import org.redisson.api.RSet; import org.redisson.api.SortOrder; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -81,8 +83,8 @@ public class RedissonSet extends RedissonExpirable implements RSet { return getName(); } - ListScanResult scanIterator(String name, InetSocketAddress client, long startPos) { - RFuture> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos); + ListScanResult scanIterator(String name, InetSocketAddress client, long startPos) { + RFuture> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos); return get(f); } @@ -91,7 +93,7 @@ public class RedissonSet extends RedissonExpirable implements RSet { return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return scanIterator(getName(), client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 408853ce6..451edd4ef 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -28,12 +28,14 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RSetCache; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -102,13 +104,13 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< Arrays.asList(getName()), System.currentTimeMillis(), o); } - ListScanResult scanIterator(InetSocketAddress client, long startPos) { - RFuture> f = scanIteratorAsync(client, startPos); + ListScanResult scanIterator(InetSocketAddress client, long startPos) { + RFuture> f = scanIteratorAsync(client, startPos); return get(f); } - public RFuture> scanIteratorAsync(InetSocketAddress client, long startPos) { - return commandExecutor.evalReadAsync(client, getName(), codec, RedisCommands.EVAL_ZSCAN, + public RFuture> scanIteratorAsync(InetSocketAddress client, long startPos) { + return commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), RedisCommands.EVAL_ZSCAN, "local result = {}; " + "local res = redis.call('zscan', KEYS[1], ARGV[1]); " + "for i, value in ipairs(res[2]) do " @@ -127,7 +129,7 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return scanIterator(client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimap.java b/redisson/src/main/java/org/redisson/RedissonSetMultimap.java index 30945073f..99f5269ad 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimap.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimap.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RSet; import org.redisson.api.RSetMultimap; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -47,12 +48,12 @@ public class RedissonSetMultimap extends RedissonMultimap implements private static final RedisStrictCommand SCARD_VALUE = new RedisStrictCommand("SCARD", new BooleanAmountReplayConvertor()); private static final RedisCommand SISMEMBER_VALUE = new RedisCommand("SISMEMBER", new BooleanReplayConvertor()); - RedissonSetMultimap(CommandAsyncExecutor connectionManager, String name) { - super(connectionManager, name); + RedissonSetMultimap(RedissonClient client, CommandAsyncExecutor connectionManager, String name) { + super(client, connectionManager, name); } - RedissonSetMultimap(Codec codec, CommandAsyncExecutor connectionManager, String name) { - super(codec, connectionManager, name); + RedissonSetMultimap(RedissonClient client, Codec codec, CommandAsyncExecutor connectionManager, String name) { + super(client, codec, connectionManager, name); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapCache.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapCache.java index b212c2513..f683d1cab 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapCache.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.redisson.api.RFuture; import org.redisson.api.RSet; import org.redisson.api.RSetMultimapCache; +import org.redisson.api.RedissonClient; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandAsyncExecutor; @@ -36,14 +37,14 @@ public class RedissonSetMultimapCache extends RedissonSetMultimap im private final RedissonMultimapCache baseCache; - RedissonSetMultimapCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { - super(connectionManager, name); + RedissonSetMultimapCache(RedissonClient client, EvictionScheduler evictionScheduler, CommandAsyncExecutor connectionManager, String name) { + super(client, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } - RedissonSetMultimapCache(EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { - super(codec, connectionManager, name); + RedissonSetMultimapCache(RedissonClient client, EvictionScheduler evictionScheduler, Codec codec, CommandAsyncExecutor connectionManager, String name) { + super(client, codec, connectionManager, name); evictionScheduler.scheduleCleanMultimap(name, getTimeoutSetName()); baseCache = new RedissonMultimapCache(connectionManager, name, codec, getTimeoutSetName()); } diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java index 049f0d292..239a7d771 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -29,6 +29,7 @@ import org.redisson.api.RFuture; import org.redisson.api.RSet; import org.redisson.api.SortOrder; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -39,6 +40,7 @@ import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandAsyncExecutor; /** @@ -161,8 +163,8 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R Arrays.asList(timeoutSetName, getName()), System.currentTimeMillis(), key, o); } - private ListScanResult scanIterator(InetSocketAddress client, long startPos) { - RFuture> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_SSCAN, + private ListScanResult scanIterator(InetSocketAddress client, long startPos) { + RFuture> f = commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), EVAL_SSCAN, "local expireDate = 92233720368547758; " + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " + "if expireDateScore ~= false then " @@ -182,7 +184,7 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R return new RedissonBaseIterator() { @Override - ListScanResult iterator(InetSocketAddress client, long nextIterPos) { + ListScanResult iterator(InetSocketAddress client, long nextIterPos) { return scanIterator(client, nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index 415985715..cae8c6738 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -33,6 +33,14 @@ import java.util.concurrent.ConcurrentMap; */ public interface RMap extends ConcurrentMap, RExpirable, RMapAsync { + /** + * Returns RLock instance associated with key + * + * @param key - map key + * @return lock + */ + RLock getLock(K key); + /** * Returns size of value mapped by key in bytes * diff --git a/redisson/src/main/java/org/redisson/api/RMultimap.java b/redisson/src/main/java/org/redisson/api/RMultimap.java index 05f9b94f9..31212d450 100644 --- a/redisson/src/main/java/org/redisson/api/RMultimap.java +++ b/redisson/src/main/java/org/redisson/api/RMultimap.java @@ -29,6 +29,14 @@ import java.util.Set; */ public interface RMultimap extends RExpirable, RMultimapAsync { + /** + * Returns RLock instance associated with key + * + * @param key - map key + * @return lock + */ + RLock getLock(K key); + /** * Returns the number of key-value pairs in this multimap. * diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index e0fe7c07c..23c64625c 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -52,15 +52,14 @@ public class RedisConnection implements RedisCommands { private long lastUsageTime; public RedisConnection(RedisClient redisClient, Channel channel) { - super(); - this.redisClient = redisClient; + this(redisClient); updateChannel(channel); lastUsageTime = System.currentTimeMillis(); } - protected RedisConnection() { - redisClient = null; + protected RedisConnection(RedisClient redisClient) { + this.redisClient = redisClient; } public static C getFrom(Channel channel) { diff --git a/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java new file mode 100644 index 000000000..8ee35fd74 --- /dev/null +++ b/redisson/src/main/java/org/redisson/client/codec/MapScanCodec.java @@ -0,0 +1,100 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.client.codec; + +import java.io.IOException; + +import org.redisson.client.handler.State; +import org.redisson.client.protocol.Decoder; +import org.redisson.client.protocol.Encoder; +import org.redisson.client.protocol.decoder.ScanObjectEntry; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * + * @author Nikita Koksharov + * + */ +public class MapScanCodec implements Codec { + + private final Codec delegate; + private final Codec mapValueCodec; + + public MapScanCodec(Codec delegate) { + this(delegate, null); + } + + public MapScanCodec(Codec delegate, Codec mapValueCodec) { + this.delegate = delegate; + this.mapValueCodec = mapValueCodec; + } + + @Override + public Decoder getValueDecoder() { + return delegate.getValueDecoder(); + } + + @Override + public Encoder getValueEncoder() { + return delegate.getValueEncoder(); + } + + @Override + public Decoder getMapValueDecoder() { + return new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + ByteBuf b = Unpooled.copiedBuffer(buf); + Codec c = delegate; + if (mapValueCodec != null) { + c = mapValueCodec; + } + Object val = c.getMapValueDecoder().decode(buf, state); + return new ScanObjectEntry(b, val); + } + }; + } + + @Override + public Encoder getMapValueEncoder() { + Codec c = delegate; + if (mapValueCodec != null) { + c = mapValueCodec; + } + + return c.getMapValueEncoder(); + } + + @Override + public Decoder getMapKeyDecoder() { + return new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + ByteBuf b = Unpooled.copiedBuffer(buf); + Object val = delegate.getMapKeyDecoder().decode(buf, state); + return new ScanObjectEntry(b, val); + } + }; + } + + @Override + public Encoder getMapKeyEncoder() { + return delegate.getMapKeyEncoder(); + } + +} diff --git a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java b/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java index 407eb13ce..d4d0c6335 100644 --- a/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java +++ b/redisson/src/main/java/org/redisson/client/codec/ScanCodec.java @@ -33,20 +33,21 @@ import io.netty.buffer.Unpooled; public class ScanCodec implements Codec { private final Codec delegate; - private final Codec mapValueCodec; public ScanCodec(Codec delegate) { - this(delegate, null); - } - - public ScanCodec(Codec delegate, Codec mapValueCodec) { this.delegate = delegate; - this.mapValueCodec = mapValueCodec; } @Override public Decoder getValueDecoder() { - return delegate.getValueDecoder(); + return new Decoder() { + @Override + public Object decode(ByteBuf buf, State state) throws IOException { + ByteBuf b = Unpooled.copiedBuffer(buf); + Object val = delegate.getValueDecoder().decode(buf, state); + return new ScanObjectEntry(b, val); + } + }; } @Override @@ -56,40 +57,17 @@ public class ScanCodec implements Codec { @Override public Decoder getMapValueDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - ByteBuf b = Unpooled.copiedBuffer(buf); - Codec c = delegate; - if (mapValueCodec != null) { - c = mapValueCodec; - } - Object val = c.getMapValueDecoder().decode(buf, state); - return new ScanObjectEntry(b, val); - } - }; + return delegate.getMapValueDecoder(); } @Override public Encoder getMapValueEncoder() { - Codec c = delegate; - if (mapValueCodec != null) { - c = mapValueCodec; - } - - return c.getMapValueEncoder(); + return delegate.getMapValueEncoder(); } @Override public Decoder getMapKeyDecoder() { - return new Decoder() { - @Override - public Object decode(ByteBuf buf, State state) throws IOException { - ByteBuf b = Unpooled.copiedBuffer(buf); - Object val = delegate.getMapKeyDecoder().decode(buf, state); - return new ScanObjectEntry(b, val); - } - }; + return delegate.getMapKeyDecoder(); } @Override diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 5b3a2721c..677898bc2 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -804,22 +804,47 @@ public class CommandAsyncService implements CommandAsyncExecutor { } private void handleReference(RPromise mainPromise, R res) { - if (res instanceof List || res instanceof ListScanResult) { - List r = res instanceof ListScanResult ? ((ListScanResult)res).getValues() : (List) res; + if (res instanceof List) { + List r = (List)res; for (int i = 0; i < r.size(); i++) { if (r.get(i) instanceof RedissonReference) { try { - r.set(i ,(redisson != null - ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i)) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i)))); + r.set(i, redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) r.get(i)) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) r.get(i))); } catch (Exception exception) {//skip and carry on to next one. } } else if (r.get(i) instanceof ScoredEntry && ((ScoredEntry) r.get(i)).getValue() instanceof RedissonReference) { try { - ScoredEntry se = ((ScoredEntry) r.get(i)); - r.set(i ,new ScoredEntry(se.getScore(), redisson != null + ScoredEntry se = ((ScoredEntry) r.get(i)); + se = new ScoredEntry(se.getScore(), redisson != null ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) se.getValue()) - : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue()))); + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue())); + r.set(i, se); + } catch (Exception exception) {//skip and carry on to next one. + } + } + } + mainPromise.trySuccess(res); + } else if (res instanceof ListScanResult) { + List r = ((ListScanResult)res).getValues(); + for (int i = 0; i < r.size(); i++) { + ScanObjectEntry e = r.get(i); + if (e.getObj() instanceof RedissonReference) { + try { + r.set(i , new ScanObjectEntry(e.getBuf(), redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) e.getObj()) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) e.getObj()))); + } catch (Exception exception) {//skip and carry on to next one. + } + } else if (e.getObj() instanceof ScoredEntry && ((ScoredEntry) e.getObj()).getValue() instanceof RedissonReference) { + try { + ScoredEntry se = ((ScoredEntry) e.getObj()); + se = new ScoredEntry(se.getScore(), redisson != null + ? RedissonObjectFactory.fromReference(redisson, (RedissonReference) se.getValue()) + : RedissonObjectFactory.fromReference(redissonReactive, (RedissonReference) se.getValue())); + + r.set(i, new ScanObjectEntry(e.getBuf(), se)); } catch (Exception exception) {//skip and carry on to next one. } } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 7ede38699..635f9e494 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -201,7 +201,7 @@ abstract class ConnectionPool { return connectionManager.newFailedFuture(exception); } - private RFuture acquireConnection(RedisCommand command, ClientConnectionsEntry entry) { + private RFuture acquireConnection(RedisCommand command, final ClientConnectionsEntry entry) { final RPromise result = connectionManager.newPromise(); acquireConnection(entry, new Runnable() { @Override diff --git a/redisson/src/main/java/org/redisson/jcache/JCache.java b/redisson/src/main/java/org/redisson/jcache/JCache.java index 67490046b..4c9c0c21a 100644 --- a/redisson/src/main/java/org/redisson/jcache/JCache.java +++ b/redisson/src/main/java/org/redisson/jcache/JCache.java @@ -58,7 +58,7 @@ import org.redisson.api.RLock; import org.redisson.api.RSemaphore; import org.redisson.api.RTopic; import org.redisson.api.listener.MessageListener; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -2089,7 +2089,7 @@ public class JCache extends RedissonObject implements Cache { MapScanResult scanIterator(String name, InetSocketAddress client, long startPos) { RFuture> f - = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.HSCAN, name, startPos); + = commandExecutor.readAsync(client, name, new MapScanCodec(codec), RedisCommands.HSCAN, name, startPos); return get(f); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index adfd96518..7f53beccf 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -19,7 +19,6 @@ import java.util.List; import org.reactivestreams.Publisher; import org.redisson.EvictionScheduler; -import org.redisson.Redisson; import org.redisson.api.RAtomicLongReactive; import org.redisson.api.RBatchReactive; import org.redisson.api.RBitSetReactive; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java index dc7599072..d4b99f93f 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapCacheReactive.java @@ -30,7 +30,7 @@ import org.redisson.EvictionScheduler; import org.redisson.api.RMapCacheReactive; import org.redisson.client.codec.Codec; import org.redisson.client.codec.LongCodec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommands; @@ -342,7 +342,7 @@ public class RedissonMapCacheReactive extends RedissonMapReactive im @Override Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { - return commandExecutor.evalReadReactive(client, getName(), new ScanCodec(codec), EVAL_HSCAN, + return commandExecutor.evalReadReactive(client, getName(), new MapScanCodec(codec), EVAL_HSCAN, "local result = {}; " + "local res = redis.call('hscan', KEYS[1], ARGV[1]); " + "for i, value in ipairs(res[2]) do " diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index 918e330dc..523a3c990 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -24,7 +24,7 @@ import org.reactivestreams.Publisher; import org.redisson.RedissonMap; import org.redisson.api.RMapReactive; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.ScanCodec; +import org.redisson.client.codec.MapScanCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -50,12 +50,12 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme public RedissonMapReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); - instance = new RedissonMap(codec, commandExecutor, name); + instance = new RedissonMap(null, codec, commandExecutor, name); } public RedissonMapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { super(codec, commandExecutor, name); - instance = new RedissonMap(codec, commandExecutor, name); + instance = new RedissonMap(null, codec, commandExecutor, name); } @Override @@ -130,7 +130,7 @@ public class RedissonMapReactive extends RedissonExpirableReactive impleme } Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { - return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.HSCAN, getName(), startPos); + return commandExecutor.readReactive(client, getName(), new MapScanCodec(codec), RedisCommands.HSCAN, getName(), startPos); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 927b4056d..7f87a584d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -23,6 +23,7 @@ import java.util.Collections; import org.reactivestreams.Publisher; import org.redisson.api.RScoredSortedSetReactive; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand.ValueType; @@ -30,6 +31,7 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; public class RedissonScoredSortedSetReactive extends RedissonExpirableReactive implements RScoredSortedSetReactive { @@ -122,15 +124,15 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv return commandExecutor.readReactive(getName(), codec, RedisCommands.ZRANK, getName(), o); } - private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { - return commandExecutor.readReactive(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); + private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { + return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.ZSCAN, getName(), startPos); } @Override public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index cac153850..be8fd738b 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -30,6 +30,7 @@ import org.redisson.api.RSetCacheReactive; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; /** @@ -76,7 +77,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple return reactive(instance.containsAsync(o)); } - Publisher> scanIterator(InetSocketAddress client, long startPos) { + Publisher> scanIterator(InetSocketAddress client, long startPos) { return reactive(instance.scanIteratorAsync(client, startPos)); } @@ -84,7 +85,7 @@ public class RedissonSetCacheReactive extends RedissonExpirableReactive imple public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 0a9ea284d..fa5db8e9f 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -26,8 +26,10 @@ import org.reactivestreams.Publisher; import org.redisson.RedissonSet; import org.redisson.api.RSetReactive; import org.redisson.client.codec.Codec; +import org.redisson.client.codec.ScanCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; import org.redisson.command.CommandReactiveExecutor; /** @@ -66,8 +68,8 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements return reactive(instance.containsAsync(o)); } - private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { - return commandExecutor.readReactive(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); + private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { + return commandExecutor.readReactive(client, getName(), new ScanCodec(codec), RedisCommands.SSCAN, getName(), startPos); } @Override @@ -156,7 +158,7 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements public Publisher iterator() { return new SetReactiveIterator() { @Override - protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { + protected Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos) { return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos); } }; diff --git a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java index 787866822..95adcd488 100644 --- a/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java +++ b/redisson/src/main/java/org/redisson/reactive/SetReactiveIterator.java @@ -16,13 +16,16 @@ package org.redisson.reactive; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ScanObjectEntry; +import io.netty.buffer.ByteBuf; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -32,28 +35,27 @@ public abstract class SetReactiveIterator extends Stream { public void subscribe(final Subscriber t) { t.onSubscribe(new ReactiveSubscription(this, t) { - private List firstValues; + private List firstValues; + private List lastValues; private long nextIterPos; private InetSocketAddress client; - private long currentIndex; + private boolean finished; @Override protected void onRequest(long n) { - currentIndex = n; - nextValues(); } - private void handle(List vals) { - for (V val : vals) { - onNext(val); + private void handle(List vals) { + for (ScanObjectEntry val : vals) { + onNext((V)val.getObj()); } } protected void nextValues() { final ReactiveSubscription m = this; - scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { + scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { @Override public void onSubscribe(Subscription s) { @@ -61,32 +63,68 @@ public abstract class SetReactiveIterator extends Stream { } @Override - public void onNext(ListScanResult res) { - client = res.getRedisClient(); - - long prevIterPos = nextIterPos; - if (nextIterPos == 0 && firstValues == null) { - firstValues = res.getValues(); - } else if (res.getValues().equals(firstValues)) { - m.onComplete(); - currentIndex = 0; + public void onNext(ListScanResult res) { + if (finished) { + free(firstValues); + free(lastValues); + + client = null; + firstValues = null; + lastValues = null; + nextIterPos = 0; return; } - nextIterPos = res.getPos(); - if (prevIterPos == nextIterPos) { - nextIterPos = -1; + long prevIterPos = nextIterPos; + if (lastValues != null) { + free(lastValues); + } + + lastValues = convert(res.getValues()); + client = res.getRedisClient(); + + if (nextIterPos == 0 && firstValues == null) { + firstValues = lastValues; + lastValues = null; + if (firstValues.isEmpty()) { + client = null; + firstValues = null; + nextIterPos = 0; + prevIterPos = -1; + } + } else { + if (firstValues.isEmpty()) { + firstValues = lastValues; + lastValues = null; + if (firstValues.isEmpty()) { + if (res.getPos() == 0) { + finished = true; + m.onComplete(); + return; + } + } + } else if (lastValues.removeAll(firstValues)) { + free(firstValues); + free(lastValues); + + client = null; + firstValues = null; + lastValues = null; + nextIterPos = 0; + prevIterPos = -1; + finished = true; + m.onComplete(); + return; + } } handle(res.getValues()); - if (currentIndex == 0) { - return; - } - - if (nextIterPos == -1) { + nextIterPos = res.getPos(); + + if (prevIterPos == nextIterPos) { + finished = true; m.onComplete(); - currentIndex = 0; } } @@ -97,7 +135,7 @@ public abstract class SetReactiveIterator extends Stream { @Override public void onComplete() { - if (currentIndex == 0) { + if (finished) { return; } nextValues(); @@ -106,7 +144,24 @@ public abstract class SetReactiveIterator extends Stream { } }); } + + private void free(List list) { + if (list == null) { + return; + } + for (ByteBuf byteBuf : list) { + byteBuf.release(); + } + } + + private List convert(List list) { + List result = new ArrayList(list.size()); + for (ScanObjectEntry entry : list) { + result.add(entry.getBuf()); + } + return result; + } - protected abstract Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos); + protected abstract Publisher> scanIteratorReactive(InetSocketAddress client, long nextIterPos); } diff --git a/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java b/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java index 40025dd99..fc44f108d 100644 --- a/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java +++ b/redisson/src/main/java/org/redisson/spring/cache/RedissonCache.java @@ -15,7 +15,6 @@ */ package org.redisson.spring.cache; -import java.io.IOException; import java.lang.reflect.Constructor; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -24,7 +23,6 @@ import org.redisson.api.RLock; import org.redisson.api.RMap; import org.redisson.api.RMapCache; import org.redisson.api.RedissonClient; -import org.redisson.misc.Hash; import org.springframework.cache.Cache; import org.springframework.cache.support.SimpleValueWrapper; @@ -126,8 +124,7 @@ public class RedissonCache implements Cache { public T get(Object key, Callable valueLoader) { Object value = map.get(key); if (value == null) { - String lockName = getLockName(key); - RLock lock = redisson.getLock(lockName); + RLock lock = map.getLock(key); lock.lock(); try { value = map.get(key); @@ -154,15 +151,6 @@ public class RedissonCache implements Cache { return (T) fromStoreValue(value); } - private String getLockName(Object key) { - try { - byte[] keyState = redisson.getConfig().getCodec().getMapKeyEncoder().encode(key); - return "{" + map.getName() + "}:" + Hash.hashToBase64(keyState) + ":key"; - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - protected Object fromStoreValue(Object storeValue) { if (storeValue == NullValue.INSTANCE) { return null; diff --git a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java index cc9c6605a..e295a9306 100644 --- a/redisson/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonScoredSortedSetReactiveTest.java @@ -86,9 +86,9 @@ public class RedissonScoredSortedSetReactiveTest extends BaseReactiveTest { @Test public void testRemoveAsync() throws InterruptedException, ExecutionException { RScoredSortedSetReactive set = redisson.getScoredSortedSet("simple"); - set.add(0.11, 1); - set.add(0.22, 3); - set.add(0.33, 7); + sync(set.add(0.11, 1)); + sync(set.add(0.22, 3)); + sync(set.add(0.33, 7)); Assert.assertTrue(sync(set.remove(1))); Assert.assertFalse(sync(set.contains(1))); diff --git a/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java index 37b3372f4..6fd89186f 100644 --- a/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java @@ -75,10 +75,10 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest { assertThat(sync(set.add("123", 1, TimeUnit.SECONDS))).isFalse(); - Thread.sleep(50); + Thread.sleep(800); assertThat(sync(set.contains("123"))).isTrue(); - Thread.sleep(150); + Thread.sleep(250); assertThat(sync(set.contains("123"))).isFalse(); } @@ -104,12 +104,15 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest { } @Test - public void testIteratorSequence() { + public void testIteratorSequence() throws InterruptedException { RSetCacheReactive set = redisson.getSetCache("set"); for (int i = 0; i < 1000; i++) { - sync(set.add(Long.valueOf(i))); + set.add(Long.valueOf(i)); } + Thread.sleep(1000); + assertThat(sync(set.size())).isEqualTo(1000); + Set setCopy = new HashSet(); for (int i = 0; i < 1000; i++) { setCopy.add(Long.valueOf(i));