Fixed - RMapCacheRx.getLock() returns org.redisson.RedissonLock instead of org.redisson.api.RLockRx #3389

pull/3417/head
Nikita Koksharov 4 years ago
parent 6066ea992e
commit 7fdbb44158

@ -143,16 +143,16 @@ public class RedissonRx implements RedissonRxClient {
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec) {
RedissonMapCache<K, V> map = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, null, null);
RMap<K, V> map = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
new RedissonMapCacheRx<K, V>(map, commandExecutor), RMapCacheRx.class);
}
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name) {
RedissonMapCache<K, V> map = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
new RedissonMapCacheRx<K, V>(map, commandExecutor), RMapCacheRx.class);
}
@Override
@ -264,14 +264,14 @@ public class RedissonRx implements RedissonRxClient {
public <K, V> RMapRx<K, V> getMap(String name) {
RedissonMap<K, V> map = new RedissonMap<K, V>(commandExecutor, name, null, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map, this), RMapRx.class);
new RedissonMapRx<K, V>(map, commandExecutor), RMapRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name, Codec codec) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, commandExecutor, name, null, null, null);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map, this), RMapRx.class);
new RedissonMapRx<K, V>(map, commandExecutor), RMapRx.class);
}
@Override
@ -528,30 +528,30 @@ public class RedissonRx implements RedissonRxClient {
public <K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {
RedissonMapCache<K, V> map = new RedissonMapCache<K, V>(codec, evictionScheduler, commandExecutor, name, null, options, writeBehindService);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
new RedissonMapCacheRx<K, V>(map, commandExecutor), RMapCacheRx.class);
}
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name, MapOptions<K, V> options) {
RedissonMapCache<K, V> map = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, options, writeBehindService);
RMap<K, V> map = new RedissonMapCache<K, V>(evictionScheduler, commandExecutor, name, null, options, writeBehindService);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
new RedissonMapCacheRx<K, V>(map, commandExecutor), RMapCacheRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name, MapOptions<K, V> options) {
RedissonMap<K, V> map = new RedissonMap<K, V>(commandExecutor, name, null, options, writeBehindService);
RMap<K, V> map = new RedissonMap<K, V>(commandExecutor, name, null, options, writeBehindService);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map, this), RMapRx.class);
new RedissonMapRx<K, V>(map, commandExecutor), RMapRx.class);
}
@Override
public <K, V> RMapRx<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
RedissonMap<K, V> map = new RedissonMap<K, V>(codec, commandExecutor, name, null, options, writeBehindService);
RMap<K, V> map = new RedissonMap<K, V>(codec, commandExecutor, name, null, options, writeBehindService);
return RxProxyBuilder.create(commandExecutor, map,
new RedissonMapRx<K, V>(map, this), RMapRx.class);
new RedissonMapRx<K, V>(map, commandExecutor), RMapRx.class);
}
@Override

@ -100,15 +100,15 @@ public class RedissonBatchRx implements RBatchRx {
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec) {
RMapCache<K, V> map = new RedissonMapCache<K, V>(codec, evictionScheduler, executorService, name, null, null, null);
return RxProxyBuilder.create(executorService, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
return RxProxyBuilder.create(executorService, map,
new RedissonMapCacheRx<>(map, commandExecutor), RMapCacheRx.class);
}
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name) {
RMapCache<K, V> map = new RedissonMapCache<K, V>(evictionScheduler, executorService, name, null, null, null);
return RxProxyBuilder.create(executorService, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
return RxProxyBuilder.create(executorService, map,
new RedissonMapCacheRx<>(map, commandExecutor), RMapCacheRx.class);
}
@Override

@ -15,12 +15,7 @@
*/
package org.redisson.rx;
import java.util.Map;
import java.util.Map.Entry;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMapCache;
import org.redisson.api.RMapCache;
import org.redisson.api.RMap;
/**
*
@ -29,70 +24,9 @@ import org.redisson.api.RMapCache;
* @param <K> key
* @param <V> value
*/
public class RedissonMapCacheRx<K, V> {
private final RedissonMapCache<K, V> instance;
public RedissonMapCacheRx(RMapCache<K, V> instance) {
this.instance = (RedissonMapCache<K, V>) instance;
}
public Publisher<Map.Entry<K, V>> entryIterator() {
return entryIterator(null);
}
public Publisher<Map.Entry<K, V>> entryIterator(int count) {
return entryIterator(null, count);
}
public Publisher<Map.Entry<K, V>> entryIterator(String pattern) {
return entryIterator(pattern, 10);
}
public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, Map.Entry<K, V>>(instance, pattern, count).create();
}
public class RedissonMapCacheRx<K, V> extends RedissonMapRx<K, V> {
public Publisher<V> valueIterator() {
return valueIterator(null);
}
public Publisher<V> valueIterator(String pattern) {
return valueIterator(pattern, 10);
public RedissonMapCacheRx(RMap<K, V> instance, CommandRxExecutor executor) {
super(instance, executor);
}
public Publisher<V> valueIterator(int count) {
return valueIterator(null, count);
}
public Publisher<V> valueIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, V>(instance, pattern, count) {
@Override
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
}
}.create();
}
public Publisher<K> keyIterator() {
return keyIterator(null);
}
public Publisher<K> keyIterator(String pattern) {
return keyIterator(pattern, 10);
}
public Publisher<K> keyIterator(int count) {
return keyIterator(null, count);
}
public Publisher<K> keyIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, K>(instance, pattern, count) {
@Override
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
}
}.create();
}
}

