Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/api/RMapReactive.java
#	redisson/src/main/java/org/redisson/api/RSetCacheReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java
pull/1833/head
Nikita Koksharov 6 years ago
commit 52f476a398

@ -1,4 +1,4 @@
Redisson: Redis based In-Memory Data Grid for Java.<br/> State of the Art Redis client
Redisson: Redis based In-Memory Data Grid for Java.<br/> State of the Art Redis Java client
====
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.8.2) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Report an issue](https://github.com/redisson/redisson/issues/new) | **[Redisson PRO](https://redisson.pro)**
@ -12,17 +12,17 @@ Based on high-performance async and lock-free Java Redis client and [Netty](http
Features
================================
* Replicated servers mode (also supports [AWS ElastiCache](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Replication.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)):
* Replicated Redis servers mode (also supports [AWS ElastiCache](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Replication.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)):
1. automatic master server change discovery
* Cluster servers mode (also supports [AWS ElastiCache Cluster](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Clusters.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)):
* Clustered Redis servers mode (also supports [AWS ElastiCache Cluster](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/Clusters.html) and [Azure Redis Cache](https://azure.microsoft.com/en-us/services/cache/)):
1. automatic master and slave servers discovery
2. automatic status and topology update
3. automatic slots change discovery
* Sentinel servers mode:
* Sentinel Redis servers mode:
1. automatic master, slave and sentinel servers discovery
2. automatic status and topology update
* Master with Slave servers mode
* Single server mode
* Master with Slave Redis servers mode
* Single Redis server mode
* Thread-safe implementation
* [Reactive Streams](https://github.com/redisson/redisson/wiki/3.-operations-execution#32-reactive-way) API
* [Asynchronous](https://github.com/redisson/redisson/wiki/3.-operations-execution#31-async-way) API
@ -137,14 +137,14 @@ Config = ...
// 2. Create Redisson instance
RedissonClient redisson = Redisson.create(config);
// 3. Get object you need
// 3. Get Redis based object or service you need
RMap<MyKey, MyValue> map = redisson.getMap("myMap");
RLock lock = redisson.getLock("myLock");
RExecutorService executor = redisson.getExecutorService("myExecutorService");
// over 30 different objects and services ...
// over 30 different Redis based objects and services ...
```

@ -46,6 +46,8 @@ Usage
### 2. Add settings into `application.settings` file
Common spring boot settings or Redisson settings could be used.
```properties
# common spring boot settings

@ -124,6 +124,10 @@ public class Redisson implements RedissonClient {
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
}
public SemaphorePubSub getSemaphorePubSub() {
return semaphorePubSub;
}
public EvictionScheduler getEvictionScheduler() {
return evictionScheduler;
}

@ -45,7 +45,7 @@ public class RedissonFairLock extends RedissonLock implements RLock {
private final String threadsQueueName;
private final String timeoutSetName;
protected RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
threadsQueueName = prefixName("redisson_lock_queue", name);

@ -33,11 +33,14 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.MapOptions;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.MapOptions.WriteMode;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
@ -106,7 +109,31 @@ public class RedissonMap<K, V> extends RedissonExpirable implements RMap<K, V> {
public <KOut, VOut> RMapReduce<K, V, KOut, VOut> mapReduce() {
return new RedissonMapReduce<K, V, KOut, VOut>(this, redisson, commandExecutor.getConnectionManager());
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(K key) {
String lockName = getLockName(key, "permitexpirablesemaphore");
return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RSemaphore getSemaphore(K key) {
String lockName = getLockName(key, "semaphore");
return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RCountDownLatch getCountDownLatch(K key) {
String lockName = getLockName(key, "countdownlatch");
return new RedissonCountDownLatch(commandExecutor, lockName);
}
@Override
public RLock getFairLock(K key) {
String lockName = getLockName(key, "fairlock");
return new RedissonFairLock(commandExecutor, lockName);
}
@Override
public RLock getLock(K key) {
String lockName = getLockName(key, "lock");

@ -112,6 +112,22 @@ public class RedissonReactive implements RedissonReactiveClient {
codecProvider = config.getReferenceCodecProvider();
}
public EvictionScheduler getEvictionScheduler() {
return evictionScheduler;
}
public ConnectionManager getConnectionManager() {
return connectionManager;
}
public CommandReactiveService getCommandExecutor() {
return commandExecutor;
}
public SemaphorePubSub getSemaphorePubSub() {
return semaphorePubSub;
}
@Override
public <K, V> RStreamReactive<K, V> getStream(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonStream<K, V>(commandExecutor, name), RStreamReactive.class);
@ -239,41 +255,41 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonSetMultimap<K, V>(commandExecutor, name),
new RedissonSetMultimapReactive<K, V>(commandExecutor, name), RSetMultimapReactive.class);
new RedissonSetMultimapReactive<K, V>(commandExecutor, name, this), RSetMultimapReactive.class);
}
@Override
public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonSetMultimap<K, V>(codec, commandExecutor, name),
new RedissonSetMultimapReactive<K, V>(codec, commandExecutor, name), RSetMultimapReactive.class);
new RedissonSetMultimapReactive<K, V>(codec, commandExecutor, name, this), RSetMultimapReactive.class);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name) {
RedissonMap<K, V> map = new RedissonMap<K, V>(commandExecutor, name, null, null);
return ReactiveProxyBuilder.create(commandExecutor, map,
new RedissonMapReactive<K, V>(map), RMapReactive.class);
new RedissonMapReactive<K, V>(map, this), RMapReactive.class);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, commandExecutor, name, null, null);
return ReactiveProxyBuilder.create(commandExecutor, map,
new RedissonMapReactive<K, V>(map), RMapReactive.class);
new RedissonMapReactive<K, V>(map, this), RMapReactive.class);
}
@Override
public <V> RSetReactive<V> getSet(String name) {
RedissonSet<V> set = new RedissonSet<V>(commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(set), RSetReactive.class);
new RedissonSetReactive<V>(set, this), RSetReactive.class);
}
@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
RedissonSet<V> set = new RedissonSet<V>(codec, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(set), RSetReactive.class);
new RedissonSetReactive<V>(set, this), RSetReactive.class);
}
@Override
@ -362,14 +378,14 @@ public class RedissonReactive implements RedissonReactiveClient {
public <V> RSetCacheReactive<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set, this), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set, this), RSetCacheReactive.class);
}
@Override
@ -473,20 +489,18 @@ public class RedissonReactive implements RedissonReactiveClient {
new RedissonMapCacheReactive<K, V>(map), RMapCacheReactive.class);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, MapOptions<K, V> options) {
RedissonMap<K, V> map = new RedissonMap<K, V>(commandExecutor, name, null, options);
return ReactiveProxyBuilder.create(commandExecutor, map,
new RedissonMapReactive<K, V>(map), RMapReactive.class);
new RedissonMapReactive<K, V>(map, this), RMapReactive.class);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, commandExecutor, name, null, options);
return ReactiveProxyBuilder.create(commandExecutor, map,
new RedissonMapReactive<K, V>(map), RMapReactive.class);
new RedissonMapReactive<K, V>(map, this), RMapReactive.class);
}
@Override

