diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index 1104409c6..6f7d08be0 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -143,16 +143,16 @@ public class RedissonRx implements RedissonRxClient { @Override public RMapCacheRx getMapCache(String name, Codec codec) { - RedissonMapCache map = new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null, null, null); + RMap map = new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null, null, null); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapCacheRx(map), RMapCacheRx.class); + new RedissonMapCacheRx(map, commandExecutor), RMapCacheRx.class); } @Override public RMapCacheRx getMapCache(String name) { RedissonMapCache map = new RedissonMapCache(evictionScheduler, commandExecutor, name, null, null, null); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapCacheRx(map), RMapCacheRx.class); + new RedissonMapCacheRx(map, commandExecutor), RMapCacheRx.class); } @Override @@ -264,14 +264,14 @@ public class RedissonRx implements RedissonRxClient { public RMapRx getMap(String name) { RedissonMap map = new RedissonMap(commandExecutor, name, null, null, null); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map, this), RMapRx.class); + new RedissonMapRx(map, commandExecutor), RMapRx.class); } @Override public RMapRx getMap(String name, Codec codec) { RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, null, null); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map, this), RMapRx.class); + new RedissonMapRx(map, commandExecutor), RMapRx.class); } @Override @@ -528,30 +528,30 @@ public class RedissonRx implements RedissonRxClient { public RMapCacheRx getMapCache(String name, Codec codec, MapOptions options) { RedissonMapCache map = new RedissonMapCache(codec, evictionScheduler, commandExecutor, name, null, options, writeBehindService); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapCacheRx(map), RMapCacheRx.class); + new RedissonMapCacheRx(map, commandExecutor), RMapCacheRx.class); } @Override public RMapCacheRx getMapCache(String name, MapOptions options) { - RedissonMapCache map = new RedissonMapCache(evictionScheduler, commandExecutor, name, null, options, writeBehindService); + RMap map = new RedissonMapCache(evictionScheduler, commandExecutor, name, null, options, writeBehindService); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapCacheRx(map), RMapCacheRx.class); + new RedissonMapCacheRx(map, commandExecutor), RMapCacheRx.class); } @Override public RMapRx getMap(String name, MapOptions options) { - RedissonMap map = new RedissonMap(commandExecutor, name, null, options, writeBehindService); + RMap map = new RedissonMap(commandExecutor, name, null, options, writeBehindService); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map, this), RMapRx.class); + new RedissonMapRx(map, commandExecutor), RMapRx.class); } @Override public RMapRx getMap(String name, Codec codec, MapOptions options) { - RedissonMap map = new RedissonMap(codec, commandExecutor, name, null, options, writeBehindService); + RMap map = new RedissonMap(codec, commandExecutor, name, null, options, writeBehindService); return RxProxyBuilder.create(commandExecutor, map, - new RedissonMapRx(map, this), RMapRx.class); + new RedissonMapRx(map, commandExecutor), RMapRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java index 44c5c95ee..d53b9fa96 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBatchRx.java @@ -100,15 +100,15 @@ public class RedissonBatchRx implements RBatchRx { @Override public RMapCacheRx getMapCache(String name, Codec codec) { RMapCache map = new RedissonMapCache(codec, evictionScheduler, executorService, name, null, null, null); - return RxProxyBuilder.create(executorService, map, - new RedissonMapCacheRx(map), RMapCacheRx.class); + return RxProxyBuilder.create(executorService, map, + new RedissonMapCacheRx<>(map, commandExecutor), RMapCacheRx.class); } @Override public RMapCacheRx getMapCache(String name) { RMapCache map = new RedissonMapCache(evictionScheduler, executorService, name, null, null, null); - return RxProxyBuilder.create(executorService, map, - new RedissonMapCacheRx(map), RMapCacheRx.class); + return RxProxyBuilder.create(executorService, map, + new RedissonMapCacheRx<>(map, commandExecutor), RMapCacheRx.class); } @Override diff --git a/redisson/src/main/java/org/redisson/rx/RedissonMapCacheRx.java b/redisson/src/main/java/org/redisson/rx/RedissonMapCacheRx.java index dc484f533..8d9c32a31 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonMapCacheRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonMapCacheRx.java @@ -15,12 +15,7 @@ */ package org.redisson.rx; -import java.util.Map; -import java.util.Map.Entry; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonMapCache; -import org.redisson.api.RMapCache; +import org.redisson.api.RMap; /** * @@ -29,70 +24,9 @@ import org.redisson.api.RMapCache; * @param key * @param value */ -public class RedissonMapCacheRx { - - private final RedissonMapCache instance; - - public RedissonMapCacheRx(RMapCache instance) { - this.instance = (RedissonMapCache) instance; - } - - public Publisher> entryIterator() { - return entryIterator(null); - } - - public Publisher> entryIterator(int count) { - return entryIterator(null, count); - } - - public Publisher> entryIterator(String pattern) { - return entryIterator(pattern, 10); - } - - public Publisher> entryIterator(String pattern, int count) { - return new RedissonMapRxIterator>(instance, pattern, count).create(); - } +public class RedissonMapCacheRx extends RedissonMapRx { - public Publisher valueIterator() { - return valueIterator(null); - } - - public Publisher valueIterator(String pattern) { - return valueIterator(pattern, 10); + public RedissonMapCacheRx(RMap instance, CommandRxExecutor executor) { + super(instance, executor); } - - public Publisher valueIterator(int count) { - return valueIterator(null, count); - } - - public Publisher valueIterator(String pattern, int count) { - return new RedissonMapRxIterator(instance, pattern, count) { - @Override - V getValue(Entry entry) { - return (V) entry.getValue(); - } - }.create(); - } - - public Publisher keyIterator() { - return keyIterator(null); - } - - public Publisher keyIterator(String pattern) { - return keyIterator(pattern, 10); - } - - public Publisher keyIterator(int count) { - return keyIterator(null, count); - } - - public Publisher keyIterator(String pattern, int count) { - return new RedissonMapRxIterator(instance, pattern, count) { - @Override - K getValue(Entry entry) { - return (K) entry.getKey(); - } - }.create(); - } - } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java b/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java index e6338ca0b..c3bcb9d0f 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonMapRx.java @@ -21,12 +21,8 @@ import java.util.Map.Entry; import org.reactivestreams.Publisher; import org.redisson.RedissonMap; import org.redisson.RedissonRx; -import org.redisson.api.RLockRx; -import org.redisson.api.RMap; -import org.redisson.api.RPermitExpirableSemaphoreRx; -import org.redisson.api.RReadWriteLockRx; -import org.redisson.api.RSemaphoreRx; -import org.redisson.api.RedissonRxClient; +import org.redisson.api.*; +import org.redisson.reactive.ReactiveProxyBuilder; /** * Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap} @@ -39,12 +35,12 @@ import org.redisson.api.RedissonRxClient; */ public class RedissonMapRx { - private final RedissonMap instance; - private final RedissonRxClient redisson; + private final RMap instance; + private final CommandRxExecutor executor; - public RedissonMapRx(RMap instance, RedissonRx redisson) { - this.instance = (RedissonMap) instance; - this.redisson = redisson; + public RedissonMapRx(RMap instance, CommandRxExecutor executor) { + this.instance = instance; + this.executor = executor; } public Publisher> entryIterator() { @@ -60,7 +56,7 @@ public class RedissonMapRx { } public Publisher> entryIterator(String pattern, int count) { - return new RedissonMapRxIterator>(instance, pattern, count).create(); + return new RedissonMapRxIterator>((RedissonMap) instance, pattern, count).create(); } public Publisher valueIterator() { @@ -76,7 +72,7 @@ public class RedissonMapRx { } public Publisher valueIterator(String pattern, int count) { - return new RedissonMapRxIterator(instance, pattern, count) { + return new RedissonMapRxIterator((RedissonMap) instance, pattern, count) { @Override V getValue(Entry entry) { return (V) entry.getValue(); @@ -97,7 +93,7 @@ public class RedissonMapRx { } public Publisher keyIterator(String pattern, int count) { - return new RedissonMapRxIterator(instance, pattern, count) { + return new RedissonMapRxIterator((RedissonMap) instance, pattern, count) { @Override K getValue(Entry entry) { return (K) entry.getKey(); @@ -106,28 +102,28 @@ public class RedissonMapRx { } public RPermitExpirableSemaphoreRx getPermitExpirableSemaphore(K key) { - String name = ((RedissonMap) instance).getLockByMapKey(key, "permitexpirablesemaphore"); - return redisson.getPermitExpirableSemaphore(name); + RPermitExpirableSemaphore s = instance.getPermitExpirableSemaphore(key); + return RxProxyBuilder.create(executor, s, RPermitExpirableSemaphoreRx.class); } public RSemaphoreRx getSemaphore(K key) { - String name = ((RedissonMap) instance).getLockByMapKey(key, "semaphore"); - return redisson.getSemaphore(name); + RSemaphore s = instance.getSemaphore(key); + return RxProxyBuilder.create(executor, s, RSemaphoreRx.class); } public RLockRx getFairLock(K key) { - String name = ((RedissonMap) instance).getLockByMapKey(key, "fairlock"); - return redisson.getFairLock(name); + RLock lock = instance.getFairLock(key); + return RxProxyBuilder.create(executor, lock, RLockRx.class); } public RReadWriteLockRx getReadWriteLock(K key) { - String name = ((RedissonMap) instance).getLockByMapKey(key, "rw_lock"); - return redisson.getReadWriteLock(name); + RReadWriteLock lock = instance.getReadWriteLock(key); + return RxProxyBuilder.create(executor, lock, RReadWriteLockRx.class); } public RLockRx getLock(K key) { - String name = ((RedissonMap) instance).getLockByMapKey(key, "lock"); - return redisson.getLock(name); + RLock lock = instance.getLock(key); + return RxProxyBuilder.create(executor, lock, RLockRx.class); } } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java b/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java index 50df4e75a..68be6eb6b 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonTransactionRx.java @@ -76,14 +76,14 @@ public class RedissonTransactionRx implements RTransactionRx { public RMapCacheRx getMapCache(String name, Codec codec) { RMapCache map = transaction.getMapCache(name, codec); return RxProxyBuilder.create(executorService, map, - new RedissonMapCacheRx(map), RMapCacheRx.class); + new RedissonMapCacheRx(map, executorService), RMapCacheRx.class); } @Override public RMapCacheRx getMapCache(String name) { RMapCache map = transaction.getMapCache(name); return RxProxyBuilder.create(executorService, map, - new RedissonMapCacheRx(map), RMapCacheRx.class); + new RedissonMapCacheRx(map, executorService), RMapCacheRx.class); } @Override