From cbafc09d2b17185b3c507964d54f085c28a7872e Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 5 Dec 2018 13:00:04 +0300 Subject: [PATCH 01/11] Fixed - Pooled connection closed after MOVED redirection #1778 --- .../main/java/org/redisson/connection/pool/ConnectionPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index e8fb19737..1bd1dba10 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -452,7 +452,7 @@ abstract class ConnectionPool { } public void returnConnection(ClientConnectionsEntry entry, T connection) { - if (entry.isFreezed() && !entry.isMasterForRead()) { + if (entry.isFreezed() && entry.getFreezeReason() != FreezeReason.SYSTEM) { connection.closeAsync(); entry.getAllConnections().remove(connection); } else { From 800d65034134703cc608f24ff4bc4375b4e62b13 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 5 Dec 2018 22:13:01 +0300 Subject: [PATCH 02/11] Update README.md --- redisson-spring-boot-starter/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/redisson-spring-boot-starter/README.md b/redisson-spring-boot-starter/README.md index 384965187..1e91dc9d0 100644 --- a/redisson-spring-boot-starter/README.md +++ b/redisson-spring-boot-starter/README.md @@ -46,6 +46,8 @@ Usage ### 2. Add settings into `application.settings` file +Common spring boot settings or Redisson settings could be used. + ```properties # common spring boot settings From 1ccab3f94f5890c5f31a0f95cd959ce16b0fc54b Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Dec 2018 00:49:11 +0300 Subject: [PATCH 03/11] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ae1bbea14..567907985 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -Redisson: Redis based In-Memory Data Grid for Java.
State of the Art Redis client +Redisson: Redis based In-Memory Data Grid for Java.
State of the Art Redis Java client ==== [Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.8.2) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Report an issue](https://github.com/redisson/redisson/issues/new) | **[Redisson PRO](https://redisson.pro)** From cede9e6b5125586d890d84a3c2da22d046f6df99 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Dec 2018 00:50:21 +0300 Subject: [PATCH 04/11] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 567907985..edccc40b8 100644 --- a/README.md +++ b/README.md @@ -137,14 +137,14 @@ Config = ... // 2. Create Redisson instance RedissonClient redisson = Redisson.create(config); -// 3. Get object you need +// 3. Get Redis based object or service you need RMap map = redisson.getMap("myMap"); RLock lock = redisson.getLock("myLock"); RExecutorService executor = redisson.getExecutorService("myExecutorService"); -// over 30 different objects and services ... +// over 30 different Redis based objects and services ... ``` From ddb763b7ff4e0106fe6e704bf5d23d3fbdb7fac7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Dec 2018 01:09:38 +0300 Subject: [PATCH 05/11] Update README.md --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index edccc40b8..936e10059 100644 --- a/README.md +++ b/README.md @@ -12,17 +12,17 @@ Based on high-performance async and lock-free Java Redis client and [Netty](http Features ================================ -* Replicated servers mode (also supports [AWS ElastiCache](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Replication.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)): +* Replicated Redis servers mode (also supports [AWS ElastiCache](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Replication.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)): 1. automatic master server change discovery -* Cluster servers mode (also supports [AWS ElastiCache Cluster](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Clusters.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)): +* Clustered Redis servers mode (also supports [AWS ElastiCache Cluster](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Clusters.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)): 1. automatic master and slave servers discovery 2. automatic status and topology update 3. automatic slots change discovery -* Sentinel servers mode: +* Sentinel Redis servers mode: 1. automatic master, slave and sentinel servers discovery 2. automatic status and topology update -* Master with Slave servers mode -* Single server mode +* Master with Slave Redis servers mode +* Single Redis server mode * Thread-safe implementation * [Reactive Streams](https://github.com/redisson/redisson/wiki/3.-operations-execution#32-reactive-way) API * [Asynchronous](https://github.com/redisson/redisson/wiki/3.-operations-execution#31-async-way) API From fae3ad84fb83fd533d33897e8171c1f85684ce88 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Dec 2018 13:46:13 +0300 Subject: [PATCH 06/11] Feature - getCountDownLatch, getSemaphore, getPermitExpirableSemaphore, getFairLock methods added to RMap #1783 --- .../src/main/java/org/redisson/Redisson.java | 4 +++ .../java/org/redisson/RedissonFairLock.java | 2 +- .../main/java/org/redisson/RedissonMap.java | 27 ++++++++++++++++ .../src/main/java/org/redisson/api/RMap.java | 32 +++++++++++++++++++ .../transaction/RedissonTransactionalMap.java | 23 +++++++++++++ .../RedissonTransactionalMapCache.java | 23 +++++++++++++ 6 files changed, 110 insertions(+), 1 deletion(-) diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index aee5fd6cf..45df4ab55 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -124,6 +124,10 @@ public class Redisson implements RedissonClient { evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor()); } + public SemaphorePubSub getSemaphorePubSub() { + return semaphorePubSub; + } + public EvictionScheduler getEvictionScheduler() { return evictionScheduler; } diff --git a/redisson/src/main/java/org/redisson/RedissonFairLock.java b/redisson/src/main/java/org/redisson/RedissonFairLock.java index 407a1fdd3..257e05505 100644 --- a/redisson/src/main/java/org/redisson/RedissonFairLock.java +++ b/redisson/src/main/java/org/redisson/RedissonFairLock.java @@ -45,7 +45,7 @@ public class RedissonFairLock extends RedissonLock implements RLock { private final String threadsQueueName; private final String timeoutSetName; - protected RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) { + public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; threadsQueueName = prefixName("redisson_lock_queue", name); diff --git a/redisson/src/main/java/org/redisson/RedissonMap.java b/redisson/src/main/java/org/redisson/RedissonMap.java index 18b6f4bd9..a50e0f103 100644 --- a/redisson/src/main/java/org/redisson/RedissonMap.java +++ b/redisson/src/main/java/org/redisson/RedissonMap.java @@ -33,11 +33,14 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.MapOptions; +import org.redisson.api.RCountDownLatch; import org.redisson.api.MapOptions.WriteMode; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.api.RMap; +import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.RedissonClient; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; @@ -106,7 +109,31 @@ public class RedissonMap extends RedissonExpirable implements RMap { public RMapReduce mapReduce() { return new RedissonMapReduce(this, redisson, commandExecutor.getConnectionManager()); } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(K key) { + String lockName = getLockName(key, "permitexpirablesemaphore"); + return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + @Override + public RSemaphore getSemaphore(K key) { + String lockName = getLockName(key, "semaphore"); + return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RCountDownLatch getCountDownLatch(K key) { + String lockName = getLockName(key, "countdownlatch"); + return new RedissonCountDownLatch(commandExecutor, lockName); + } + + @Override + public RLock getFairLock(K key) { + String lockName = getLockName(key, "fairlock"); + return new RedissonFairLock(commandExecutor, lockName); + } + @Override public RLock getLock(K key) { String lockName = getLockName(key, "lock"); diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index 75b48702b..077da9d28 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -103,6 +103,38 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsync RMapReduce mapReduce(); + + /** + * Returns RCountDownLatch instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RCountDownLatch getCountDownLatch(K key); + + /** + * Returns RPermitExpirableSemaphore instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RPermitExpirableSemaphore getPermitExpirableSemaphore(K key); + + /** + * Returns RSemaphore instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RSemaphore getSemaphore(K key); + + /** + * Returns RReadWriteLock instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RLock getFairLock(K key); /** * Returns RReadWriteLock instance associated with key diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java index 45f7af3e5..3325d1a56 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMap.java @@ -24,10 +24,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.RedissonMap; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; import org.redisson.api.RMap; +import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; @@ -272,6 +275,26 @@ public class RedissonTransactionalMap extends RedissonMap { throw new UnsupportedOperationException("loadAll method is not supported in transaction"); } + @Override + public RLock getFairLock(K key) { + throw new UnsupportedOperationException("getFairLock method is not supported in transaction"); + } + + @Override + public RCountDownLatch getCountDownLatch(K key) { + throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction"); + } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(K key) { + throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction"); + } + + @Override + public RSemaphore getSemaphore(K key) { + throw new UnsupportedOperationException("getSemaphore method is not supported in transaction"); + } + @Override public RLock getLock(K key) { throw new UnsupportedOperationException("getLock method is not supported in transaction"); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java index 225d2c04e..8a743cfa8 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalMapCache.java @@ -24,9 +24,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.RedissonMapCache; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.mapreduce.RMapReduce; import org.redisson.client.RedisClient; import org.redisson.client.codec.Codec; @@ -298,6 +301,26 @@ public class RedissonTransactionalMapCache extends RedissonMapCache throw new UnsupportedOperationException("loadAll method is not supported in transaction"); } + @Override + public RLock getFairLock(K key) { + throw new UnsupportedOperationException("getFairLock method is not supported in transaction"); + } + + @Override + public RCountDownLatch getCountDownLatch(K key) { + throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction"); + } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(K key) { + throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction"); + } + + @Override + public RSemaphore getSemaphore(K key) { + throw new UnsupportedOperationException("getSemaphore method is not supported in transaction"); + } + @Override public RLock getLock(K key) { throw new UnsupportedOperationException("getLock method is not supported in transaction"); From 9a5f2cfcd2a12beadd31ad36b8b6bdd6655cbe08 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Dec 2018 13:49:25 +0300 Subject: [PATCH 07/11] Feature - getSemaphore, getPermitExpirableSemaphore, getFairLock, getLock, getReadWriteLock methods added to RMapRx and RMapReactive objects #1785 --- .../java/org/redisson/RedissonReactive.java | 26 +++++++++--- .../main/java/org/redisson/RedissonRx.java | 8 ++-- .../java/org/redisson/api/RMapReactive.java | 40 +++++++++++++++++++ .../reactive/RedissonBatchReactive.java | 4 +- .../reactive/RedissonMapReactive.java | 34 +++++++++++++++- .../reactive/RedissonTransactionReactive.java | 4 +- .../java/org/redisson/rx/RedissonBatchRx.java | 4 +- .../java/org/redisson/rx/RedissonMapRx.java | 35 +++++++++++++++- .../redisson/rx/RedissonTransactionRx.java | 4 +- 9 files changed, 139 insertions(+), 20 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 547dd4e27..6763bec4e 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -109,6 +109,22 @@ public class RedissonReactive implements RedissonReactiveClient { codecProvider = config.getReferenceCodecProvider(); } + public EvictionScheduler getEvictionScheduler() { + return evictionScheduler; + } + + public ConnectionManager getConnectionManager() { + return connectionManager; + } + + public CommandReactiveService getCommandExecutor() { + return commandExecutor; + } + + public SemaphorePubSub getSemaphorePubSub() { + return semaphorePubSub; + } + @Override public RStreamReactive getStream(String name) { return ReactiveProxyBuilder.create(commandExecutor, new RedissonStream(commandExecutor, name), RStreamReactive.class); @@ -249,14 +265,14 @@ public class RedissonReactive implements RedissonReactiveClient { public RMapReactive getMap(String name) { RedissonMap map = new RedissonMap(commandExecutor, name, null, null); return ReactiveProxyBuilder.create(commandExecutor, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, this), RMapReactive.class); } @Override public RMapReactive getMap(String name, Codec codec) { RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, null); return ReactiveProxyBuilder.create(commandExecutor, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, this), RMapReactive.class); } @Override @@ -464,20 +480,18 @@ public class RedissonReactive implements RedissonReactiveClient { new RedissonMapCacheReactive(map), RMapCacheReactive.class); } - @Override public RMapReactive getMap(String name, MapOptions options) { RedissonMap map = new RedissonMap(commandExecutor, name, null, options); return ReactiveProxyBuilder.create(commandExecutor, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, this), RMapReactive.class); } - @Override public RMapReactive getMap(String name, Codec codec, MapOptions options) { RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, options); return ReactiveProxyBuilder.create(commandExecutor, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, this), RMapReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 53938b1fc..552513e89 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -239,14 +239,14 @@ public class RedissonRx implements RedissonRxClient { public RMapRx getMap(String name) { RedissonMap map = new RedissonMap(commandExecutor, name, null, null); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, this), RMapRx.class); } @Override public RMapRx getMap(String name, Codec codec) { RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, null); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, this), RMapRx.class); } @Override @@ -460,7 +460,7 @@ public class RedissonRx implements RedissonRxClient { public RMapRx getMap(String name, MapOptions options) { RedissonMap map = new RedissonMap(commandExecutor, name, null, options); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, this), RMapRx.class); } @@ -468,7 +468,7 @@ public class RedissonRx implements RedissonRxClient { public RMapRx getMap(String name, Codec codec, MapOptions options) { RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, options); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, this), RMapRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RMapReactive.java b/redisson/src/main/java/org/redisson/api/RMapReactive.java index c0ab0b7ef..7ad3bf39a 100644 --- a/redisson/src/main/java/org/redisson/api/RMapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapReactive.java @@ -436,5 +436,45 @@ public interface RMapReactive extends RExpirableReactive { * @return iterator */ Publisher keyIterator(String pattern, int count); + + /** + * Returns RPermitExpirableSemaphore instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(K key); + + /** + * Returns RSemaphore instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RSemaphoreReactive getSemaphore(K key); + + /** + * Returns RReadWriteLock instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RLockReactive getFairLock(K key); + + /** + * Returns RReadWriteLock instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RReadWriteLockReactive getReadWriteLock(K key); + + /** + * Returns RLock instance associated with key + * + * @param key - map key + * @return lock + */ + RLockReactive getLock(K key); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index f561ef0d8..619b7189e 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -140,14 +140,14 @@ public class RedissonBatchReactive implements RBatchReactive { public RMapReactive getMap(String name) { RedissonMap map = new RedissonMap(executorService, name, null, null); return ReactiveProxyBuilder.create(executorService, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, null), RMapReactive.class); } @Override public RMapReactive getMap(String name, Codec codec) { RedissonMap map = new RedissonMap(codec, executorService, name, null, null); return ReactiveProxyBuilder.create(executorService, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, null), RMapReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java index 3b38d3d84..de6897314 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonMapReactive.java @@ -20,7 +20,12 @@ import java.util.Map.Entry; import org.reactivestreams.Publisher; import org.redisson.RedissonMap; +import org.redisson.api.RLockReactive; import org.redisson.api.RMap; +import org.redisson.api.RPermitExpirableSemaphoreReactive; +import org.redisson.api.RReadWriteLockReactive; +import org.redisson.api.RSemaphoreReactive; +import org.redisson.api.RedissonReactiveClient; /** @@ -35,9 +40,11 @@ import org.redisson.api.RMap; public class RedissonMapReactive { private final RMap instance; + private final RedissonReactiveClient redisson; - public RedissonMapReactive(RMap instance) { + public RedissonMapReactive(RMap instance, RedissonReactiveClient redisson) { this.instance = instance; + this.redisson = redisson; } public Publisher> entryIterator() { @@ -97,5 +104,30 @@ public class RedissonMapReactive { } }.stream(); } + + public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(K key) { + String name = ((RedissonMap)instance).getLockName(key, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreReactive getSemaphore(K key) { + String name = ((RedissonMap)instance).getLockName(key, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockReactive getFairLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockReactive getReadWriteLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockReactive getLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "lock"); + return redisson.getLock(name); + } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java index d1a4ce91e..739a81a3e 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -63,14 +63,14 @@ public class RedissonTransactionReactive implements RTransactionReactive { public RMapReactive getMap(String name) { RMap map = transaction.getMap(name); return ReactiveProxyBuilder.create(executorService, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, null), RMapReactive.class); } @Override public RMapReactive getMap(String name, Codec codec) { RMap map = transaction.getMap(name, codec); return ReactiveProxyBuilder.create(executorService, map, - new RedissonMapReactive(map), RMapReactive.class); + new RedissonMapReactive(map, null), RMapReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java index 8f834c464..73891c53a 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java @@ -142,14 +142,14 @@ public class RedissonBatchRx implements RBatchRx { public RMapRx getMap(String name) { RedissonMap map = new RedissonMap(executorService, name, null, null); return RxProxyBuilder.create(executorService, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, null), RMapRx.class); } @Override public RMapRx getMap(String name, Codec codec) { RedissonMap map = new RedissonMap(codec, executorService, name, null, null); return RxProxyBuilder.create(executorService, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, null), RMapRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java b/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java index 727329483..aec5aa216 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java @@ -20,7 +20,13 @@ import java.util.Map.Entry; import org.reactivestreams.Publisher; import org.redisson.RedissonMap; +import org.redisson.RedissonRx; +import org.redisson.api.RLockRx; import org.redisson.api.RMap; +import org.redisson.api.RPermitExpirableSemaphoreRx; +import org.redisson.api.RReadWriteLockRx; +import org.redisson.api.RSemaphoreRx; +import org.redisson.api.RedissonRxClient; /** * Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap} @@ -34,9 +40,11 @@ import org.redisson.api.RMap; public class RedissonMapRx { private final RedissonMap instance; + private RedissonRxClient redisson; - public RedissonMapRx(RMap instance) { + public RedissonMapRx(RMap instance, RedissonRx redisson) { this.instance = (RedissonMap) instance; + this.redisson = redisson; } public Publisher> entryIterator() { @@ -97,4 +105,29 @@ public class RedissonMapRx { }.create(); } + public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(K key) { + String name = ((RedissonMap)instance).getLockName(key, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreRx getSemaphore(K key) { + String name = ((RedissonMap)instance).getLockName(key, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockRx getFairLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockRx getReadWriteLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockRx getLock(K key) { + String name = ((RedissonMap)instance).getLockName(key, "lock"); + return redisson.getLock(name); + } + } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java b/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java index 867488083..914ebd8a8 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java @@ -65,14 +65,14 @@ public class RedissonTransactionRx implements RTransactionRx { public RMapRx getMap(String name) { RMap map = transaction.getMap(name); return RxProxyBuilder.create(executorService, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, null), RMapRx.class); } @Override public RMapRx getMap(String name, Codec codec) { RMap map = transaction.getMap(name, codec); return RxProxyBuilder.create(executorService, map, - new RedissonMapRx(map), RMapRx.class); + new RedissonMapRx(map, null), RMapRx.class); } @Override From 711586151e68f798a676ce31294d0c5e4c3a8489 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Dec 2018 14:17:53 +0300 Subject: [PATCH 08/11] Feature - getReadWriteLock, getCountDownLatch, getSemaphore, getPermitExpirableSemaphore, getFairLock methods added to RSet and RSetCache objects #1786 --- .../main/java/org/redisson/RedissonSet.java | 40 ++++++++++++++++-- .../java/org/redisson/RedissonSetCache.java | 40 ++++++++++++++++-- .../redisson/RedissonSetMultimapValues.java | 32 +++++++++++++- .../src/main/java/org/redisson/api/RSet.java | 42 ++++++++++++++++++- .../main/java/org/redisson/api/RSetCache.java | 42 ++++++++++++++++++- .../transaction/RedissonTransactionalSet.java | 35 ++++++++++++++++ .../transaction/TransactionalSet.java | 2 +- .../transaction/TransactionalSetCache.java | 2 +- .../operation/set/SetOperation.java | 4 +- 9 files changed, 226 insertions(+), 13 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonSet.java b/redisson/src/main/java/org/redisson/RedissonSet.java index a7318db63..8b266165a 100644 --- a/redisson/src/main/java/org/redisson/RedissonSet.java +++ b/redisson/src/main/java/org/redisson/RedissonSet.java @@ -23,8 +23,12 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.RSet; import org.redisson.api.RedissonClient; import org.redisson.api.SortOrder; @@ -602,20 +606,50 @@ public class RedissonSet extends RedissonExpirable implements RSet, ScanIt return commandExecutor.writeAsync(getName(), codec, RedisCommands.SORT_TO, params.toArray()); } - public String getLockName(Object value) { + public String getLockName(Object value, String suffix) { ByteBuf state = encode(value); try { - return suffixName(getName(value), Hash.hash128toBase64(state) + ":lock"); + return suffixName(getName(value), Hash.hash128toBase64(state) + ":" + suffix); } finally { state.release(); } } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) { + String lockName = getLockName(value, "permitexpirablesemaphore"); + return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RSemaphore getSemaphore(V value) { + String lockName = getLockName(value, "semaphore"); + return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RCountDownLatch getCountDownLatch(V value) { + String lockName = getLockName(value, "countdownlatch"); + return new RedissonCountDownLatch(commandExecutor, lockName); + } + + @Override + public RLock getFairLock(V value) { + String lockName = getLockName(value, "fairlock"); + return new RedissonFairLock(commandExecutor, lockName); + } @Override public RLock getLock(V value) { - String lockName = getLockName(value); + String lockName = getLockName(value, "lock"); return new RedissonLock(commandExecutor, lockName); } + + @Override + public RReadWriteLock getReadWriteLock(V value) { + String lockName = getLockName(value, "rw_lock"); + return new RedissonReadWriteLock(commandExecutor, lockName); + } @Override public RFuture> scanIteratorAsync(String name, RedisClient client, long startPos, diff --git a/redisson/src/main/java/org/redisson/RedissonSetCache.java b/redisson/src/main/java/org/redisson/RedissonSetCache.java index 76b77dc27..a03afe67d 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetCache.java +++ b/redisson/src/main/java/org/redisson/RedissonSetCache.java @@ -24,8 +24,12 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.RSetCache; import org.redisson.api.RedissonClient; import org.redisson.api.mapreduce.RCollectionMapReduce; @@ -366,20 +370,50 @@ public class RedissonSetCache extends RedissonExpirable implements RSetCache< delete(); } - public String getLockName(Object value) { + public String getLockName(Object value, String suffix) { ByteBuf state = encode(value); try { - return suffixName(getName(value), Hash.hash128toBase64(state) + ":lock"); + return suffixName(getName(value), Hash.hash128toBase64(state) + ":" + suffix); } finally { state.release(); } } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) { + String lockName = getLockName(value, "permitexpirablesemaphore"); + return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RSemaphore getSemaphore(V value) { + String lockName = getLockName(value, "semaphore"); + return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub()); + } + + @Override + public RCountDownLatch getCountDownLatch(V value) { + String lockName = getLockName(value, "countdownlatch"); + return new RedissonCountDownLatch(commandExecutor, lockName); + } + + @Override + public RLock getFairLock(V value) { + String lockName = getLockName(value, "fairlock"); + return new RedissonFairLock(commandExecutor, lockName); + } @Override public RLock getLock(V value) { - String lockName = getLockName(value); + String lockName = getLockName(value, "lock"); return new RedissonLock(commandExecutor, lockName); } + + @Override + public RReadWriteLock getReadWriteLock(V value) { + String lockName = getLockName(value, "rw_lock"); + return new RedissonReadWriteLock(commandExecutor, lockName); + } @Override public void destroy() { diff --git a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java index 5ab429cc3..d2af8aad5 100644 --- a/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonSetMultimapValues.java @@ -23,8 +23,12 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.RSet; import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; @@ -49,7 +53,8 @@ import org.redisson.command.CommandAsyncExecutor; */ public class RedissonSetMultimapValues extends RedissonExpirable implements RSet { - private static final RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE); + private static final RedisCommand> EVAL_SSCAN = new RedisCommand>("EVAL", + new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE); private final RSet set; private final Object key; @@ -468,6 +473,31 @@ public class RedissonSetMultimapValues extends RedissonExpirable implements R Arrays.asList(timeoutSetName, getName()), args.toArray()); } + @Override + public RCountDownLatch getCountDownLatch(V value) { + return set.getCountDownLatch(value); + } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) { + return set.getPermitExpirableSemaphore(value); + } + + @Override + public RSemaphore getSemaphore(V value) { + return set.getSemaphore(value); + } + + @Override + public RLock getFairLock(V value) { + return set.getFairLock(value); + } + + @Override + public RReadWriteLock getReadWriteLock(V value) { + return set.getReadWriteLock(value); + } + @Override public RLock getLock(V value) { return set.getLock(value); diff --git a/redisson/src/main/java/org/redisson/api/RSet.java b/redisson/src/main/java/org/redisson/api/RSet.java index 62debf2c9..062820b48 100644 --- a/redisson/src/main/java/org/redisson/api/RSet.java +++ b/redisson/src/main/java/org/redisson/api/RSet.java @@ -29,11 +29,51 @@ import org.redisson.api.mapreduce.RCollectionMapReduce; */ public interface RSet extends Set, RExpirable, RSetAsync, RSortable> { + /** + * Returns RCountDownLatch instance associated with value + * + * @param value - set value + * @return RCountDownLatch object + */ + RCountDownLatch getCountDownLatch(V value); + + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphore getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphore getSemaphore(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLock getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLock getReadWriteLock(V value); + /** * Returns lock instance associated with value * * @param value - set value - * @return lock + * @return RLock object */ RLock getLock(V value); diff --git a/redisson/src/main/java/org/redisson/api/RSetCache.java b/redisson/src/main/java/org/redisson/api/RSetCache.java index ae78e7d07..0104e07db 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCache.java +++ b/redisson/src/main/java/org/redisson/api/RSetCache.java @@ -40,11 +40,51 @@ import org.redisson.api.mapreduce.RCollectionMapReduce; */ public interface RSetCache extends Set, RExpirable, RSetCacheAsync, RDestroyable { + /** + * Returns RCountDownLatch instance associated with value + * + * @param value - set value + * @return RCountDownLatch object + */ + RCountDownLatch getCountDownLatch(V value); + + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphore getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphore getSemaphore(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLock getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLock getReadWriteLock(V value); + /** * Returns lock instance associated with value * * @param value - set value - * @return lock + * @return RLock object */ RLock getLock(V value); diff --git a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java index 25c200e68..9c45acb40 100644 --- a/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/RedissonTransactionalSet.java @@ -23,7 +23,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.redisson.RedissonSet; +import org.redisson.api.RCountDownLatch; import org.redisson.api.RFuture; +import org.redisson.api.RLock; +import org.redisson.api.RPermitExpirableSemaphore; +import org.redisson.api.RReadWriteLock; +import org.redisson.api.RSemaphore; import org.redisson.api.SortOrder; import org.redisson.api.mapreduce.RCollectionMapReduce; import org.redisson.client.RedisClient; @@ -99,6 +104,36 @@ public class RedissonTransactionalSet extends RedissonSet { checkState(); return transactionalSet.scanIterator(name, client, startPos, pattern, count); } + + @Override + public RLock getFairLock(V value) { + throw new UnsupportedOperationException("getFairLock method is not supported in transaction"); + } + + @Override + public RCountDownLatch getCountDownLatch(V value) { + throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction"); + } + + @Override + public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) { + throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction"); + } + + @Override + public RSemaphore getSemaphore(V value) { + throw new UnsupportedOperationException("getSemaphore method is not supported in transaction"); + } + + @Override + public RLock getLock(V value) { + throw new UnsupportedOperationException("getLock method is not supported in transaction"); + } + + @Override + public RReadWriteLock getReadWriteLock(V value) { + throw new UnsupportedOperationException("getReadWriteLock method is not supported in transaction"); + } @Override public RFuture containsAsync(Object o) { diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java index 47920c413..1d03bc1de 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSet.java @@ -77,7 +77,7 @@ public class TransactionalSet extends BaseTransactionalSet { @Override protected RLock getLock(RCollectionAsync set, V value) { - String lockName = ((RedissonSet)set).getLockName(value); + String lockName = ((RedissonSet)set).getLockName(value, "lock"); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); } diff --git a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java index 483b359b4..a50b89f81 100644 --- a/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java +++ b/redisson/src/main/java/org/redisson/transaction/TransactionalSetCache.java @@ -82,7 +82,7 @@ public class TransactionalSetCache extends BaseTransactionalSet { @Override protected RLock getLock(RCollectionAsync set, V value) { - String lockName = ((RedissonSetCache)set).getLockName(value); + String lockName = ((RedissonSetCache)set).getLockName(value, "lock"); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); } diff --git a/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java b/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java index edfffffdf..e2038871f 100644 --- a/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java +++ b/redisson/src/main/java/org/redisson/transaction/operation/set/SetOperation.java @@ -40,12 +40,12 @@ public abstract class SetOperation extends TransactionalOperation { } protected RLock getLock(RSetCache setCache, CommandAsyncExecutor commandExecutor, Object value) { - String lockName = ((RedissonSetCache)setCache).getLockName(value); + String lockName = ((RedissonSetCache)setCache).getLockName(value, "lock"); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); } protected RLock getLock(RSet setCache, CommandAsyncExecutor commandExecutor, Object value) { - String lockName = ((RedissonSet)setCache).getLockName(value); + String lockName = ((RedissonSet)setCache).getLockName(value, "lock"); return new RedissonTransactionalLock(commandExecutor, lockName, transactionId); } From 0b63e7412d258906e0a79146e1d27ecfb0ed54ff Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Dec 2018 14:18:23 +0300 Subject: [PATCH 09/11] javadocs fixed --- .../src/main/java/org/redisson/api/RMap.java | 8 ++-- .../java/org/redisson/api/RMapReactive.java | 6 +-- .../main/java/org/redisson/api/RMapRx.java | 40 +++++++++++++++++++ 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index 077da9d28..3f750baac 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -108,7 +108,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncRCountDownLatch instance associated with key * * @param key - map key - * @return readWriteLock + * @return countdownlatch */ RCountDownLatch getCountDownLatch(K key); @@ -116,7 +116,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncRPermitExpirableSemaphore instance associated with key * * @param key - map key - * @return readWriteLock + * @return permitExpirableSemaphore */ RPermitExpirableSemaphore getPermitExpirableSemaphore(K key); @@ -124,7 +124,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncRSemaphore instance associated with key * * @param key - map key - * @return readWriteLock + * @return semaphore */ RSemaphore getSemaphore(K key); @@ -132,7 +132,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncRReadWriteLock instance associated with key * * @param key - map key - * @return readWriteLock + * @return fairlock */ RLock getFairLock(K key); diff --git a/redisson/src/main/java/org/redisson/api/RMapReactive.java b/redisson/src/main/java/org/redisson/api/RMapReactive.java index 7ad3bf39a..15f260daf 100644 --- a/redisson/src/main/java/org/redisson/api/RMapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapReactive.java @@ -441,7 +441,7 @@ public interface RMapReactive extends RExpirableReactive { * Returns RPermitExpirableSemaphore instance associated with key * * @param key - map key - * @return readWriteLock + * @return permitExpirableSemaphore */ RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(K key); @@ -449,7 +449,7 @@ public interface RMapReactive extends RExpirableReactive { * Returns RSemaphore instance associated with key * * @param key - map key - * @return readWriteLock + * @return semaphore */ RSemaphoreReactive getSemaphore(K key); @@ -457,7 +457,7 @@ public interface RMapReactive extends RExpirableReactive { * Returns RReadWriteLock instance associated with key * * @param key - map key - * @return readWriteLock + * @return fairLock */ RLockReactive getFairLock(K key); diff --git a/redisson/src/main/java/org/redisson/api/RMapRx.java b/redisson/src/main/java/org/redisson/api/RMapRx.java index ce5042692..e462c953d 100644 --- a/redisson/src/main/java/org/redisson/api/RMapRx.java +++ b/redisson/src/main/java/org/redisson/api/RMapRx.java @@ -437,5 +437,45 @@ public interface RMapRx extends RExpirableRx { * @return iterator */ Flowable keyIterator(String pattern, int count); + + /** + * Returns RPermitExpirableSemaphore instance associated with key + * + * @param key - map key + * @return permitExpirableSemaphore + */ + RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(K key); + + /** + * Returns RSemaphore instance associated with key + * + * @param key - map key + * @return semaphore + */ + RSemaphoreRx getSemaphore(K key); + + /** + * Returns RReadWriteLock instance associated with key + * + * @param key - map key + * @return fairLock + */ + RLockRx getFairLock(K key); + + /** + * Returns RReadWriteLock instance associated with key + * + * @param key - map key + * @return readWriteLock + */ + RReadWriteLockRx getReadWriteLock(K key); + + /** + * Returns RLock instance associated with key + * + * @param key - map key + * @return lock + */ + RLockRx getLock(K key); } From 4771e1c93daf34152e1ade8cd5becd47b7e248b5 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Dec 2018 14:46:16 +0300 Subject: [PATCH 10/11] Feature - getReadWriteLock, getSemaphore, getPermitExpirableSemaphore, getFairLock, getLock methods added to RSetRx, RSetReactive, RSetCacheReactive, RSetCacheRx objects #1787 --- .../java/org/redisson/RedissonReactive.java | 12 ++--- .../main/java/org/redisson/RedissonRx.java | 12 ++--- .../org/redisson/api/RSetCacheReactive.java | 53 ++++++++++++++++++- .../java/org/redisson/api/RSetCacheRx.java | 53 ++++++++++++++++++- .../java/org/redisson/api/RSetReactive.java | 42 ++++++++++++++- .../main/java/org/redisson/api/RSetRx.java | 42 ++++++++++++++- .../reactive/RedissonBatchReactive.java | 12 ++--- .../reactive/RedissonSetCacheReactive.java | 35 +++++++++++- .../reactive/RedissonSetMultimapReactive.java | 16 ++++-- .../reactive/RedissonSetReactive.java | 34 +++++++++++- .../reactive/RedissonTransactionReactive.java | 8 +-- .../java/org/redisson/rx/RedissonBatchRx.java | 12 ++--- .../org/redisson/rx/RedissonSetCacheRx.java | 35 +++++++++++- .../redisson/rx/RedissonSetMultimapRx.java | 20 ++++--- .../java/org/redisson/rx/RedissonSetRx.java | 36 ++++++++++++- .../redisson/rx/RedissonTransactionRx.java | 8 +-- 16 files changed, 377 insertions(+), 53 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 6763bec4e..59209e291 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -252,13 +252,13 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RSetMultimapReactive getSetMultimap(String name) { return ReactiveProxyBuilder.create(commandExecutor, new RedissonSetMultimap(commandExecutor, name), - new RedissonSetMultimapReactive(commandExecutor, name), RSetMultimapReactive.class); + new RedissonSetMultimapReactive(commandExecutor, name, this), RSetMultimapReactive.class); } @Override public RSetMultimapReactive getSetMultimap(String name, Codec codec) { return ReactiveProxyBuilder.create(commandExecutor, new RedissonSetMultimap(codec, commandExecutor, name), - new RedissonSetMultimapReactive(codec, commandExecutor, name), RSetMultimapReactive.class); + new RedissonSetMultimapReactive(codec, commandExecutor, name, this), RSetMultimapReactive.class); } @Override @@ -279,14 +279,14 @@ public class RedissonReactive implements RedissonReactiveClient { public RSetReactive getSet(String name) { RedissonSet set = new RedissonSet(commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, this), RSetReactive.class); } @Override public RSetReactive getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, this), RSetReactive.class); } @Override @@ -369,14 +369,14 @@ public class RedissonReactive implements RedissonReactiveClient { public RSetCacheReactive getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, this), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, this), RSetCacheReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 552513e89..49e73df7d 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -226,13 +226,13 @@ public class RedissonRx implements RedissonRxClient { @Override public RSetMultimapRx getSetMultimap(String name) { return RxProxyBuilder.create(commandExecutor, new RedissonSetMultimap(commandExecutor, name), - new RedissonSetMultimapRx(commandExecutor, name), RSetMultimapRx.class); + new RedissonSetMultimapRx(commandExecutor, name, this), RSetMultimapRx.class); } @Override public RSetMultimapRx getSetMultimap(String name, Codec codec) { return RxProxyBuilder.create(commandExecutor, new RedissonSetMultimap(codec, commandExecutor, name), - new RedissonSetMultimapRx(codec, commandExecutor, name), RSetMultimapRx.class); + new RedissonSetMultimapRx(codec, commandExecutor, name, this), RSetMultimapRx.class); } @Override @@ -253,14 +253,14 @@ public class RedissonRx implements RedissonRxClient { public RSetRx getSet(String name) { RedissonSet set = new RedissonSet(commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, this), RSetRx.class); } @Override public RSetRx getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, this), RSetRx.class); } @Override @@ -350,14 +350,14 @@ public class RedissonRx implements RedissonRxClient { public RSetCacheRx getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, this), RSetCacheRx.class); } @Override public RSetCacheRx getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, this), RSetCacheRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java b/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java index abf621229..80ba62b92 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSetCacheReactive.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; /** - * Async set functions + * Reactive interface for RSetCache object * * @author Nikita Koksharov * @@ -29,6 +29,57 @@ import org.reactivestreams.Publisher; */ public interface RSetCacheReactive extends RCollectionReactive { + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphoreReactive getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockReactive getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLockReactive getReadWriteLock(V value); + + /** + * Returns lock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockReactive getLock(V value); + + /** + * Stores value with specified time to live. + * Value expires after specified time to live. + * + * @param value to add + * @param ttl - time to live for key\value entry. + * If 0 then stores infinitely. + * @param unit - time unit + * @return true if value has been added. false + * if value already been in collection. + */ Publisher add(V value, long ttl, TimeUnit unit); /** diff --git a/redisson/src/main/java/org/redisson/api/RSetCacheRx.java b/redisson/src/main/java/org/redisson/api/RSetCacheRx.java index b4a39eb9e..ab7b2f904 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCacheRx.java +++ b/redisson/src/main/java/org/redisson/api/RSetCacheRx.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import io.reactivex.Flowable; /** - * Async set functions + * RxJava2 interface for RSetCache object * * @author Nikita Koksharov * @@ -29,6 +29,57 @@ import io.reactivex.Flowable; */ public interface RSetCacheRx extends RCollectionRx { + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphoreRx getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockRx getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLockRx getReadWriteLock(V value); + + /** + * Returns lock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockRx getLock(V value); + + /** + * Stores value with specified time to live. + * Value expires after specified time to live. + * + * @param value to add + * @param ttl - time to live for key\value entry. + * If 0 then stores infinitely. + * @param unit - time unit + * @return true if value has been added. false + * if value already been in collection. + */ Flowable add(V value, long ttl, TimeUnit unit); /** diff --git a/redisson/src/main/java/org/redisson/api/RSetReactive.java b/redisson/src/main/java/org/redisson/api/RSetReactive.java index 652342b3b..0de5eea46 100644 --- a/redisson/src/main/java/org/redisson/api/RSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RSetReactive.java @@ -20,7 +20,7 @@ import java.util.Set; import org.reactivestreams.Publisher; /** - * Async set functions + * Reactive interface for RSet object * * @author Nikita Koksharov * @@ -28,6 +28,46 @@ import org.reactivestreams.Publisher; */ public interface RSetReactive extends RCollectionReactive, RSortableReactive> { + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphoreReactive getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockReactive getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLockReactive getReadWriteLock(V value); + + /** + * Returns lock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockReactive getLock(V value); + /** * Returns an iterator over elements in this set. * Elements are loaded in batch. Batch size is defined by count param. diff --git a/redisson/src/main/java/org/redisson/api/RSetRx.java b/redisson/src/main/java/org/redisson/api/RSetRx.java index 94cbae32e..0ca18c5fd 100644 --- a/redisson/src/main/java/org/redisson/api/RSetRx.java +++ b/redisson/src/main/java/org/redisson/api/RSetRx.java @@ -20,7 +20,7 @@ import java.util.Set; import io.reactivex.Flowable; /** - * Async set functions + * RxJava2 interface for RSetCache object * * @author Nikita Koksharov * @@ -28,6 +28,46 @@ import io.reactivex.Flowable; */ public interface RSetRx extends RCollectionRx, RSortableRx> { + /** + * Returns RPermitExpirableSemaphore instance associated with value + * + * @param value - set value + * @return RPermitExpirableSemaphore object + */ + RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value); + + /** + * Returns RSemaphore instance associated with value + * + * @param value - set value + * @return RSemaphore object + */ + RSemaphoreRx getSemaphore(V value); + + /** + * Returns RLock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockRx getFairLock(V value); + + /** + * Returns RReadWriteLock instance associated with value + * + * @param value - set value + * @return RReadWriteLock object + */ + RReadWriteLockRx getReadWriteLock(V value); + + /** + * Returns lock instance associated with value + * + * @param value - set value + * @return RLock object + */ + RLockRx getLock(V value); + /** * Returns an iterator over elements in this set. * Elements are loaded in batch. Batch size is defined by count param. diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 619b7189e..32bf168d0 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -168,14 +168,14 @@ public class RedissonBatchReactive implements RBatchReactive { public RSetReactive getSet(String name) { RedissonSet set = new RedissonSet(executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, null), RSetReactive.class); } @Override public RSetReactive getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, null), RSetReactive.class); } @Override @@ -233,14 +233,14 @@ public class RedissonBatchReactive implements RBatchReactive { public RSetCacheReactive getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, null), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, null), RSetCacheReactive.class); } @Override @@ -347,13 +347,13 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public RSetMultimapReactive getSetMultimap(String name) { return ReactiveProxyBuilder.create(executorService, new RedissonSetMultimap(executorService, name), - new RedissonSetMultimapReactive(executorService, name), RSetMultimapReactive.class); + new RedissonSetMultimapReactive(executorService, name, null), RSetMultimapReactive.class); } @Override public RSetMultimapReactive getSetMultimap(String name, Codec codec) { return ReactiveProxyBuilder.create(executorService, new RedissonSetMultimap(codec, executorService, name), - new RedissonSetMultimapReactive(codec, executorService, name), RSetMultimapReactive.class); + new RedissonSetMultimapReactive(codec, executorService, name, null), RSetMultimapReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index cd09fed1f..fd3ec2dbc 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -16,9 +16,15 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; +import org.redisson.RedissonSetCache; import org.redisson.ScanIterator; import org.redisson.api.RFuture; +import org.redisson.api.RLockReactive; +import org.redisson.api.RPermitExpirableSemaphoreReactive; +import org.redisson.api.RReadWriteLockReactive; +import org.redisson.api.RSemaphoreReactive; import org.redisson.api.RSetCache; +import org.redisson.api.RedissonReactiveClient; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; @@ -31,9 +37,11 @@ import org.redisson.client.protocol.decoder.ListScanResult; public class RedissonSetCacheReactive { private final RSetCache instance; + private final RedissonReactiveClient redisson; - public RedissonSetCacheReactive(RSetCache instance) { + public RedissonSetCacheReactive(RSetCache instance, RedissonReactiveClient redisson) { this.instance = instance; + this.redisson = redisson; } public Publisher iterator() { @@ -53,5 +61,30 @@ public class RedissonSetCacheReactive { } }.addAll(c); } + + public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreReactive getSemaphore(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockReactive getFairLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockReactive getReadWriteLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockReactive getLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "lock"); + return redisson.getLock(name); + } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java index 5a9a90d3b..9834e0f79 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java @@ -19,6 +19,7 @@ import org.redisson.RedissonListMultimap; import org.redisson.api.RSet; import org.redisson.api.RSetMultimap; import org.redisson.api.RSetReactive; +import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; /** @@ -30,21 +31,26 @@ import org.redisson.client.codec.Codec; */ public class RedissonSetMultimapReactive { - private CommandReactiveExecutor commandExecutor; - private RedissonListMultimap instance; + private final RedissonReactiveClient redisson; + private final CommandReactiveExecutor commandExecutor; + private final RedissonListMultimap instance; - public RedissonSetMultimapReactive(CommandReactiveExecutor commandExecutor, String name) { + public RedissonSetMultimapReactive(CommandReactiveExecutor commandExecutor, String name, RedissonReactiveClient redisson) { this.instance = new RedissonListMultimap(commandExecutor, name); + this.redisson = redisson; + this.commandExecutor = commandExecutor; } - public RedissonSetMultimapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { + public RedissonSetMultimapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RedissonReactiveClient redisson) { this.instance = new RedissonListMultimap(codec, commandExecutor, name); + this.redisson = redisson; + this.commandExecutor = commandExecutor; } public RSetReactive get(K key) { RSet set = ((RSetMultimap)instance).get(key); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, redisson), RSetReactive.class); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index cf4e5f9a5..114dc6b4f 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -18,7 +18,12 @@ package org.redisson.reactive; import org.reactivestreams.Publisher; import org.redisson.RedissonSet; import org.redisson.api.RFuture; +import org.redisson.api.RLockReactive; +import org.redisson.api.RPermitExpirableSemaphoreReactive; +import org.redisson.api.RReadWriteLockReactive; +import org.redisson.api.RSemaphoreReactive; import org.redisson.api.RSet; +import org.redisson.api.RedissonReactiveClient; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; @@ -32,9 +37,11 @@ import org.redisson.client.protocol.decoder.ListScanResult; public class RedissonSetReactive { private final RSet instance; + private final RedissonReactiveClient redisson; - public RedissonSetReactive(RSet instance) { + public RedissonSetReactive(RSet instance, RedissonReactiveClient redisson) { this.instance = instance; + this.redisson = redisson; } public Publisher addAll(Publisher c) { @@ -67,4 +74,29 @@ public class RedissonSetReactive { return iterator(null, 10); } + public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value) { + String name = ((RedissonSet)instance).getLockName(value, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreReactive getSemaphore(V value) { + String name = ((RedissonSet)instance).getLockName(value, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockReactive getFairLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockReactive getReadWriteLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockReactive getLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "lock"); + return redisson.getLock(name); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java index 739a81a3e..723d6464f 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -91,28 +91,28 @@ public class RedissonTransactionReactive implements RTransactionReactive { public RSetReactive getSet(String name) { RSet set = transaction.getSet(name); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, null), RSetReactive.class); } @Override public RSetReactive getSet(String name, Codec codec) { RSet set = transaction.getSet(name, codec); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetReactive.class); + new RedissonSetReactive(set, null), RSetReactive.class); } @Override public RSetCacheReactive getSetCache(String name) { RSetCache set = transaction.getSetCache(name); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, null), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { RSetCache set = transaction.getSetCache(name, codec); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set, null), RSetCacheReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java index 73891c53a..cc6d290bf 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java @@ -170,14 +170,14 @@ public class RedissonBatchRx implements RBatchRx { public RSetRx getSet(String name) { RedissonSet set = new RedissonSet(executorService, name, null); return RxProxyBuilder.create(executorService, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, null), RSetRx.class); } @Override public RSetRx getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, executorService, name, null); return RxProxyBuilder.create(executorService, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, null), RSetRx.class); } @Override @@ -241,14 +241,14 @@ public class RedissonBatchRx implements RBatchRx { public RSetCacheRx getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, executorService, name, null); return RxProxyBuilder.create(executorService, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, null), RSetCacheRx.class); } @Override public RSetCacheRx getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, executorService, name, null); return RxProxyBuilder.create(executorService, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, null), RSetCacheRx.class); } @Override @@ -359,13 +359,13 @@ public class RedissonBatchRx implements RBatchRx { @Override public RSetMultimapRx getSetMultimap(String name) { return RxProxyBuilder.create(executorService, new RedissonSetMultimap(executorService, name), - new RedissonSetMultimapRx(executorService, name), RSetMultimapRx.class); + new RedissonSetMultimapRx(executorService, name, null), RSetMultimapRx.class); } @Override public RSetMultimapRx getSetMultimap(String name, Codec codec) { return RxProxyBuilder.create(executorService, new RedissonSetMultimap(codec, executorService, name), - new RedissonSetMultimapRx(codec, executorService, name), RSetMultimapRx.class); + new RedissonSetMultimapRx(codec, executorService, name, null), RSetMultimapRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java b/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java index d5ba58e5e..1d47d841c 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonSetCacheRx.java @@ -16,9 +16,15 @@ package org.redisson.rx; import org.reactivestreams.Publisher; +import org.redisson.RedissonSetCache; import org.redisson.ScanIterator; import org.redisson.api.RFuture; +import org.redisson.api.RLockRx; +import org.redisson.api.RPermitExpirableSemaphoreRx; +import org.redisson.api.RReadWriteLockRx; +import org.redisson.api.RSemaphoreRx; import org.redisson.api.RSetCache; +import org.redisson.api.RedissonRxClient; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; @@ -31,9 +37,11 @@ import org.redisson.client.protocol.decoder.ListScanResult; public class RedissonSetCacheRx { private final RSetCache instance; + private final RedissonRxClient redisson; - public RedissonSetCacheRx(RSetCache instance) { + public RedissonSetCacheRx(RSetCache instance, RedissonRxClient redisson) { this.instance = instance; + this.redisson = redisson; } public Publisher iterator() { @@ -54,4 +62,29 @@ public class RedissonSetCacheRx { }.addAll(c); } + public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreRx getSemaphore(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockRx getFairLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockRx getReadWriteLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockRx getLock(V value) { + String name = ((RedissonSetCache)instance).getLockName(value, "lock"); + return redisson.getLock(name); + } + } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonSetMultimapRx.java b/redisson/src/main/java/org/redisson/rx/RedissonSetMultimapRx.java index dfb8b33fc..097be9637 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonSetMultimapRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonSetMultimapRx.java @@ -18,7 +18,8 @@ package org.redisson.rx; import org.redisson.RedissonListMultimap; import org.redisson.RedissonSet; import org.redisson.api.RSetMultimap; -import org.redisson.api.RSetReactive; +import org.redisson.api.RSetRx; +import org.redisson.api.RedissonRxClient; import org.redisson.client.codec.Codec; /** @@ -30,21 +31,26 @@ import org.redisson.client.codec.Codec; */ public class RedissonSetMultimapRx { - private CommandRxExecutor commandExecutor; - private RedissonListMultimap instance; + private final RedissonRxClient redisson; + private final CommandRxExecutor commandExecutor; + private final RedissonListMultimap instance; - public RedissonSetMultimapRx(CommandRxExecutor commandExecutor, String name) { + public RedissonSetMultimapRx(CommandRxExecutor commandExecutor, String name, RedissonRxClient redisson) { this.instance = new RedissonListMultimap(commandExecutor, name); + this.redisson = redisson; + this.commandExecutor = commandExecutor; } - public RedissonSetMultimapRx(Codec codec, CommandRxExecutor commandExecutor, String name) { + public RedissonSetMultimapRx(Codec codec, CommandRxExecutor commandExecutor, String name, RedissonRxClient redisson) { this.instance = new RedissonListMultimap(codec, commandExecutor, name); + this.redisson = redisson; + this.commandExecutor = commandExecutor; } - public RSetReactive get(K key) { + public RSetRx get(K key) { RedissonSet set = (RedissonSet) ((RSetMultimap)instance).get(key); return RxProxyBuilder.create(commandExecutor, set, - new RedissonSetRx(set), RSetReactive.class); + new RedissonSetRx(set, redisson), RSetRx.class); } } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java b/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java index 59cc84355..c957b793e 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonSetRx.java @@ -18,7 +18,12 @@ package org.redisson.rx; import org.reactivestreams.Publisher; import org.redisson.RedissonSet; import org.redisson.api.RFuture; +import org.redisson.api.RLockRx; +import org.redisson.api.RPermitExpirableSemaphoreRx; +import org.redisson.api.RReadWriteLockRx; +import org.redisson.api.RSemaphoreRx; import org.redisson.api.RSet; +import org.redisson.api.RedissonRxClient; import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; @@ -34,9 +39,11 @@ import io.reactivex.Flowable; public class RedissonSetRx { private final RSet instance; + private final RedissonRxClient redisson; - public RedissonSetRx(RSet instance) { + public RedissonSetRx(RSet instance, RedissonRxClient redisson) { this.instance = instance; + this.redisson = redisson; } public Flowable addAll(Publisher c) { @@ -60,7 +67,7 @@ public class RedissonSetRx { return new SetRxIterator() { @Override protected RFuture> scanIterator(RedisClient client, long nextIterPos) { - return ((RedissonSet)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, pattern, count); + return ((RedissonSet)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, pattern, count); } }.create(); } @@ -69,4 +76,29 @@ public class RedissonSetRx { return iterator(null, 10); } + public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value) { + String name = ((RedissonSet)instance).getLockName(value, "permitexpirablesemaphore"); + return redisson.getPermitExpirableSemaphore(name); + } + + public RSemaphoreRx getSemaphore(V value) { + String name = ((RedissonSet)instance).getLockName(value, "semaphore"); + return redisson.getSemaphore(name); + } + + public RLockRx getFairLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "fairlock"); + return redisson.getFairLock(name); + } + + public RReadWriteLockRx getReadWriteLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "rw_lock"); + return redisson.getReadWriteLock(name); + } + + public RLockRx getLock(V value) { + String name = ((RedissonSet)instance).getLockName(value, "lock"); + return redisson.getLock(name); + } + } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java b/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java index 914ebd8a8..d784589a3 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java @@ -93,28 +93,28 @@ public class RedissonTransactionRx implements RTransactionRx { public RSetRx getSet(String name) { RSet set = transaction.getSet(name); return RxProxyBuilder.create(executorService, set, - new RedissonSetReactive(set), RSetRx.class); + new RedissonSetReactive(set, null), RSetRx.class); } @Override public RSetRx getSet(String name, Codec codec) { RSet set = transaction.getSet(name, codec); return RxProxyBuilder.create(executorService, set, - new RedissonSetRx(set), RSetRx.class); + new RedissonSetRx(set, null), RSetRx.class); } @Override public RSetCacheRx getSetCache(String name) { RSetCache set = transaction.getSetCache(name); return RxProxyBuilder.create(executorService, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, null), RSetCacheRx.class); } @Override public RSetCacheRx getSetCache(String name, Codec codec) { RSetCache set = transaction.getSetCache(name, codec); return RxProxyBuilder.create(executorService, set, - new RedissonSetCacheRx(set), RSetCacheRx.class); + new RedissonSetCacheRx(set, null), RSetCacheRx.class); } @Override From 6165e6211b3be3e5eb623020bb002804714bc5c6 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Dec 2018 14:46:31 +0300 Subject: [PATCH 11/11] javadocs fixed --- redisson/src/main/java/org/redisson/api/RMap.java | 2 +- redisson/src/main/java/org/redisson/api/RMapReactive.java | 2 +- redisson/src/main/java/org/redisson/api/RMapRx.java | 2 +- redisson/src/main/java/org/redisson/api/RSet.java | 2 +- redisson/src/main/java/org/redisson/api/RSetCache.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/redisson/src/main/java/org/redisson/api/RMap.java b/redisson/src/main/java/org/redisson/api/RMap.java index 3f750baac..64de17b13 100644 --- a/redisson/src/main/java/org/redisson/api/RMap.java +++ b/redisson/src/main/java/org/redisson/api/RMap.java @@ -129,7 +129,7 @@ public interface RMap extends ConcurrentMap, RExpirable, RMapAsyncRReadWriteLock instance associated with key + * Returns RLock instance associated with key * * @param key - map key * @return fairlock diff --git a/redisson/src/main/java/org/redisson/api/RMapReactive.java b/redisson/src/main/java/org/redisson/api/RMapReactive.java index 15f260daf..3cd126cef 100644 --- a/redisson/src/main/java/org/redisson/api/RMapReactive.java +++ b/redisson/src/main/java/org/redisson/api/RMapReactive.java @@ -454,7 +454,7 @@ public interface RMapReactive extends RExpirableReactive { RSemaphoreReactive getSemaphore(K key); /** - * Returns RReadWriteLock instance associated with key + * Returns RLock instance associated with key * * @param key - map key * @return fairLock diff --git a/redisson/src/main/java/org/redisson/api/RMapRx.java b/redisson/src/main/java/org/redisson/api/RMapRx.java index e462c953d..e2c918e23 100644 --- a/redisson/src/main/java/org/redisson/api/RMapRx.java +++ b/redisson/src/main/java/org/redisson/api/RMapRx.java @@ -455,7 +455,7 @@ public interface RMapRx extends RExpirableRx { RSemaphoreRx getSemaphore(K key); /** - * Returns RReadWriteLock instance associated with key + * Returns RLock instance associated with key * * @param key - map key * @return fairLock diff --git a/redisson/src/main/java/org/redisson/api/RSet.java b/redisson/src/main/java/org/redisson/api/RSet.java index 062820b48..f9afb4649 100644 --- a/redisson/src/main/java/org/redisson/api/RSet.java +++ b/redisson/src/main/java/org/redisson/api/RSet.java @@ -54,7 +54,7 @@ public interface RSet extends Set, RExpirable, RSetAsync, RSortableRReadWriteLock instance associated with value + * Returns RLock instance associated with value * * @param value - set value * @return RLock object diff --git a/redisson/src/main/java/org/redisson/api/RSetCache.java b/redisson/src/main/java/org/redisson/api/RSetCache.java index 0104e07db..a507b26a4 100644 --- a/redisson/src/main/java/org/redisson/api/RSetCache.java +++ b/redisson/src/main/java/org/redisson/api/RSetCache.java @@ -65,7 +65,7 @@ public interface RSetCache extends Set, RExpirable, RSetCacheAsync, RDe RSemaphore getSemaphore(V value); /** - * Returns RReadWriteLock instance associated with value + * Returns RLock instance associated with value * * @param value - set value * @return RLock object