@ -226,41 +226,41 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <K, V> RSetMultimapRx<K, V> getSetMultimap(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonSetMultimap<K, V>(commandExecutor, name),
new RedissonSetMultimapRx<K, V>(commandExecutor, name), RSetMultimapRx.class);
new RedissonSetMultimapRx<K, V>(commandExecutor, name, this), RSetMultimapRx.class);
}
@Override
public <K, V> RSetMultimapRx<K, V> getSetMultimap(String name, Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonSetMultimap<K, V>(codec, commandExecutor, name),
new RedissonSetMultimapRx<K, V>(codec, commandExecutor, name), RSetMultimapRx.class);
new RedissonSetMultimapRx<K, V>(codec, commandExecutor, name, this), RSetMultimapRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name) {
RedissonMap<K, V> map = new RedissonMap<K, V>(commandExecutor, name, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
new RedissonMapRx<K, V>(map, this), RMapRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name, Codec codec) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, commandExecutor, name, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
new RedissonMapRx<K, V>(map, this), RMapRx.class);
}
@Override
public <V> RSetRx<V> getSet(String name) {
RedissonSet<V> set = new RedissonSet<V>(commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetRx<V>(set), RSetRx.class);
new RedissonSetRx<V>(set, this), RSetRx.class);
}
@Override
public <V> RSetRx<V> getSet(String name, Codec codec) {
RedissonSet<V> set = new RedissonSet<V>(codec, commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetRx<V>(set), RSetRx.class);
new RedissonSetRx<V>(set, this), RSetRx.class);
}
@Override
@ -350,14 +350,14 @@ public class RedissonRx implements RedissonRxClient {
public <V> RSetCacheRx<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheRx<V>(set), RSetCacheRx.class);
new RedissonSetCacheRx<V>(set, this), RSetCacheRx.class);
}
@Override
public <V> RSetCacheRx<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheRx<V>(set), RSetCacheRx.class);
new RedissonSetCacheRx<V>(set, this), RSetCacheRx.class);
}
@Override
@ -460,7 +460,7 @@ public class RedissonRx implements RedissonRxClient {
public <K, V> RMapRx<K, V> getMap(String name, MapOptions<K, V> options) {
RedissonMap<K, V> map = new RedissonMap<K, V>(commandExecutor, name, null, options);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
new RedissonMapRx<K, V>(map, this), RMapRx.class);
}
@ -468,7 +468,7 @@ public class RedissonRx implements RedissonRxClient {
public <K, V> RMapRx<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, commandExecutor, name, null, options);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
new RedissonMapRx<K, V>(map, this), RMapRx.class);
}
@Override