@ -21,12 +21,8 @@ import java.util.Map.Entry;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
import org.redisson.RedissonRx;
import org.redisson.api.RLockRx;
import org.redisson.api.RMap;
import org.redisson.api.RPermitExpirableSemaphoreRx;
import org.redisson.api.RReadWriteLockRx;
import org.redisson.api.RSemaphoreRx;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.*;
import org.redisson.reactive.ReactiveProxyBuilder;
/**
* Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap}
@ -39,12 +35,12 @@ import org.redisson.api.RedissonRxClient;
*/
public class RedissonMapRx<K, V> {
private final RedissonMap<K, V> instance;
private final RedissonRxClient redisson;
private final RMap<K, V> instance;
private final CommandRxExecutor executor;
public RedissonMapRx(RMap<K, V> instance, RedissonRx redisson) {
this.instance = (RedissonMap<K, V>) instance;
this.redisson = redisson;
public RedissonMapRx(RMap<K, V> instance, CommandRxExecutor executor) {
this.instance = instance;
this.executor = executor;
}
public Publisher<Map.Entry<K, V>> entryIterator() {
@ -60,7 +56,7 @@ public class RedissonMapRx<K, V> {
}
public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, Map.Entry<K, V>>(instance, pattern, count).create();
return new RedissonMapRxIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) instance, pattern, count).create();
}
public Publisher<V> valueIterator() {
@ -76,7 +72,7 @@ public class RedissonMapRx<K, V> {
}
public Publisher<V> valueIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, V>(instance, pattern, count) {
return new RedissonMapRxIterator<K, V, V>((RedissonMap<K, V>) instance, pattern, count) {
@Override
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
@ -97,7 +93,7 @@ public class RedissonMapRx<K, V> {
}
public Publisher<K> keyIterator(String pattern, int count) {
return new RedissonMapRxIterator<K, V, K>(instance, pattern, count) {
return new RedissonMapRxIterator<K, V, K>((RedissonMap<K, V>) instance, pattern, count) {
@Override
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
@ -106,28 +102,28 @@ public class RedissonMapRx<K, V> {
}
public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(K key) {
String name = ((RedissonMap<K, V>) instance).getLockByMapKey(key, "permitexpirablesemaphore");
return redisson.getPermitExpirableSemaphore(name);
RPermitExpirableSemaphore s = instance.getPermitExpirableSemaphore(key);
return RxProxyBuilder.create(executor, s, RPermitExpirableSemaphoreRx.class);
}
public RSemaphoreRx getSemaphore(K key) {
String name = ((RedissonMap<K, V>) instance).getLockByMapKey(key, "semaphore");
return redisson.getSemaphore(name);
RSemaphore s = instance.getSemaphore(key);
return RxProxyBuilder.create(executor, s, RSemaphoreRx.class);
}
public RLockRx getFairLock(K key) {
String name = ((RedissonMap<K, V>) instance).getLockByMapKey(key, "fairlock");
return redisson.getFairLock(name);
RLock lock = instance.getFairLock(key);
return RxProxyBuilder.create(executor, lock, RLockRx.class);
}
public RReadWriteLockRx getReadWriteLock(K key) {
String name = ((RedissonMap<K, V>) instance).getLockByMapKey(key, "rw_lock");
return redisson.getReadWriteLock(name);
RReadWriteLock lock = instance.getReadWriteLock(key);
return RxProxyBuilder.create(executor, lock, RReadWriteLockRx.class);
}
public RLockRx getLock(K key) {
String name = ((RedissonMap<K, V>) instance).getLockByMapKey(key, "lock");
return redisson.getLock(name);
RLock lock = instance.getLock(key);
return RxProxyBuilder.create(executor, lock, RLockRx.class);
}
}

@ -76,14 +76,14 @@ public class RedissonTransactionRx implements RTransactionRx {
public <K, V> RMapCacheRx<K, V> getMapCache(String name, Codec codec) {
RMapCache<K, V> map = transaction.<K, V>getMapCache(name, codec);
return RxProxyBuilder.create(executorService, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
new RedissonMapCacheRx<K, V>(map, executorService), RMapCacheRx.class);
}
@Override
public <K, V> RMapCacheRx<K, V> getMapCache(String name) {
RMapCache<K, V> map = transaction.<K, V>getMapCache(name);
return RxProxyBuilder.create(executorService, map,
new RedissonMapCacheRx<K, V>(map), RMapCacheRx.class);
new RedissonMapCacheRx<K, V>(map, executorService), RMapCacheRx.class);
}
@Override

Loading…
Cancel
Save