diff --git a/README.md b/README.md index ae1bbea14..936e10059 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -Redisson: Redis based In-Memory Data Grid for Java.
State of the Art Redis client +Redisson: Redis based In-Memory Data Grid for Java.
State of the Art Redis Java client ==== [Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.8.2) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Report an issue](https://github.com/redisson/redisson/issues/new) | **[Redisson PRO](https://redisson.pro)** @@ -12,17 +12,17 @@ Based on high-performance async and lock-free Java Redis client and [Netty](http Features ================================ -* Replicated servers mode (also supports [AWS ElastiCache](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Replication.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)): +* Replicated Redis servers mode (also supports [AWS ElastiCache](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Replication.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)): 1. automatic master server change discovery -* Cluster servers mode (also supports [AWS ElastiCache Cluster](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Clusters.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)): +* Clustered Redis servers mode (also supports [AWS ElastiCache Cluster](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Clusters.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)): 1. automatic master and slave servers discovery 2. automatic status and topology update 3. automatic slots change discovery -* Sentinel servers mode: +* Sentinel Redis servers mode: 1. automatic master, slave and sentinel servers discovery 2. automatic status and topology update -* Master with Slave servers mode -* Single server mode +* Master with Slave Redis servers mode +* Single Redis server mode * Thread-safe implementation * [Reactive Streams](https://github.com/redisson/redisson/wiki/3.-operations-execution#32-reactive-way) API * [Asynchronous](https://github.com/redisson/redisson/wiki/3.-operations-execution#31-async-way) API @@ -137,14 +137,14 @@ Config = ... // 2. Create Redisson instance RedissonClient redisson = Redisson.create(config); -// 3. Get object you need +// 3. Get Redis based object or service you need RMap map = redisson.getMap("myMap"); RLock lock = redisson.getLock("myLock"); RExecutorService executor = redisson.getExecutorService("myExecutorService"); -// over 30 different objects and services ... +// over 30 different Redis based objects and services ... ``` diff --git a/redisson-spring-boot-starter/README.md b/redisson-spring-boot-starter/README.md index 384965187..1e91dc9d0 100644 --- a/redisson-spring-boot-starter/README.md +++ b/redisson-spring-boot-starter/README.md @@ -46,6 +46,8 @@ Usage ### 2. Add settings into `application.settings` file +Common spring boot settings or Redisson settings could be used. + ```properties # common spring boot settings diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index aee5fd6cf..45df4ab55 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -124,6 +124,10 @@ public class Redisson implements RedissonClient { evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor()); } + public SemaphorePubSub getSemaphorePubSub() { + return semaphorePubSub; + } + public EvictionScheduler getEvictionScheduler() { return evictionScheduler; } diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index 407a1fdd3..257e05505 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -45,7 +45,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { private final String threadsQueueName; private final String timeoutSetName; - protected RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) { + public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; threadsQueueName = prefixName("redisson_lock_queue", name); diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 18b6f4bd9..a50e0f103 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -33,11 +33,14 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.MapOptions; +import org.redisson.api.RCountDownLatch; import org.redisson.api.MapOptions.WriteMode; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.api.RMap; +import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.RedissonClient; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; @@ -106,7 +109,31 @@ public class RedissonMap extends RedissonExpirable implements RMap { public RMapReduce mapReduce() { return new RedissonMapReduce(this, redisson, commandExecutor.getConnectionManager()); } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(K key) { + String lockName = getLockName(key, "permitexpirablesemaphore"); + return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + @Override + public RSemaphore getSemaphore(K key) { + String lockName = getLockName(key, "semaphore"); + return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RCountDownLatch getCountDownLatch(K key) { + String lockName = getLockName(key, "countdownlatch"); + return new RedissonCountDownLatch(commandExecutor, lockName); + } + + @Override + public RLock getFairLock(K key) { + String lockName = getLockName(key, "fairlock"); + return new RedissonFairLock(commandExecutor, lockName); + } + @Override public RLock getLock(K key) { String lockName = getLockName(key, "lock"); diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 6e8586411..eaf1e60e9 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -112,6 +112,22 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getReferenceCodecProvider(); } + public EvictionScheduler getEvictionScheduler() { + return evictionScheduler; + } + + public ConnectionManager getConnectionManager() { + return connectionManager; + } + + public CommandReactiveService getCommandExecutor() { + return commandExecutor; + } + + public SemaphorePubSub getSemaphorePubSub() { + return semaphorePubSub; + } + @Override public RStreamReactive getStream(String name) { return ReactiveProxyBuilder.create(commandExecutor, new RedissonStream(commandExecutor, name), RStreamReactive.class); @@ -239,41 +255,41 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RSetMultimapReactive getSetMultimap(String name) { return ReactiveProxyBuilder.create(commandExecutor, new RedissonSetMultimap(commandExecutor, name), - new RedissonSetMultimapReactive(commandExecutor, name), RSetMultimapReactive.class); + new RedissonSetMultimapReactive(commandExecutor, name, this), RSetMultimapReactive.class); } @Override public RSetMultimapReactive getSetMultimap(String name, Codec codec) { return ReactiveProxyBuilder.create(commandExecutor, new RedissonSetMultimap(codec, commandExecutor, name), - new RedissonSetMultimapReactive(codec, commandExecutor, name), RSetMultimapReactive.class); + new RedissonSetMultimapReactive(codec, commandExecutor, name, this), RSetMultimapReactive.class); } @Override public RMapReactive getMap(String name) { RedissonMap map = new RedissonMap(commandExecutor, name, null, null); return ReactiveProxyBuilder.create(commandExecutor, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, this), RMapReactive.class); } @Override public RMapReactive getMap(String name, Codec codec) { RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, null); return ReactiveProxyBuilder.create(commandExecutor, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, this), RMapReactive.class); } @Override public RSetReactive getSet(String name) { RedissonSet set = new RedissonSet(commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, this), RSetReactive.class); } @Override public RSetReactive getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, this), RSetReactive.class); } @Override @@ -362,14 +378,14 @@ public class RedissonReactive implements RedissonReactiveClient { public RSetCacheReactive getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, this), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, this), RSetCacheReactive.class); } @Override @@ -473,20 +489,18 @@ public class RedissonReactive implements RedissonReactiveClient { new RedissonMapCacheReactive(map), RMapCacheReactive.class); } - @Override public RMapReactive getMap(String name, MapOptions options) { RedissonMap map = new RedissonMap(commandExecutor, name, null, options); return ReactiveProxyBuilder.create(commandExecutor, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, this), RMapReactive.class); } - @Override public RMapReactive getMap(String name, Codec codec, MapOptions options) { RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, options); return ReactiveProxyBuilder.create(commandExecutor, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, this), RMapReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 53938b1fc..49e73df7d 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -226,41 +226,41 @@ public class RedissonRx implements RedissonRxClient { @Override public RSetMultimapRx getSetMultimap(String name) { return RxProxyBuilder.create(commandExecutor, new RedissonSetMultimap(commandExecutor, name), - new RedissonSetMultimapRx(commandExecutor, name), RSetMultimapRx.class); + new RedissonSetMultimapRx(commandExecutor, name, this), RSetMultimapRx.class); } @Override public RSetMultimapRx getSetMultimap(String name, Codec codec) { return RxProxyBuilder.create(commandExecutor, new RedissonSetMultimap(codec, commandExecutor, name), - new RedissonSetMultimapRx(codec, commandExecutor, name), RSetMultimapRx.class); + new RedissonSetMultimapRx(codec, commandExecutor, name, this), RSetMultimapRx.class); } @Override public RMapRx getMap(String name) { RedissonMap map = new RedissonMap(commandExecutor, name, null, null); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, this), RMapRx.class); } @Override public RMapRx getMap(String name, Codec codec) { RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, null); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, this), RMapRx.class); } @Override public RSetRx getSet(String name) { RedissonSet set = new RedissonSet(commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, this), RSetRx.class); } @Override public RSetRx getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, this), RSetRx.class); } @Override @@ -350,14 +350,14 @@ public class RedissonRx implements RedissonRxClient { public RSetCacheRx getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, this), RSetCacheRx.class); } @Override public RSetCacheRx getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, this), RSetCacheRx.class); } @Override @@ -460,7 +460,7 @@ public class RedissonRx implements RedissonRxClient { public RMapRx getMap(String name, MapOptions options) { RedissonMap map = new RedissonMap(commandExecutor, name, null, options); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, this), RMapRx.class); } @@ -468,7 +468,7 @@ public class RedissonRx implements RedissonRxClient { public RMapRx getMap(String name, Codec codec, MapOptions options) { RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, options); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, this), RMapRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index 6d0c76fec..badf4cf9c 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -24,8 +24,12 @@ import java.util.List; import java.util.Set; import java.util.stream.Stream; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.RSet; import org.redisson.api.RedissonClient; import org.redisson.api.SortOrder; @@ -603,20 +607,50 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt return commandExecutor.writeAsync(getName(), codec, RedisCommands.SORT_TO, params.toArray()); } - public String getLockName(Object value) { + public String getLockName(Object value, String suffix) { ByteBuf state = encode(value); try { - return suffixName(getName(value), Hash.hash128toBase64(state) + ":lock"); + return suffixName(getName(value), Hash.hash128toBase64(state) + ":" + suffix); } finally { state.release(); } } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) { + String lockName = getLockName(value, "permitexpirablesemaphore"); + return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RSemaphore getSemaphore(V value) { + String lockName = getLockName(value, "semaphore"); + return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RCountDownLatch getCountDownLatch(V value) { + String lockName = getLockName(value, "countdownlatch"); + return new RedissonCountDownLatch(commandExecutor, lockName); + } + + @Override + public RLock getFairLock(V value) { + String lockName = getLockName(value, "fairlock"); + return new RedissonFairLock(commandExecutor, lockName); + } @Override public RLock getLock(V value) { - String lockName = getLockName(value); + String lockName = getLockName(value, "lock"); return new RedissonLock(commandExecutor, lockName); } + + @Override + public RReadWriteLock getReadWriteLock(V value) { + String lockName = getLockName(value, "rw_lock"); + return new RedissonReadWriteLock(commandExecutor, lockName); + } @Override public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 9c50b43b4..96d4c0ae9 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -25,8 +25,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.RSetCache; import org.redisson.api.RedissonClient; import org.redisson.api.mapreduce.RCollectionMapReduce; @@ -367,20 +371,50 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< delete(); } - public String getLockName(Object value) { + public String getLockName(Object value, String suffix) { ByteBuf state = encode(value); try { - return suffixName(getName(value), Hash.hash128toBase64(state) + ":lock"); + return suffixName(getName(value), Hash.hash128toBase64(state) + ":" + suffix); } finally { state.release(); } } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) { + String lockName = getLockName(value, "permitexpirablesemaphore"); + return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RSemaphore getSemaphore(V value) { + String lockName = getLockName(value, "semaphore"); + return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RCountDownLatch getCountDownLatch(V value) { + String lockName = getLockName(value, "countdownlatch"); + return new RedissonCountDownLatch(commandExecutor, lockName); + } + + @Override + public RLock getFairLock(V value) { + String lockName = getLockName(value, "fairlock"); + return new RedissonFairLock(commandExecutor, lockName); + } @Override public RLock getLock(V value) { - String lockName = getLockName(value); + String lockName = getLockName(value, "lock"); return new RedissonLock(commandExecutor, lockName); } + + @Override + public RReadWriteLock getReadWriteLock(V value) { + String lockName = getLockName(value, "rw_lock"); + return new RedissonReadWriteLock(commandExecutor, lockName); + } @Override public void destroy() { diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java index fd28a83e7..20b066712 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -24,8 +24,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.RSet; import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; @@ -50,7 +54,8 @@ import org.redisson.command.CommandAsyncExecutor; */ public class RedissonSetMultimapValues extends RedissonExpirable implements RSet { - private static final RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE); + private static final RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", + new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE); private final RSet set; private final Object key; @@ -469,6 +474,31 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R Arrays.asList(timeoutSetName, getName()), args.toArray()); } + @Override + public RCountDownLatch getCountDownLatch(V value) { + return set.getCountDownLatch(value); + } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) { + return set.getPermitExpirableSemaphore(value); + } + + @Override + public RSemaphore getSemaphore(V value) { + return set.getSemaphore(value); + } + + @Override + public RLock getFairLock(V value) { + return set.getFairLock(value); + } + + @Override + public RReadWriteLock getReadWriteLock(V value) { + return set.getReadWriteLock(value); + } + @Override public RLock getLock(V value) { return set.getLock(value); diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index 75b48702b..64de17b13 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -103,6 +103,38 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsync RMapReduce mapReduce(); + + /** + * Returns RCountDownLatch instance associated with key + * + * @param key - map key + * @return countdownlatch + */ + RCountDownLatch getCountDownLatch(K key); + + /** + * Returns RPermitExpirableSemaphore instance associated with key + * + * @param key - map key + * @return permitExpirableSemaphore + */ + RPermitExpirableSemaphore getPermitExpirableSemaphore(K key); + + /** + * Returns RSemaphore instance associated with key + * + * @param key - map key + * @return semaphore + */ + RSemaphore getSemaphore(K key); + + /** + * Returns RLock instance associated with key + * + * @param key - map key + * @return fairlock + */ + RLock getFairLock(K key); /** * Returns RReadWriteLock instance associated with key diff --git a/redisson/src/main/java/org/redisson/api/RMapReactive.java b/redisson/src/main/java/org/redisson/api/RMapReactive.java index ad07221c1..018e9cdfb 100644 --- a/redisson/src/main/java/org/redisson/api/RMapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapReactive.java @@ -438,5 +438,45 @@ public interface RMapReactive extends RExpirableReactive { * @return iterator */ Flux keyIterator(String pattern, int count); + + /** + * Returns RPermitExpirableSemaphore instance associated with key + * + * @param key - map key + * @return permitExpirableSemaphore + */ + RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(K key); + + /** + * Returns RSemaphore instance associated with key + * + * @param key - map key + * @return semaphore + */ + RSemaphoreReactive getSemaphore(K key); + + /** + * Returns RLock instance associated with key + * + * @param key - map key + * @return fairLock + */ + RLockReactive getFairLock(K key); + + /** + * Returns RReadWriteLock instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RReadWriteLockReactive getReadWriteLock(K key); + + /** + * Returns RLock instance associated with key + * + * @param key - map key + * @return lock + */ + RLockReactive getLock(K key); } diff --git a/redisson/src/main/java/org/redisson/api/RMapRx.java b/redisson/src/main/java/org/redisson/api/RMapRx.java index ce5042692..e2c918e23 100644 --- a/redisson/src/main/java/org/redisson/api/RMapRx.java +++ b/redisson/src/main/java/org/redisson/api/RMapRx.java @@ -437,5 +437,45 @@ public interface RMapRx extends RExpirableRx { * @return iterator */ Flowable keyIterator(String pattern, int count); + + /** + * Returns RPermitExpirableSemaphore instance associated with key + * + * @param key - map key + * @return permitExpirableSemaphore + */ + RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(K key); + + /** + * Returns RSemaphore instance associated with key + * + * @param key - map key + * @return semaphore + */ + RSemaphoreRx getSemaphore(K key); + + /** + * Returns RLock instance associated with key + * + * @param key - map key + * @return fairLock + */ + RLockRx getFairLock(K key); + + /** + * Returns RReadWriteLock instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RReadWriteLockRx getReadWriteLock(K key); + + /** + * Returns RLock instance associated with key + * + * @param key - map key + * @return lock + */ + RLockRx getLock(K key); } diff --git a/redisson/src/main/java/org/redisson/api/RSet.java b/redisson/src/main/java/org/redisson/api/RSet.java index 56d6c800e..b4e6bcf44 100644 --- a/redisson/src/main/java/org/redisson/api/RSet.java +++ b/redisson/src/main/java/org/redisson/api/RSet.java @@ -30,11 +30,51 @@ import org.redisson.api.mapreduce.RCollectionMapReduce; */ public interface RSet extends Set, RExpirable, RSetAsync, RSortable> { + /** + * Returns RCountDownLatch instance associated with value + * + * @param value - set value + * @return RCountDownLatch object + */ + RCountDownLatch getCountDownLatch(V value); + + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphore getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphore getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLock getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLock getReadWriteLock(V value); + /** * Returns lock instance associated with value * * @param value - set value - * @return lock + * @return RLock object */ RLock getLock(V value); diff --git a/redisson/src/main/java/org/redisson/api/RSetCache.java b/redisson/src/main/java/org/redisson/api/RSetCache.java index 2f4f53dee..fdffd2049 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCache.java +++ b/redisson/src/main/java/org/redisson/api/RSetCache.java @@ -41,11 +41,51 @@ import org.redisson.api.mapreduce.RCollectionMapReduce; */ public interface RSetCache extends Set, RExpirable, RSetCacheAsync, RDestroyable { + /** + * Returns RCountDownLatch instance associated with value + * + * @param value - set value + * @return RCountDownLatch object + */ + RCountDownLatch getCountDownLatch(V value); + + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphore getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphore getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLock getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLock getReadWriteLock(V value); + /** * Returns lock instance associated with value * * @param value - set value - * @return lock + * @return RLock object */ RLock getLock(V value); diff --git a/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java b/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java index 1a20d154c..a503a9044 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import reactor.core.publisher.Mono; /** - * Async set functions + * Reactive interface for RSetCache object * * @author Nikita Koksharov * @@ -29,6 +29,57 @@ import reactor.core.publisher.Mono; */ public interface RSetCacheReactive extends RCollectionReactive { + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphoreReactive getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockReactive getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLockReactive getReadWriteLock(V value); + + /** + * Returns lock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockReactive getLock(V value); + + /** + * Stores value with specified time to live. + * Value expires after specified time to live. + * + * @param value to add + * @param ttl - time to live for key\value entry. + * If 0 then stores infinitely. + * @param unit - time unit + * @return true if value has been added. false + * if value already been in collection. + */ Mono add(V value, long ttl, TimeUnit unit); /** diff --git a/redisson/src/main/java/org/redisson/api/RSetCacheRx.java b/redisson/src/main/java/org/redisson/api/RSetCacheRx.java index b4a39eb9e..ab7b2f904 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCacheRx.java +++ b/redisson/src/main/java/org/redisson/api/RSetCacheRx.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import io.reactivex.Flowable; /** - * Async set functions + * RxJava2 interface for RSetCache object * * @author Nikita Koksharov * @@ -29,6 +29,57 @@ import io.reactivex.Flowable; */ public interface RSetCacheRx extends RCollectionRx { + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphoreRx getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockRx getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLockRx getReadWriteLock(V value); + + /** + * Returns lock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockRx getLock(V value); + + /** + * Stores value with specified time to live. + * Value expires after specified time to live. + * + * @param value to add + * @param ttl - time to live for key\value entry. + * If 0 then stores infinitely. + * @param unit - time unit + * @return true if value has been added. false + * if value already been in collection. + */ Flowable add(V value, long ttl, TimeUnit unit); /** diff --git a/redisson/src/main/java/org/redisson/api/RSetReactive.java b/redisson/src/main/java/org/redisson/api/RSetReactive.java index 6e2b2c87a..dd52eacef 100644 --- a/redisson/src/main/java/org/redisson/api/RSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSetReactive.java @@ -21,7 +21,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Async set functions + * Reactive interface for RSet object * * @author Nikita Koksharov * @@ -29,6 +29,46 @@ import reactor.core.publisher.Mono; */ public interface RSetReactive extends RCollectionReactive, RSortableReactive> { + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphoreReactive getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockReactive getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLockReactive getReadWriteLock(V value); + + /** + * Returns lock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockReactive getLock(V value); + /** * Returns an iterator over elements in this set. * Elements are loaded in batch. Batch size is defined by count param. diff --git a/redisson/src/main/java/org/redisson/api/RSetRx.java b/redisson/src/main/java/org/redisson/api/RSetRx.java index 94cbae32e..0ca18c5fd 100644 --- a/redisson/src/main/java/org/redisson/api/RSetRx.java +++ b/redisson/src/main/java/org/redisson/api/RSetRx.java @@ -20,7 +20,7 @@ import java.util.Set; import io.reactivex.Flowable; /** - * Async set functions + * RxJava2 interface for RSetCache object * * @author Nikita Koksharov * @@ -28,6 +28,46 @@ import io.reactivex.Flowable; */ public interface RSetRx extends RCollectionRx, RSortableRx> { + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphoreRx getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockRx getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLockRx getReadWriteLock(V value); + + /** + * Returns lock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockRx getLock(V value); + /** * Returns an iterator over elements in this set. * Elements are loaded in batch. Batch size is defined by count param. diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index e8fb19737..1bd1dba10 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -452,7 +452,7 @@ abstract class ConnectionPool { } public void returnConnection(ClientConnectionsEntry entry, T connection) { - if (entry.isFreezed() && !entry.isMasterForRead()) { + if (entry.isFreezed() && entry.getFreezeReason() != FreezeReason.SYSTEM) { connection.closeAsync(); entry.getAllConnections().remove(connection); } else { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 615fffaa0..105f336ec 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -142,14 +142,14 @@ public class RedissonBatchReactive implements RBatchReactive { public RMapReactive getMap(String name) { RedissonMap map = new RedissonMap(executorService, name, null, null); return ReactiveProxyBuilder.create(executorService, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, null), RMapReactive.class); } @Override public RMapReactive getMap(String name, Codec codec) { RedissonMap map = new RedissonMap(codec, executorService, name, null, null); return ReactiveProxyBuilder.create(executorService, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, null), RMapReactive.class); } @Override @@ -170,14 +170,14 @@ public class RedissonBatchReactive implements RBatchReactive { public RSetReactive getSet(String name) { RedissonSet set = new RedissonSet(executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, null), RSetReactive.class); } @Override public RSetReactive getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, null), RSetReactive.class); } @Override @@ -235,14 +235,14 @@ public class RedissonBatchReactive implements RBatchReactive { public RSetCacheReactive getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, null), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, null), RSetCacheReactive.class); } @Override @@ -349,13 +349,13 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public RSetMultimapReactive getSetMultimap(String name) { return ReactiveProxyBuilder.create(executorService, new RedissonSetMultimap(executorService, name), - new RedissonSetMultimapReactive(executorService, name), RSetMultimapReactive.class); + new RedissonSetMultimapReactive(executorService, name, null), RSetMultimapReactive.class); } @Override public RSetMultimapReactive getSetMultimap(String name, Codec codec) { return ReactiveProxyBuilder.create(executorService, new RedissonSetMultimap(codec, executorService, name), - new RedissonSetMultimapReactive(codec, executorService, name), RSetMultimapReactive.class); + new RedissonSetMultimapReactive(codec, executorService, name, null), RSetMultimapReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index 6e6e5299c..73c50ef7b 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -23,7 +23,12 @@ import java.util.function.Supplier; import org.reactivestreams.Publisher; import org.redisson.RedissonMap; +import org.redisson.api.RLockReactive; import org.redisson.api.RMap; +import org.redisson.api.RPermitExpirableSemaphoreReactive; +import org.redisson.api.RReadWriteLockReactive; +import org.redisson.api.RSemaphoreReactive; +import org.redisson.api.RedissonReactiveClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,9 +45,11 @@ import reactor.core.publisher.Mono; public class RedissonMapReactive { private final RMap instance; + private final RedissonReactiveClient redisson; - public RedissonMapReactive(RMap instance) { + public RedissonMapReactive(RMap instance, RedissonReactiveClient redisson) { this.instance = instance; + this.redisson = redisson; } public Publisher> entryIterator() { @@ -102,5 +109,30 @@ public class RedissonMapReactive { } }); } + + public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(K key) { + String name = ((RedissonMap)instance).getLockName(key, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreReactive getSemaphore(K key) { + String name = ((RedissonMap)instance).getLockName(key, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockReactive getFairLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockReactive getReadWriteLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockReactive getLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "lock"); + return redisson.getLock(name); + } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 93623f838..bc76cb495 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -22,9 +22,15 @@ import java.util.List; import java.util.function.Supplier; import org.reactivestreams.Publisher; +import org.redisson.RedissonSetCache; import org.redisson.ScanIterator; import org.redisson.api.RFuture; +import org.redisson.api.RLockReactive; +import org.redisson.api.RPermitExpirableSemaphoreReactive; +import org.redisson.api.RReadWriteLockReactive; +import org.redisson.api.RSemaphoreReactive; import org.redisson.api.RSetCache; +import org.redisson.api.RedissonReactiveClient; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; @@ -41,9 +47,11 @@ import reactor.core.publisher.Mono; public class RedissonSetCacheReactive { private final RSetCache instance; + private final RedissonReactiveClient redisson; - public RedissonSetCacheReactive(RSetCache instance) { + public RedissonSetCacheReactive(RSetCache instance, RedissonReactiveClient redisson) { this.instance = instance; + this.redisson = redisson; } public Publisher iterator() { @@ -63,5 +71,30 @@ public class RedissonSetCacheReactive { } }.addAll(c); } + + public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreReactive getSemaphore(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockReactive getFairLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockReactive getReadWriteLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockReactive getLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "lock"); + return redisson.getLock(name); + } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java index 9705ba0c2..1040e056d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java @@ -19,6 +19,7 @@ import org.redisson.RedissonListMultimap; import org.redisson.api.RSet; import org.redisson.api.RSetMultimap; import org.redisson.api.RSetReactive; +import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; /** @@ -30,21 +31,26 @@ import org.redisson.client.codec.Codec; */ public class RedissonSetMultimapReactive { - private CommandReactiveExecutor commandExecutor; - private RedissonListMultimap instance; + private final RedissonReactiveClient redisson; + private final CommandReactiveExecutor commandExecutor; + private final RedissonListMultimap instance; - public RedissonSetMultimapReactive(CommandReactiveExecutor commandExecutor, String name) { + public RedissonSetMultimapReactive(CommandReactiveExecutor commandExecutor, String name, RedissonReactiveClient redisson) { this.instance = new RedissonListMultimap(commandExecutor, name); + this.redisson = redisson; + this.commandExecutor = commandExecutor; } - public RedissonSetMultimapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + public RedissonSetMultimapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RedissonReactiveClient redisson) { this.instance = new RedissonListMultimap(codec, commandExecutor, name); + this.redisson = redisson; + this.commandExecutor = commandExecutor; } public RSetReactive get(K key) { RSet set = ((RSetMultimap)instance).get(key); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, redisson), RSetReactive.class); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index ad8c614e2..a44205391 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -24,7 +24,12 @@ import java.util.function.Supplier; import org.reactivestreams.Publisher; import org.redisson.RedissonSet; import org.redisson.api.RFuture; +import org.redisson.api.RLockReactive; +import org.redisson.api.RPermitExpirableSemaphoreReactive; +import org.redisson.api.RReadWriteLockReactive; +import org.redisson.api.RSemaphoreReactive; import org.redisson.api.RSet; +import org.redisson.api.RedissonReactiveClient; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; @@ -40,9 +45,11 @@ import reactor.core.publisher.Flux; public class RedissonSetReactive { private final RSet instance; + private final RedissonReactiveClient redisson; - public RedissonSetReactive(RSet instance) { + public RedissonSetReactive(RSet instance, RedissonReactiveClient redisson) { this.instance = instance; + this.redisson = redisson; } public Publisher addAll(Publisher c) { @@ -75,4 +82,29 @@ public class RedissonSetReactive { return iterator(null, 10); } - } + public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value) { + String name = ((RedissonSet)instance).getLockName(value, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreReactive getSemaphore(V value) { + String name = ((RedissonSet)instance).getLockName(value, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockReactive getFairLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockReactive getReadWriteLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockReactive getLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "lock"); + return redisson.getLock(name); + } + +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java index a9d9d0d10..5a090adcd 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -64,14 +64,14 @@ public class RedissonTransactionReactive implements RTransactionReactive { public RMapReactive getMap(String name) { RMap map = transaction.getMap(name); return ReactiveProxyBuilder.create(executorService, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, null), RMapReactive.class); } @Override public RMapReactive getMap(String name, Codec codec) { RMap map = transaction.getMap(name, codec); return ReactiveProxyBuilder.create(executorService, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, null), RMapReactive.class); } @Override @@ -92,28 +92,28 @@ public class RedissonTransactionReactive implements RTransactionReactive { public RSetReactive getSet(String name) { RSet set = transaction.getSet(name); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, null), RSetReactive.class); } @Override public RSetReactive getSet(String name, Codec codec) { RSet set = transaction.getSet(name, codec); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, null), RSetReactive.class); } @Override public RSetCacheReactive getSetCache(String name) { RSetCache set = transaction.getSetCache(name); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, null), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { RSetCache set = transaction.getSetCache(name, codec); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, null), RSetCacheReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java index 8f834c464..cc6d290bf 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java @@ -142,14 +142,14 @@ public class RedissonBatchRx implements RBatchRx { public RMapRx getMap(String name) { RedissonMap map = new RedissonMap(executorService, name, null, null); return RxProxyBuilder.create(executorService, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, null), RMapRx.class); } @Override public RMapRx getMap(String name, Codec codec) { RedissonMap map = new RedissonMap(codec, executorService, name, null, null); return RxProxyBuilder.create(executorService, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, null), RMapRx.class); } @Override @@ -170,14 +170,14 @@ public class RedissonBatchRx implements RBatchRx { public RSetRx getSet(String name) { RedissonSet set = new RedissonSet(executorService, name, null); return RxProxyBuilder.create(executorService, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, null), RSetRx.class); } @Override public RSetRx getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, executorService, name, null); return RxProxyBuilder.create(executorService, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, null), RSetRx.class); } @Override @@ -241,14 +241,14 @@ public class RedissonBatchRx implements RBatchRx { public RSetCacheRx getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, executorService, name, null); return RxProxyBuilder.create(executorService, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, null), RSetCacheRx.class); } @Override public RSetCacheRx getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, executorService, name, null); return RxProxyBuilder.create(executorService, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, null), RSetCacheRx.class); } @Override @@ -359,13 +359,13 @@ public class RedissonBatchRx implements RBatchRx { @Override public RSetMultimapRx getSetMultimap(String name) { return RxProxyBuilder.create(executorService, new RedissonSetMultimap(executorService, name), - new RedissonSetMultimapRx(executorService, name), RSetMultimapRx.class); + new RedissonSetMultimapRx(executorService, name, null), RSetMultimapRx.class); } @Override public RSetMultimapRx getSetMultimap(String name, Codec codec) { return RxProxyBuilder.create(executorService, new RedissonSetMultimap(codec, executorService, name), - new RedissonSetMultimapRx(codec, executorService, name), RSetMultimapRx.class); + new RedissonSetMultimapRx(codec, executorService, name, null), RSetMultimapRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java b/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java index 727329483..aec5aa216 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java @@ -20,7 +20,13 @@ import java.util.Map.Entry; import org.reactivestreams.Publisher; import org.redisson.RedissonMap; +import org.redisson.RedissonRx; +import org.redisson.api.RLockRx; import org.redisson.api.RMap; +import org.redisson.api.RPermitExpirableSemaphoreRx; +import org.redisson.api.RReadWriteLockRx; +import org.redisson.api.RSemaphoreRx; +import org.redisson.api.RedissonRxClient; /** * Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap} @@ -34,9 +40,11 @@ import org.redisson.api.RMap; public class RedissonMapRx { private final RedissonMap instance; + private RedissonRxClient redisson; - public RedissonMapRx(RMap instance) { + public RedissonMapRx(RMap instance, RedissonRx redisson) { this.instance = (RedissonMap) instance; + this.redisson = redisson; } public Publisher> entryIterator() { @@ -97,4 +105,29 @@ public class RedissonMapRx { }.create(); } + public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(K key) { + String name = ((RedissonMap)instance).getLockName(key, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreRx getSemaphore(K key) { + String name = ((RedissonMap)instance).getLockName(key, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockRx getFairLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockRx getReadWriteLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockRx getLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "lock"); + return redisson.getLock(name); + } + } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java b/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java index d5ba58e5e..1d47d841c 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java @@ -16,9 +16,15 @@ package org.redisson.rx; import org.reactivestreams.Publisher; +import org.redisson.RedissonSetCache; import org.redisson.ScanIterator; import org.redisson.api.RFuture; +import org.redisson.api.RLockRx; +import org.redisson.api.RPermitExpirableSemaphoreRx; +import org.redisson.api.RReadWriteLockRx; +import org.redisson.api.RSemaphoreRx; import org.redisson.api.RSetCache; +import org.redisson.api.RedissonRxClient; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; @@ -31,9 +37,11 @@ import org.redisson.client.protocol.decoder.ListScanResult; public class RedissonSetCacheRx { private final RSetCache instance; + private final RedissonRxClient redisson; - public RedissonSetCacheRx(RSetCache instance) { + public RedissonSetCacheRx(RSetCache instance, RedissonRxClient redisson) { this.instance = instance; + this.redisson = redisson; } public Publisher iterator() { @@ -54,4 +62,29 @@ public class RedissonSetCacheRx { }.addAll(c); } + public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreRx getSemaphore(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockRx getFairLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockRx getReadWriteLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockRx getLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "lock"); + return redisson.getLock(name); + } + } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonSetMultimapRx.java b/redisson/src/main/java/org/redisson/rx/RedissonSetMultimapRx.java index dfb8b33fc..097be9637 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonSetMultimapRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonSetMultimapRx.java @@ -18,7 +18,8 @@ package org.redisson.rx; import org.redisson.RedissonListMultimap; import org.redisson.RedissonSet; import org.redisson.api.RSetMultimap; -import org.redisson.api.RSetReactive; +import org.redisson.api.RSetRx; +import org.redisson.api.RedissonRxClient; import org.redisson.client.codec.Codec; /** @@ -30,21 +31,26 @@ import org.redisson.client.codec.Codec; */ public class RedissonSetMultimapRx { - private CommandRxExecutor commandExecutor; - private RedissonListMultimap instance; + private final RedissonRxClient redisson; + private final CommandRxExecutor commandExecutor; + private final RedissonListMultimap instance; - public RedissonSetMultimapRx(CommandRxExecutor commandExecutor, String name) { + public RedissonSetMultimapRx(CommandRxExecutor commandExecutor, String name, RedissonRxClient redisson) { this.instance = new RedissonListMultimap(commandExecutor, name); + this.redisson = redisson; + this.commandExecutor = commandExecutor; } - public RedissonSetMultimapRx(Codec codec, CommandRxExecutor commandExecutor, String name) { + public RedissonSetMultimapRx(Codec codec, CommandRxExecutor commandExecutor, String name, RedissonRxClient redisson) { this.instance = new RedissonListMultimap(codec, commandExecutor, name); + this.redisson = redisson; + this.commandExecutor = commandExecutor; } - public RSetReactive get(K key) { + public RSetRx get(K key) { RedissonSet set = (RedissonSet) ((RSetMultimap)instance).get(key); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetRx(set), RSetReactive.class); + new RedissonSetRx(set, redisson), RSetRx.class); } } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java b/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java index 59cc84355..c957b793e 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java @@ -18,7 +18,12 @@ package org.redisson.rx; import org.reactivestreams.Publisher; import org.redisson.RedissonSet; import org.redisson.api.RFuture; +import org.redisson.api.RLockRx; +import org.redisson.api.RPermitExpirableSemaphoreRx; +import org.redisson.api.RReadWriteLockRx; +import org.redisson.api.RSemaphoreRx; import org.redisson.api.RSet; +import org.redisson.api.RedissonRxClient; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; @@ -34,9 +39,11 @@ import io.reactivex.Flowable; public class RedissonSetRx { private final RSet instance; + private final RedissonRxClient redisson; - public RedissonSetRx(RSet instance) { + public RedissonSetRx(RSet instance, RedissonRxClient redisson) { this.instance = instance; + this.redisson = redisson; } public Flowable addAll(Publisher c) { @@ -60,7 +67,7 @@ public class RedissonSetRx { return new SetRxIterator() { @Override protected RFuture> scanIterator(RedisClient client, long nextIterPos) { - return ((RedissonSet)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, pattern, count); + return ((RedissonSet)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, pattern, count); } }.create(); } @@ -69,4 +76,29 @@ public class RedissonSetRx { return iterator(null, 10); } + public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value) { + String name = ((RedissonSet)instance).getLockName(value, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreRx getSemaphore(V value) { + String name = ((RedissonSet)instance).getLockName(value, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockRx getFairLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockRx getReadWriteLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockRx getLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "lock"); + return redisson.getLock(name); + } + } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java b/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java index 867488083..d784589a3 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java @@ -65,14 +65,14 @@ public class RedissonTransactionRx implements RTransactionRx { public RMapRx getMap(String name) { RMap map = transaction.getMap(name); return RxProxyBuilder.create(executorService, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, null), RMapRx.class); } @Override public RMapRx getMap(String name, Codec codec) { RMap map = transaction.getMap(name, codec); return RxProxyBuilder.create(executorService, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, null), RMapRx.class); } @Override @@ -93,28 +93,28 @@ public class RedissonTransactionRx implements RTransactionRx { public RSetRx getSet(String name) { RSet set = transaction.getSet(name); return RxProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetRx.class); + new RedissonSetReactive(set, null), RSetRx.class); } @Override public RSetRx getSet(String name, Codec codec) { RSet set = transaction.getSet(name, codec); return RxProxyBuilder.create(executorService, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, null), RSetRx.class); } @Override public RSetCacheRx getSetCache(String name) { RSetCache set = transaction.getSetCache(name); return RxProxyBuilder.create(executorService, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, null), RSetCacheRx.class); } @Override public RSetCacheRx getSetCache(String name, Codec codec) { RSetCache set = transaction.getSetCache(name, codec); return RxProxyBuilder.create(executorService, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, null), RSetCacheRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java index 45f7af3e5..3325d1a56 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java @@ -24,10 +24,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.RedissonMap; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.api.RMap; +import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; @@ -272,6 +275,26 @@ public class RedissonTransactionalMap extends RedissonMap { throw new UnsupportedOperationException("loadAll method is not supported in transaction"); } + @Override + public RLock getFairLock(K key) { + throw new UnsupportedOperationException("getFairLock method is not supported in transaction"); + } + + @Override + public RCountDownLatch getCountDownLatch(K key) { + throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction"); + } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(K key) { + throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction"); + } + + @Override + public RSemaphore getSemaphore(K key) { + throw new UnsupportedOperationException("getSemaphore method is not supported in transaction"); + } + @Override public RLock getLock(K key) { throw new UnsupportedOperationException("getLock method is not supported in transaction"); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java index 225d2c04e..8a743cfa8 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java @@ -24,9 +24,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.RedissonMapCache; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; @@ -298,6 +301,26 @@ public class RedissonTransactionalMapCache extends RedissonMapCache throw new UnsupportedOperationException("loadAll method is not supported in transaction"); } + @Override + public RLock getFairLock(K key) { + throw new UnsupportedOperationException("getFairLock method is not supported in transaction"); + } + + @Override + public RCountDownLatch getCountDownLatch(K key) { + throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction"); + } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(K key) { + throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction"); + } + + @Override + public RSemaphore getSemaphore(K key) { + throw new UnsupportedOperationException("getSemaphore method is not supported in transaction"); + } + @Override public RLock getLock(K key) { throw new UnsupportedOperationException("getLock method is not supported in transaction"); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java index 25c200e68..9c45acb40 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java @@ -23,7 +23,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.RedissonSet; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; +import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; @@ -99,6 +104,36 @@ public class RedissonTransactionalSet extends RedissonSet { checkState(); return transactionalSet.scanIterator(name, client, startPos, pattern, count); } + + @Override + public RLock getFairLock(V value) { + throw new UnsupportedOperationException("getFairLock method is not supported in transaction"); + } + + @Override + public RCountDownLatch getCountDownLatch(V value) { + throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction"); + } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) { + throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction"); + } + + @Override + public RSemaphore getSemaphore(V value) { + throw new UnsupportedOperationException("getSemaphore method is not supported in transaction"); + } + + @Override + public RLock getLock(V value) { + throw new UnsupportedOperationException("getLock method is not supported in transaction"); + } + + @Override + public RReadWriteLock getReadWriteLock(V value) { + throw new UnsupportedOperationException("getReadWriteLock method is not supported in transaction"); + } @Override public RFuture containsAsync(Object o) { diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java index 47920c413..1d03bc1de 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java @@ -77,7 +77,7 @@ public class TransactionalSet extends BaseTransactionalSet { @Override protected RLock getLock(RCollectionAsync set, V value) { - String lockName = ((RedissonSet)set).getLockName(value); + String lockName = ((RedissonSet)set).getLockName(value, "lock"); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); } diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java index 483b359b4..a50b89f81 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java @@ -82,7 +82,7 @@ public class TransactionalSetCache extends BaseTransactionalSet { @Override protected RLock getLock(RCollectionAsync set, V value) { - String lockName = ((RedissonSetCache)set).getLockName(value); + String lockName = ((RedissonSetCache)set).getLockName(value, "lock"); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java index edfffffdf..e2038871f 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java @@ -40,12 +40,12 @@ public abstract class SetOperation extends TransactionalOperation { } protected RLock getLock(RSetCache setCache, CommandAsyncExecutor commandExecutor, Object value) { - String lockName = ((RedissonSetCache)setCache).getLockName(value); + String lockName = ((RedissonSetCache)setCache).getLockName(value, "lock"); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); } protected RLock getLock(RSet setCache, CommandAsyncExecutor commandExecutor, Object value) { - String lockName = ((RedissonSet)setCache).getLockName(value); + String lockName = ((RedissonSet)setCache).getLockName(value, "lock"); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); }