@ -24,8 +24,12 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder;
@ -603,20 +607,50 @@ public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIt
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SORT_TO, params.toArray());
}
public String getLockName(Object value) {
public String getLockName(Object value, String suffix) {
ByteBuf state = encode(value);
try {
return suffixName(getName(value), Hash.hash128toBase64(state) + ":lock");
return suffixName(getName(value), Hash.hash128toBase64(state) + ":" + suffix);
} finally {
state.release();
}
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) {
String lockName = getLockName(value, "permitexpirablesemaphore");
return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RSemaphore getSemaphore(V value) {
String lockName = getLockName(value, "semaphore");
return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RCountDownLatch getCountDownLatch(V value) {
String lockName = getLockName(value, "countdownlatch");
return new RedissonCountDownLatch(commandExecutor, lockName);
}
@Override
public RLock getFairLock(V value) {
String lockName = getLockName(value, "fairlock");
return new RedissonFairLock(commandExecutor, lockName);
}
@Override
public RLock getLock(V value) {
String lockName = getLockName(value);
String lockName = getLockName(value, "lock");
return new RedissonLock(commandExecutor, lockName);
}
@Override
public RReadWriteLock getReadWriteLock(V value) {
String lockName = getLockName(value, "rw_lock");
return new RedissonReadWriteLock(commandExecutor, lockName);
}
@Override
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos,

@ -25,8 +25,12 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RSetCache;
import org.redisson.api.RedissonClient;
import org.redisson.api.mapreduce.RCollectionMapReduce;
@ -367,20 +371,50 @@ public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<
delete();
}
public String getLockName(Object value) {
public String getLockName(Object value, String suffix) {
ByteBuf state = encode(value);
try {
return suffixName(getName(value), Hash.hash128toBase64(state) + ":lock");
return suffixName(getName(value), Hash.hash128toBase64(state) + ":" + suffix);
} finally {
state.release();
}
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) {
String lockName = getLockName(value, "permitexpirablesemaphore");
return new RedissonPermitExpirableSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RSemaphore getSemaphore(V value) {
String lockName = getLockName(value, "semaphore");
return new RedissonSemaphore(commandExecutor, lockName, ((Redisson)redisson).getSemaphorePubSub());
}
@Override
public RCountDownLatch getCountDownLatch(V value) {
String lockName = getLockName(value, "countdownlatch");
return new RedissonCountDownLatch(commandExecutor, lockName);
}
@Override
public RLock getFairLock(V value) {
String lockName = getLockName(value, "fairlock");
return new RedissonFairLock(commandExecutor, lockName);
}
@Override
public RLock getLock(V value) {
String lockName = getLockName(value);
String lockName = getLockName(value, "lock");
return new RedissonLock(commandExecutor, lockName);
}
@Override
public RReadWriteLock getReadWriteLock(V value) {
String lockName = getLockName(value, "rw_lock");
return new RedissonReadWriteLock(commandExecutor, lockName);
}
@Override
public void destroy() {

@ -24,8 +24,12 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.RSet;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
@ -50,7 +54,8 @@ import org.redisson.command.CommandAsyncExecutor;
*/
public class RedissonSetMultimapValues<V> extends RedissonExpirable implements RSet<V> {
private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE);
private static final RedisCommand<ListScanResult<Object>> EVAL_SSCAN = new RedisCommand<ListScanResult<Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectListReplayDecoder(), new ListScanResultReplayDecoder()), ValueType.MAP_VALUE);
private final RSet<V> set;
private final Object key;
@ -469,6 +474,31 @@ public class RedissonSetMultimapValues<V> extends RedissonExpirable implements R
Arrays.<Object>asList(timeoutSetName, getName()), args.toArray());
}
@Override
public RCountDownLatch getCountDownLatch(V value) {
return set.getCountDownLatch(value);
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) {
return set.getPermitExpirableSemaphore(value);
}
@Override
public RSemaphore getSemaphore(V value) {
return set.getSemaphore(value);
}
@Override
public RLock getFairLock(V value) {
return set.getFairLock(value);
}
@Override
public RReadWriteLock getReadWriteLock(V value) {
return set.getReadWriteLock(value);
}
@Override
public RLock getLock(V value) {
return set.getLock(value);

@ -103,6 +103,38 @@ public interface RMap<K, V> extends ConcurrentMap<K, V>, RExpirable, RMapAsync<K
* @return MapReduce instance
*/
<KOut, VOut> RMapReduce<K, V, KOut, VOut> mapReduce();
/**
* Returns <code>RCountDownLatch</code> instance associated with key
*
* @param key - map key
* @return countdownlatch
*/
RCountDownLatch getCountDownLatch(K key);
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with key
*
* @param key - map key
* @return permitExpirableSemaphore
*/
RPermitExpirableSemaphore getPermitExpirableSemaphore(K key);
/**
* Returns <code>RSemaphore</code> instance associated with key
*
* @param key - map key
* @return semaphore
*/
RSemaphore getSemaphore(K key);
/**
* Returns <code>RLock</code> instance associated with key
*
* @param key - map key
* @return fairlock
*/
RLock getFairLock(K key);
/**
* Returns <code>RReadWriteLock</code> instance associated with key

@ -438,5 +438,45 @@ public interface RMapReactive<K, V> extends RExpirableReactive {
* @return iterator
*/
Flux<K> keyIterator(String pattern, int count);
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with key
*
* @param key - map key
* @return permitExpirableSemaphore
*/
RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(K key);
/**
* Returns <code>RSemaphore</code> instance associated with key
*
* @param key - map key
* @return semaphore
*/
RSemaphoreReactive getSemaphore(K key);
/**
* Returns <code>RLock</code> instance associated with key
*
* @param key - map key
* @return fairLock
*/
RLockReactive getFairLock(K key);
/**
* Returns <code>RReadWriteLock</code> instance associated with key
*
* @param key - map key
* @return readWriteLock
*/
RReadWriteLockReactive getReadWriteLock(K key);
/**
* Returns <code>RLock</code> instance associated with key
*
* @param key - map key
* @return lock
*/
RLockReactive getLock(K key);
}

@ -437,5 +437,45 @@ public interface RMapRx<K, V> extends RExpirableRx {
* @return iterator
*/
Flowable<K> keyIterator(String pattern, int count);
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with key
*
* @param key - map key
* @return permitExpirableSemaphore
*/
RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(K key);
/**
* Returns <code>RSemaphore</code> instance associated with key
*
* @param key - map key
* @return semaphore
*/
RSemaphoreRx getSemaphore(K key);
/**
* Returns <code>RLock</code> instance associated with key
*
* @param key - map key
* @return fairLock
*/
RLockRx getFairLock(K key);
/**
* Returns <code>RReadWriteLock</code> instance associated with key
*
* @param key - map key
* @return readWriteLock
*/
RReadWriteLockRx getReadWriteLock(K key);
/**
* Returns <code>RLock</code> instance associated with key
*
* @param key - map key
* @return lock
*/
RLockRx getLock(K key);
}

@ -30,11 +30,51 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
*/
public interface RSet<V> extends Set<V>, RExpirable, RSetAsync<V>, RSortable<Set<V>> {
/**
* Returns <code>RCountDownLatch</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RCountDownLatch object
*/
RCountDownLatch getCountDownLatch(V value);
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RPermitExpirableSemaphore object
*/
RPermitExpirableSemaphore getPermitExpirableSemaphore(V value);
/**
* Returns <code>RSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RSemaphore object
*/
RSemaphore getSemaphore(V value);
/**
* Returns <code>RLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLock getFairLock(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RReadWriteLock object
*/
RReadWriteLock getReadWriteLock(V value);
/**
* Returns lock instance associated with <code>value</code>
*
* @param value - set value
* @return lock
* @return RLock object
*/
RLock getLock(V value);

@ -41,11 +41,51 @@ import org.redisson.api.mapreduce.RCollectionMapReduce;
*/
public interface RSetCache<V> extends Set<V>, RExpirable, RSetCacheAsync<V>, RDestroyable {
/**
* Returns <code>RCountDownLatch</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RCountDownLatch object
*/
RCountDownLatch getCountDownLatch(V value);
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RPermitExpirableSemaphore object
*/
RPermitExpirableSemaphore getPermitExpirableSemaphore(V value);
/**
* Returns <code>RSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RSemaphore object
*/
RSemaphore getSemaphore(V value);
/**
* Returns <code>RLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLock getFairLock(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RReadWriteLock object
*/
RReadWriteLock getReadWriteLock(V value);
/**
* Returns lock instance associated with <code>value</code>
*
* @param value - set value
* @return lock
* @return RLock object
*/
RLock getLock(V value);

@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;
/**
* Async set functions
* Reactive interface for RSetCache object
*
* @author Nikita Koksharov
*
@ -29,6 +29,57 @@ import reactor.core.publisher.Mono;
*/
public interface RSetCacheReactive<V> extends RCollectionReactive<V> {
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RPermitExpirableSemaphore object
*/
RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value);
/**
* Returns <code>RSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RSemaphore object
*/
RSemaphoreReactive getSemaphore(V value);
/**
* Returns <code>RLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLockReactive getFairLock(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RReadWriteLock object
*/
RReadWriteLockReactive getReadWriteLock(V value);
/**
* Returns lock instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLockReactive getLock(V value);
/**
* Stores value with specified time to live.
* Value expires after specified time to live.
*
* @param value to add
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit - time unit
* @return <code>true</code> if value has been added. <code>false</code>
* if value already been in collection.
*/
Mono<Boolean> add(V value, long ttl, TimeUnit unit);
/**

@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit;
import io.reactivex.Flowable;
/**
* Async set functions
* RxJava2 interface for RSetCache object
*
* @author Nikita Koksharov
*
@ -29,6 +29,57 @@ import io.reactivex.Flowable;
*/
public interface RSetCacheRx<V> extends RCollectionRx<V> {
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RPermitExpirableSemaphore object
*/
RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value);
/**
* Returns <code>RSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RSemaphore object
*/
RSemaphoreRx getSemaphore(V value);
/**
* Returns <code>RLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLockRx getFairLock(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RReadWriteLock object
*/
RReadWriteLockRx getReadWriteLock(V value);
/**
* Returns lock instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLockRx getLock(V value);
/**
* Stores value with specified time to live.
* Value expires after specified time to live.
*
* @param value to add
* @param ttl - time to live for key\value entry.
* If <code>0</code> then stores infinitely.
* @param unit - time unit
* @return <code>true</code> if value has been added. <code>false</code>
* if value already been in collection.
*/
Flowable<Boolean> add(V value, long ttl, TimeUnit unit);
/**

@ -21,7 +21,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Async set functions
* Reactive interface for RSet object
*
* @author Nikita Koksharov
*
@ -29,6 +29,46 @@ import reactor.core.publisher.Mono;
*/
public interface RSetReactive<V> extends RCollectionReactive<V>, RSortableReactive<Set<V>> {
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RPermitExpirableSemaphore object
*/
RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value);
/**
* Returns <code>RSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RSemaphore object
*/
RSemaphoreReactive getSemaphore(V value);
/**
* Returns <code>RLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLockReactive getFairLock(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RReadWriteLock object
*/
RReadWriteLockReactive getReadWriteLock(V value);
/**
* Returns lock instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLockReactive getLock(V value);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.

@ -20,7 +20,7 @@ import java.util.Set;
import io.reactivex.Flowable;
/**
* Async set functions
* RxJava2 interface for RSetCache object
*
* @author Nikita Koksharov
*
@ -28,6 +28,46 @@ import io.reactivex.Flowable;
*/
public interface RSetRx<V> extends RCollectionRx<V>, RSortableRx<Set<V>> {
/**
* Returns <code>RPermitExpirableSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RPermitExpirableSemaphore object
*/
RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value);
/**
* Returns <code>RSemaphore</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RSemaphore object
*/
RSemaphoreRx getSemaphore(V value);
/**
* Returns <code>RLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLockRx getFairLock(V value);
/**
* Returns <code>RReadWriteLock</code> instance associated with <code>value</code>
*
* @param value - set value
* @return RReadWriteLock object
*/
RReadWriteLockRx getReadWriteLock(V value);
/**
* Returns lock instance associated with <code>value</code>
*
* @param value - set value
* @return RLock object
*/
RLockRx getLock(V value);
/**
* Returns an iterator over elements in this set.
* Elements are loaded in batch. Batch size is defined by <code>count</code> param.

@ -452,7 +452,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
public void returnConnection(ClientConnectionsEntry entry, T connection) {
if (entry.isFreezed() && !entry.isMasterForRead()) {
if (entry.isFreezed() && entry.getFreezeReason() != FreezeReason.SYSTEM) {
connection.closeAsync();
entry.getAllConnections().remove(connection);
} else {

@ -142,14 +142,14 @@ public class RedissonBatchReactive implements RBatchReactive {
public <K, V> RMapReactive<K, V> getMap(String name) {
RedissonMap<K, V> map = new RedissonMap<K, V>(executorService, name, null, null);
return ReactiveProxyBuilder.create(executorService, map,
new RedissonMapReactive<K, V>(map), RMapReactive.class);
new RedissonMapReactive<K, V>(map, null), RMapReactive.class);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, executorService, name, null, null);
return ReactiveProxyBuilder.create(executorService, map,
new RedissonMapReactive<K, V>(map), RMapReactive.class);
new RedissonMapReactive<K, V>(map, null), RMapReactive.class);
}
@Override
@ -170,14 +170,14 @@ public class RedissonBatchReactive implements RBatchReactive {
public <V> RSetReactive<V> getSet(String name) {
RedissonSet<V> set = new RedissonSet<V>(executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(set), RSetReactive.class);
new RedissonSetReactive<V>(set, null), RSetReactive.class);
}
@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
RedissonSet<V> set = new RedissonSet<V>(codec, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(set), RSetReactive.class);
new RedissonSetReactive<V>(set, null), RSetReactive.class);
}
@Override
@ -235,14 +235,14 @@ public class RedissonBatchReactive implements RBatchReactive {
public <V> RSetCacheReactive<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set, null), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set, null), RSetCacheReactive.class);
}
@Override
@ -349,13 +349,13 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name) {
return ReactiveProxyBuilder.create(executorService, new RedissonSetMultimap<K, V>(executorService, name),
new RedissonSetMultimapReactive<K, V>(executorService, name), RSetMultimapReactive.class);
new RedissonSetMultimapReactive<K, V>(executorService, name, null), RSetMultimapReactive.class);
}
@Override
public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name, Codec codec) {
return ReactiveProxyBuilder.create(executorService, new RedissonSetMultimap<K, V>(codec, executorService, name),
new RedissonSetMultimapReactive<K, V>(codec, executorService, name), RSetMultimapReactive.class);
new RedissonSetMultimapReactive<K, V>(codec, executorService, name, null), RSetMultimapReactive.class);
}
@Override

@ -23,7 +23,12 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
import org.redisson.api.RLockReactive;
import org.redisson.api.RMap;
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RedissonReactiveClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -40,9 +45,11 @@ import reactor.core.publisher.Mono;
public class RedissonMapReactive<K, V> {
private final RMap<K, V> instance;
private final RedissonReactiveClient redisson;
public RedissonMapReactive(RMap<K, V> instance) {
public RedissonMapReactive(RMap<K, V> instance, RedissonReactiveClient redisson) {
this.instance = instance;
this.redisson = redisson;
}
public Publisher<Map.Entry<K, V>> entryIterator() {
@ -102,5 +109,30 @@ public class RedissonMapReactive<K, V> {
}
});
}
public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "permitexpirablesemaphore");
return redisson.getPermitExpirableSemaphore(name);
}
public RSemaphoreReactive getSemaphore(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "semaphore");
return redisson.getSemaphore(name);
}
public RLockReactive getFairLock(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "fairlock");
return redisson.getFairLock(name);
}
public RReadWriteLockReactive getReadWriteLock(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "rw_lock");
return redisson.getReadWriteLock(name);
}
public RLockReactive getLock(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "lock");
return redisson.getLock(name);
}
}

@ -22,9 +22,15 @@ import java.util.List;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache;
import org.redisson.ScanIterator;
import org.redisson.api.RFuture;
import org.redisson.api.RLockReactive;
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RSetCache;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
@ -41,9 +47,11 @@ import reactor.core.publisher.Mono;
public class RedissonSetCacheReactive<V> {
private final RSetCache<V> instance;
private final RedissonReactiveClient redisson;
public RedissonSetCacheReactive(RSetCache<V> instance) {
public RedissonSetCacheReactive(RSetCache<V> instance, RedissonReactiveClient redisson) {
this.instance = instance;
this.redisson = redisson;
}
public Publisher<V> iterator() {
@ -63,5 +71,30 @@ public class RedissonSetCacheReactive<V> {
}
}.addAll(c);
}
public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "permitexpirablesemaphore");
return redisson.getPermitExpirableSemaphore(name);
}
public RSemaphoreReactive getSemaphore(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "semaphore");
return redisson.getSemaphore(name);
}
public RLockReactive getFairLock(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "fairlock");
return redisson.getFairLock(name);
}
public RReadWriteLockReactive getReadWriteLock(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "rw_lock");
return redisson.getReadWriteLock(name);
}
public RLockReactive getLock(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "lock");
return redisson.getLock(name);
}
}

@ -19,6 +19,7 @@ import org.redisson.RedissonListMultimap;
import org.redisson.api.RSet;
import org.redisson.api.RSetMultimap;
import org.redisson.api.RSetReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
/**
@ -30,21 +31,26 @@ import org.redisson.client.codec.Codec;
*/
public class RedissonSetMultimapReactive<K, V> {
private CommandReactiveExecutor commandExecutor;
private RedissonListMultimap<K, V> instance;
private final RedissonReactiveClient redisson;
private final CommandReactiveExecutor commandExecutor;
private final RedissonListMultimap<K, V> instance;
public RedissonSetMultimapReactive(CommandReactiveExecutor commandExecutor, String name) {
public RedissonSetMultimapReactive(CommandReactiveExecutor commandExecutor, String name, RedissonReactiveClient redisson) {
this.instance = new RedissonListMultimap<K, V>(commandExecutor, name);
this.redisson = redisson;
this.commandExecutor = commandExecutor;
}
public RedissonSetMultimapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
public RedissonSetMultimapReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name, RedissonReactiveClient redisson) {
this.instance = new RedissonListMultimap<K, V>(codec, commandExecutor, name);
this.redisson = redisson;
this.commandExecutor = commandExecutor;
}
public RSetReactive<V> get(K key) {
RSet<V> set = ((RSetMultimap<K, V>)instance).get(key);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(set), RSetReactive.class);
new RedissonSetReactive<V>(set, redisson), RSetReactive.class);
}
}

@ -24,7 +24,12 @@ import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSet;
import org.redisson.api.RFuture;
import org.redisson.api.RLockReactive;
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.api.RSet;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
@ -40,9 +45,11 @@ import reactor.core.publisher.Flux;
public class RedissonSetReactive<V> {
private final RSet<V> instance;
private final RedissonReactiveClient redisson;
public RedissonSetReactive(RSet<V> instance) {
public RedissonSetReactive(RSet<V> instance, RedissonReactiveClient redisson) {
this.instance = instance;
this.redisson = redisson;
}
public Publisher<Boolean> addAll(Publisher<? extends V> c) {
@ -75,4 +82,29 @@ public class RedissonSetReactive<V> {
return iterator(null, 10);
}
}
public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "permitexpirablesemaphore");
return redisson.getPermitExpirableSemaphore(name);
}
public RSemaphoreReactive getSemaphore(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "semaphore");
return redisson.getSemaphore(name);
}
public RLockReactive getFairLock(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "fairlock");
return redisson.getFairLock(name);
}
public RReadWriteLockReactive getReadWriteLock(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "rw_lock");
return redisson.getReadWriteLock(name);
}
public RLockReactive getLock(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "lock");
return redisson.getLock(name);
}
}

@ -64,14 +64,14 @@ public class RedissonTransactionReactive implements RTransactionReactive {
public <K, V> RMapReactive<K, V> getMap(String name) {
RMap<K, V> map = transaction.<K, V>getMap(name);
return ReactiveProxyBuilder.create(executorService, map,
new RedissonMapReactive<K, V>(map), RMapReactive.class);
new RedissonMapReactive<K, V>(map, null), RMapReactive.class);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name, Codec codec) {
RMap<K, V> map = transaction.<K, V>getMap(name, codec);
return ReactiveProxyBuilder.create(executorService, map,
new RedissonMapReactive<K, V>(map), RMapReactive.class);
new RedissonMapReactive<K, V>(map, null), RMapReactive.class);
}
@Override
@ -92,28 +92,28 @@ public class RedissonTransactionReactive implements RTransactionReactive {
public <V> RSetReactive<V> getSet(String name) {
RSet<V> set = transaction.<V>getSet(name);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(set), RSetReactive.class);
new RedissonSetReactive<V>(set, null), RSetReactive.class);
}
@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
RSet<V> set = transaction.<V>getSet(name, codec);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(set), RSetReactive.class);
new RedissonSetReactive<V>(set, null), RSetReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
RSetCache<V> set = transaction.<V>getSetCache(name);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set, null), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = transaction.<V>getSetCache(name, codec);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set, null), RSetCacheReactive.class);
}
@Override

@ -142,14 +142,14 @@ public class RedissonBatchRx implements RBatchRx {
public <K, V> RMapRx<K, V> getMap(String name) {
RedissonMap<K, V> map = new RedissonMap<K, V>(executorService, name, null, null);
return RxProxyBuilder.create(executorService, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
new RedissonMapRx<K, V>(map, null), RMapRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name, Codec codec) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, executorService, name, null, null);
return RxProxyBuilder.create(executorService, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
new RedissonMapRx<K, V>(map, null), RMapRx.class);
}
@Override
@ -170,14 +170,14 @@ public class RedissonBatchRx implements RBatchRx {
public <V> RSetRx<V> getSet(String name) {
RedissonSet<V> set = new RedissonSet<V>(executorService, name, null);
return RxProxyBuilder.create(executorService, set,
new RedissonSetRx<V>(set), RSetRx.class);
new RedissonSetRx<V>(set, null), RSetRx.class);
}
@Override
public <V> RSetRx<V> getSet(String name, Codec codec) {
RedissonSet<V> set = new RedissonSet<V>(codec, executorService, name, null);
return RxProxyBuilder.create(executorService, set,
new RedissonSetRx<V>(set), RSetRx.class);
new RedissonSetRx<V>(set, null), RSetRx.class);
}
@Override
@ -241,14 +241,14 @@ public class RedissonBatchRx implements RBatchRx {
public <V> RSetCacheRx<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, executorService, name, null);
return RxProxyBuilder.create(executorService, set,
new RedissonSetCacheRx<V>(set), RSetCacheRx.class);
new RedissonSetCacheRx<V>(set, null), RSetCacheRx.class);
}
@Override
public <V> RSetCacheRx<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
return RxProxyBuilder.create(executorService, set,
new RedissonSetCacheRx<V>(set), RSetCacheRx.class);
new RedissonSetCacheRx<V>(set, null), RSetCacheRx.class);
}
@Override
@ -359,13 +359,13 @@ public class RedissonBatchRx implements RBatchRx {
@Override
public <K, V> RSetMultimapRx<K, V> getSetMultimap(String name) {
return RxProxyBuilder.create(executorService, new RedissonSetMultimap<K, V>(executorService, name),
new RedissonSetMultimapRx<K, V>(executorService, name), RSetMultimapRx.class);
new RedissonSetMultimapRx<K, V>(executorService, name, null), RSetMultimapRx.class);
}
@Override
public <K, V> RSetMultimapRx<K, V> getSetMultimap(String name, Codec codec) {
return RxProxyBuilder.create(executorService, new RedissonSetMultimap<K, V>(codec, executorService, name),
new RedissonSetMultimapRx<K, V>(codec, executorService, name), RSetMultimapRx.class);
new RedissonSetMultimapRx<K, V>(codec, executorService, name, null), RSetMultimapRx.class);
}
@Override

@ -20,7 +20,13 @@ import java.util.Map.Entry;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
import org.redisson.RedissonRx;
import org.redisson.api.RLockRx;
import org.redisson.api.RMap;
import org.redisson.api.RPermitExpirableSemaphoreRx;
import org.redisson.api.RReadWriteLockRx;
import org.redisson.api.RSemaphoreRx;
import org.redisson.api.RedissonRxClient;
/**
* Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap}
@ -34,9 +40,11 @@ import org.redisson.api.RMap;
public class RedissonMapRx<K, V> {
private final RedissonMap<K, V> instance;
private RedissonRxClient redisson;
public RedissonMapRx(RMap<K, V> instance) {
public RedissonMapRx(RMap<K, V> instance, RedissonRx redisson) {
this.instance = (RedissonMap<K, V>) instance;
this.redisson = redisson;
}
public Publisher<Map.Entry<K, V>> entryIterator() {
@ -97,4 +105,29 @@ public class RedissonMapRx<K, V> {
}.create();
}
public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "permitexpirablesemaphore");
return redisson.getPermitExpirableSemaphore(name);
}
public RSemaphoreRx getSemaphore(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "semaphore");
return redisson.getSemaphore(name);
}
public RLockRx getFairLock(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "fairlock");
return redisson.getFairLock(name);
}
public RReadWriteLockRx getReadWriteLock(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "rw_lock");
return redisson.getReadWriteLock(name);
}
public RLockRx getLock(K key) {
String name = ((RedissonMap<K, V>)instance).getLockName(key, "lock");
return redisson.getLock(name);
}
}

@ -16,9 +16,15 @@
package org.redisson.rx;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache;
import org.redisson.ScanIterator;
import org.redisson.api.RFuture;
import org.redisson.api.RLockRx;
import org.redisson.api.RPermitExpirableSemaphoreRx;
import org.redisson.api.RReadWriteLockRx;
import org.redisson.api.RSemaphoreRx;
import org.redisson.api.RSetCache;
import org.redisson.api.RedissonRxClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
@ -31,9 +37,11 @@ import org.redisson.client.protocol.decoder.ListScanResult;
public class RedissonSetCacheRx<V> {
private final RSetCache<V> instance;
private final RedissonRxClient redisson;
public RedissonSetCacheRx(RSetCache<V> instance) {
public RedissonSetCacheRx(RSetCache<V> instance, RedissonRxClient redisson) {
this.instance = instance;
this.redisson = redisson;
}
public Publisher<V> iterator() {
@ -54,4 +62,29 @@ public class RedissonSetCacheRx<V> {
}.addAll(c);
}
public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "permitexpirablesemaphore");
return redisson.getPermitExpirableSemaphore(name);
}
public RSemaphoreRx getSemaphore(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "semaphore");
return redisson.getSemaphore(name);
}
public RLockRx getFairLock(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "fairlock");
return redisson.getFairLock(name);
}
public RReadWriteLockRx getReadWriteLock(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "rw_lock");
return redisson.getReadWriteLock(name);
}
public RLockRx getLock(V value) {
String name = ((RedissonSetCache<V>)instance).getLockName(value, "lock");
return redisson.getLock(name);
}
}

@ -18,7 +18,8 @@ package org.redisson.rx;
import org.redisson.RedissonListMultimap;
import org.redisson.RedissonSet;
import org.redisson.api.RSetMultimap;
import org.redisson.api.RSetReactive;
import org.redisson.api.RSetRx;
import org.redisson.api.RedissonRxClient;
import org.redisson.client.codec.Codec;
/**
@ -30,21 +31,26 @@ import org.redisson.client.codec.Codec;
*/
public class RedissonSetMultimapRx<K, V> {
private CommandRxExecutor commandExecutor;
private RedissonListMultimap<K, V> instance;
private final RedissonRxClient redisson;
private final CommandRxExecutor commandExecutor;
private final RedissonListMultimap<K, V> instance;
public RedissonSetMultimapRx(CommandRxExecutor commandExecutor, String name) {
public RedissonSetMultimapRx(CommandRxExecutor commandExecutor, String name, RedissonRxClient redisson) {
this.instance = new RedissonListMultimap<K, V>(commandExecutor, name);
this.redisson = redisson;
this.commandExecutor = commandExecutor;
}
public RedissonSetMultimapRx(Codec codec, CommandRxExecutor commandExecutor, String name) {
public RedissonSetMultimapRx(Codec codec, CommandRxExecutor commandExecutor, String name, RedissonRxClient redisson) {
this.instance = new RedissonListMultimap<K, V>(codec, commandExecutor, name);
this.redisson = redisson;
this.commandExecutor = commandExecutor;
}
public RSetReactive<V> get(K key) {
public RSetRx<V> get(K key) {
RedissonSet<V> set = (RedissonSet<V>) ((RSetMultimap<K, V>)instance).get(key);
return RxProxyBuilder.create(commandExecutor, set,
new RedissonSetRx<V>(set), RSetReactive.class);
new RedissonSetRx<V>(set, redisson), RSetRx.class);
}
}

@ -18,7 +18,12 @@ package org.redisson.rx;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSet;
import org.redisson.api.RFuture;
import org.redisson.api.RLockRx;
import org.redisson.api.RPermitExpirableSemaphoreRx;
import org.redisson.api.RReadWriteLockRx;
import org.redisson.api.RSemaphoreRx;
import org.redisson.api.RSet;
import org.redisson.api.RedissonRxClient;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
@ -34,9 +39,11 @@ import io.reactivex.Flowable;
public class RedissonSetRx<V> {
private final RSet<V> instance;
private final RedissonRxClient redisson;
public RedissonSetRx(RSet<V> instance) {
public RedissonSetRx(RSet<V> instance, RedissonRxClient redisson) {
this.instance = instance;
this.redisson = redisson;
}
public Flowable<Boolean> addAll(Publisher<? extends V> c) {
@ -60,7 +67,7 @@ public class RedissonSetRx<V> {
return new SetRxIterator<V>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
return ((RedissonSet)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, pattern, count);
return ((RedissonSet<V>)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, pattern, count);
}
}.create();
}
@ -69,4 +76,29 @@ public class RedissonSetRx<V> {
return iterator(null, 10);
}
public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "permitexpirablesemaphore");
return redisson.getPermitExpirableSemaphore(name);
}
public RSemaphoreRx getSemaphore(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "semaphore");
return redisson.getSemaphore(name);
}
public RLockRx getFairLock(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "fairlock");
return redisson.getFairLock(name);
}
public RReadWriteLockRx getReadWriteLock(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "rw_lock");
return redisson.getReadWriteLock(name);
}
public RLockRx getLock(V value) {
String name = ((RedissonSet<V>)instance).getLockName(value, "lock");
return redisson.getLock(name);
}
}

@ -65,14 +65,14 @@ public class RedissonTransactionRx implements RTransactionRx {
public <K, V> RMapRx<K, V> getMap(String name) {
RMap<K, V> map = transaction.<K, V>getMap(name);
return RxProxyBuilder.create(executorService, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
new RedissonMapRx<K, V>(map, null), RMapRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name, Codec codec) {
RMap<K, V> map = transaction.<K, V>getMap(name, codec);
return RxProxyBuilder.create(executorService, map,
new RedissonMapRx<K, V>(map), RMapRx.class);
new RedissonMapRx<K, V>(map, null), RMapRx.class);
}
@Override
@ -93,28 +93,28 @@ public class RedissonTransactionRx implements RTransactionRx {
public <V> RSetRx<V> getSet(String name) {
RSet<V> set = transaction.<V>getSet(name);
return RxProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(set), RSetRx.class);
new RedissonSetReactive<V>(set, null), RSetRx.class);
}
@Override
public <V> RSetRx<V> getSet(String name, Codec codec) {
RSet<V> set = transaction.<V>getSet(name, codec);
return RxProxyBuilder.create(executorService, set,
new RedissonSetRx<V>(set), RSetRx.class);
new RedissonSetRx<V>(set, null), RSetRx.class);
}
@Override
public <V> RSetCacheRx<V> getSetCache(String name) {
RSetCache<V> set = transaction.<V>getSetCache(name);
return RxProxyBuilder.create(executorService, set,
new RedissonSetCacheRx<V>(set), RSetCacheRx.class);
new RedissonSetCacheRx<V>(set, null), RSetCacheRx.class);
}
@Override
public <V> RSetCacheRx<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = transaction.<V>getSetCache(name, codec);
return RxProxyBuilder.create(executorService, set,
new RedissonSetCacheRx<V>(set), RSetCacheRx.class);
new RedissonSetCacheRx<V>(set, null), RSetCacheRx.class);
}
@Override

@ -24,10 +24,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.RedissonMap;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
@ -272,6 +275,26 @@ public class RedissonTransactionalMap<K, V> extends RedissonMap<K, V> {
throw new UnsupportedOperationException("loadAll method is not supported in transaction");
}
@Override
public RLock getFairLock(K key) {
throw new UnsupportedOperationException("getFairLock method is not supported in transaction");
}
@Override
public RCountDownLatch getCountDownLatch(K key) {
throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction");
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(K key) {
throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction");
}
@Override
public RSemaphore getSemaphore(K key) {
throw new UnsupportedOperationException("getSemaphore method is not supported in transaction");
}
@Override
public RLock getLock(K key) {
throw new UnsupportedOperationException("getLock method is not supported in transaction");

@ -24,9 +24,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.RedissonMapCache;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.mapreduce.RMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
@ -298,6 +301,26 @@ public class RedissonTransactionalMapCache<K, V> extends RedissonMapCache<K, V>
throw new UnsupportedOperationException("loadAll method is not supported in transaction");
}
@Override
public RLock getFairLock(K key) {
throw new UnsupportedOperationException("getFairLock method is not supported in transaction");
}
@Override
public RCountDownLatch getCountDownLatch(K key) {
throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction");
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(K key) {
throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction");
}
@Override
public RSemaphore getSemaphore(K key) {
throw new UnsupportedOperationException("getSemaphore method is not supported in transaction");
}
@Override
public RLock getLock(K key) {
throw new UnsupportedOperationException("getLock method is not supported in transaction");

@ -23,7 +23,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.redisson.RedissonSet;
import org.redisson.api.RCountDownLatch;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RPermitExpirableSemaphore;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RSemaphore;
import org.redisson.api.SortOrder;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
@ -99,6 +104,36 @@ public class RedissonTransactionalSet<V> extends RedissonSet<V> {
checkState();
return transactionalSet.scanIterator(name, client, startPos, pattern, count);
}
@Override
public RLock getFairLock(V value) {
throw new UnsupportedOperationException("getFairLock method is not supported in transaction");
}
@Override
public RCountDownLatch getCountDownLatch(V value) {
throw new UnsupportedOperationException("getCountDownLatch method is not supported in transaction");
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(V value) {
throw new UnsupportedOperationException("getPermitExpirableSemaphore method is not supported in transaction");
}
@Override
public RSemaphore getSemaphore(V value) {
throw new UnsupportedOperationException("getSemaphore method is not supported in transaction");
}
@Override
public RLock getLock(V value) {
throw new UnsupportedOperationException("getLock method is not supported in transaction");
}
@Override
public RReadWriteLock getReadWriteLock(V value) {
throw new UnsupportedOperationException("getReadWriteLock method is not supported in transaction");
}
@Override
public RFuture<Boolean> containsAsync(Object o) {

@ -77,7 +77,7 @@ public class TransactionalSet<V> extends BaseTransactionalSet<V> {
@Override
protected RLock getLock(RCollectionAsync<V> set, V value) {
String lockName = ((RedissonSet<V>)set).getLockName(value);
String lockName = ((RedissonSet<V>)set).getLockName(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
}

@ -82,7 +82,7 @@ public class TransactionalSetCache<V> extends BaseTransactionalSet<V> {
@Override
protected RLock getLock(RCollectionAsync<V> set, V value) {
String lockName = ((RedissonSetCache<V>)set).getLockName(value);
String lockName = ((RedissonSetCache<V>)set).getLockName(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
}

@ -40,12 +40,12 @@ public abstract class SetOperation extends TransactionalOperation {
}
protected RLock getLock(RSetCache<?> setCache, CommandAsyncExecutor commandExecutor, Object value) {
String lockName = ((RedissonSetCache<?>)setCache).getLockName(value);
String lockName = ((RedissonSetCache<?>)setCache).getLockName(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
}
protected RLock getLock(RSet<?> setCache, CommandAsyncExecutor commandExecutor, Object value) {
String lockName = ((RedissonSet<?>)setCache).getLockName(value);
String lockName = ((RedissonSet<?>)setCache).getLockName(value, "lock");
return new RedissonTransactionalLock(commandExecutor, lockName, transactionId);
}

Loading…
Cancel
Save