From 482cede42e173624c84f2cbf6ad397b08ee1fbe4 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 22 Sep 2018 17:41:53 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/RedissonKeys.java | 10 +- .../java/org/redisson/RedissonReactive.java | 33 +-- .../java/org/redisson/RedissonReference.java | 80 ++++--- .../java/org/redisson/api/RBatchReactive.java | 26 ++ .../org/redisson/api/RBitSetReactive.java | 2 - .../codec/DefaultReferenceCodecProvider.java | 41 ---- .../codec/ReferenceCodecProvider.java | 67 ------ .../command/CommandReactiveExecutor.java | 2 - .../command/CommandReactiveService.java | 11 - .../core/RedissonObjectBuilder.java | 2 +- .../redisson/misc/RedissonObjectFactory.java | 36 +-- .../reactive/NettyFuturePublisher.java | 30 ++- .../RedissonAtomicDoubleReactive.java | 147 ------------ .../reactive/RedissonAtomicLongReactive.java | 145 ----------- .../reactive/RedissonBatchReactive.java | 31 ++- .../reactive/RedissonBitSetReactive.java | 226 ------------------ .../reactive/RedissonBucketReactive.java | 148 ------------ .../reactive/RedissonHyperLogLogReactive.java | 104 -------- .../reactive/RedissonKeysReactive.java | 46 ++-- .../reactive/RedissonLockReactive.java | 167 ------------- ...issonPermitExpirableSemaphoreReactive.java | 161 ------------- .../reactive/RedissonRateLimiterReactive.java | 120 ---------- .../RedissonReadWriteLockReactive.java | 4 +- .../reactive/RedissonSemaphoreReactive.java | 144 ----------- .../reactive/RedissonTransactionReactive.java | 4 +- .../redisson/RedissonBitSetReactiveTest.java | 13 - .../RedissonReferenceReactiveTest.java | 29 ++- 27 files changed, 190 insertions(+), 1639 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java delete mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 964c518bb..1ea390084 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -111,13 +111,11 @@ public class RedissonKeys implements RKeys { return getKeysByPattern(null, count); } - private ListScanResult scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) { + public RFuture> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) { if (pattern == null) { - RFuture> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count); - return commandExecutor.get(f); + return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count); } - RFuture> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); - return commandExecutor.get(f); + return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); } private Iterator createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) { @@ -125,7 +123,7 @@ public class RedissonKeys implements RKeys { @Override protected ListScanResult iterator(RedisClient client, long nextIterPos) { - return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count); + return commandExecutor.get(RedissonKeys.this.scanIteratorAsync(client, entry, nextIterPos, pattern, count)); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 5eb25909f..1d44e0c23 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -66,30 +66,21 @@ import org.redisson.connection.ConnectionManager; import org.redisson.eviction.EvictionScheduler; import org.redisson.pubsub.SemaphorePubSub; import org.redisson.reactive.ReactiveProxyBuilder; -import org.redisson.reactive.RedissonAtomicDoubleReactive; -import org.redisson.reactive.RedissonAtomicLongReactive; import org.redisson.reactive.RedissonBatchReactive; -import org.redisson.reactive.RedissonBitSetReactive; import org.redisson.reactive.RedissonBlockingDequeReactive; import org.redisson.reactive.RedissonBlockingQueueReactive; -import org.redisson.reactive.RedissonBucketReactive; import org.redisson.reactive.RedissonDequeReactive; import org.redisson.reactive.RedissonGeoReactive; -import org.redisson.reactive.RedissonHyperLogLogReactive; import org.redisson.reactive.RedissonKeysReactive; import org.redisson.reactive.RedissonLexSortedSetReactive; import org.redisson.reactive.RedissonListMultimapReactive; import org.redisson.reactive.RedissonListReactive; -import org.redisson.reactive.RedissonLockReactive; import org.redisson.reactive.RedissonMapCacheReactive; import org.redisson.reactive.RedissonMapReactive; import org.redisson.reactive.RedissonPatternTopicReactive; -import org.redisson.reactive.RedissonPermitExpirableSemaphoreReactive; import org.redisson.reactive.RedissonQueueReactive; -import org.redisson.reactive.RedissonRateLimiterReactive; import org.redisson.reactive.RedissonReadWriteLockReactive; import org.redisson.reactive.RedissonScoredSortedSetReactive; -import org.redisson.reactive.RedissonSemaphoreReactive; import org.redisson.reactive.RedissonSetCacheReactive; import org.redisson.reactive.RedissonSetMultimapReactive; import org.redisson.reactive.RedissonSetReactive; @@ -145,22 +136,22 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RLockReactive getFairLock(String name) { - return new RedissonLockReactive(commandExecutor, name, new RedissonFairLock(commandExecutor, name)); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonFairLock(commandExecutor, name), RLockReactive.class); } @Override public RRateLimiterReactive getRateLimiter(String name) { - return new RedissonRateLimiterReactive(commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonRateLimiter(commandExecutor, name), RRateLimiterReactive.class); } @Override public RSemaphoreReactive getSemaphore(String name) { - return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonSemaphore(commandExecutor, name, semaphorePubSub), RSemaphoreReactive.class); } @Override public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name) { - return new RedissonPermitExpirableSemaphoreReactive(commandExecutor, name, semaphorePubSub); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub), RPermitExpirableSemaphoreReactive.class); } @Override @@ -170,7 +161,7 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RLockReactive getLock(String name) { - return new RedissonLockReactive(commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonLock(commandExecutor, name), RLockReactive.class); } @Override @@ -185,12 +176,12 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RBucketReactive getBucket(String name) { - return new RedissonBucketReactive(commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonBucket(commandExecutor, name), RBucketReactive.class); } @Override public RBucketReactive getBucket(String name, Codec codec) { - return new RedissonBucketReactive(codec, commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonBucket(codec, commandExecutor, name), RBucketReactive.class); } @Override @@ -211,12 +202,12 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RHyperLogLogReactive getHyperLogLog(String name) { - return new RedissonHyperLogLogReactive(commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonHyperLogLog(commandExecutor, name), RHyperLogLogReactive.class); } @Override public RHyperLogLogReactive getHyperLogLog(String name, Codec codec) { - return new RedissonHyperLogLogReactive(codec, commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonHyperLogLog(codec, commandExecutor, name), RHyperLogLogReactive.class); } @Override @@ -346,17 +337,17 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RAtomicLongReactive getAtomicLong(String name) { - return new RedissonAtomicLongReactive(commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicLong(commandExecutor, name), RAtomicLongReactive.class); } @Override public RAtomicDoubleReactive getAtomicDouble(String name) { - return new RedissonAtomicDoubleReactive(commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(commandExecutor, name), RAtomicDoubleReactive.class); } @Override public RBitSetReactive getBitSet(String name) { - return new RedissonBitSetReactive(commandExecutor, name); + return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(commandExecutor, name), RBitSetReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonReference.java b/redisson/src/main/java/org/redisson/RedissonReference.java index 898a11cd5..e415cc87d 100644 --- a/redisson/src/main/java/org/redisson/RedissonReference.java +++ b/redisson/src/main/java/org/redisson/RedissonReference.java @@ -16,26 +16,41 @@ package org.redisson; import java.io.Serializable; -import org.redisson.client.codec.Codec; + +import org.redisson.api.RAtomicLong; +import org.redisson.api.RAtomicLongReactive; +import org.redisson.api.RBitSet; +import org.redisson.api.RBitSetReactive; +import org.redisson.api.RBlockingQueue; +import org.redisson.api.RBlockingQueueReactive; +import org.redisson.api.RBucket; +import org.redisson.api.RBucketReactive; +import org.redisson.api.RDeque; +import org.redisson.api.RDequeReactive; +import org.redisson.api.RHyperLogLog; +import org.redisson.api.RHyperLogLogReactive; +import org.redisson.api.RLexSortedSet; +import org.redisson.api.RLexSortedSetReactive; +import org.redisson.api.RList; +import org.redisson.api.RListReactive; +import org.redisson.api.RMap; +import org.redisson.api.RMapCache; +import org.redisson.api.RMapCacheReactive; +import org.redisson.api.RMapReactive; import org.redisson.api.RObject; import org.redisson.api.RObjectReactive; +import org.redisson.api.RQueue; +import org.redisson.api.RQueueReactive; +import org.redisson.api.RScoredSortedSet; +import org.redisson.api.RScoredSortedSetReactive; +import org.redisson.api.RSet; +import org.redisson.api.RSetCache; +import org.redisson.api.RSetCacheReactive; +import org.redisson.api.RSetReactive; import org.redisson.api.annotation.REntity; +import org.redisson.client.codec.Codec; import org.redisson.liveobject.misc.ClassUtils; import org.redisson.misc.BiHashMap; -import org.redisson.reactive.RedissonAtomicLongReactive; -import org.redisson.reactive.RedissonBitSetReactive; -import org.redisson.reactive.RedissonBlockingQueueReactive; -import org.redisson.reactive.RedissonBucketReactive; -import org.redisson.reactive.RedissonDequeReactive; -import org.redisson.reactive.RedissonHyperLogLogReactive; -import org.redisson.reactive.RedissonLexSortedSetReactive; -import org.redisson.reactive.RedissonListReactive; -import org.redisson.reactive.RedissonMapCacheReactive; -import org.redisson.reactive.RedissonMapReactive; -import org.redisson.reactive.RedissonQueueReactive; -import org.redisson.reactive.RedissonScoredSortedSetReactive; -import org.redisson.reactive.RedissonSetCacheReactive; -import org.redisson.reactive.RedissonSetReactive; /** * @@ -46,20 +61,20 @@ public class RedissonReference implements Serializable { private static final BiHashMap reactiveMap = new BiHashMap(); static { - reactiveMap.put(RedissonAtomicLongReactive.class.getName(), RedissonAtomicLong.class.getName()); - reactiveMap.put(RedissonBitSetReactive.class.getName(), RedissonBitSet.class.getName()); - reactiveMap.put(RedissonBlockingQueueReactive.class.getName(), RedissonBlockingQueue.class.getName()); - reactiveMap.put(RedissonBucketReactive.class.getName(), RedissonBucket.class.getName()); - reactiveMap.put(RedissonDequeReactive.class.getName(), RedissonDeque.class.getName()); - reactiveMap.put(RedissonHyperLogLogReactive.class.getName(), RedissonHyperLogLog.class.getName()); - reactiveMap.put(RedissonLexSortedSetReactive.class.getName(), RedissonLexSortedSet.class.getName()); - reactiveMap.put(RedissonListReactive.class.getName(), RedissonList.class.getName()); - reactiveMap.put(RedissonMapCacheReactive.class.getName(), RedissonMapCache.class.getName()); - reactiveMap.put(RedissonMapReactive.class.getName(), RedissonMap.class.getName()); - reactiveMap.put(RedissonQueueReactive.class.getName(), RedissonQueue.class.getName()); - reactiveMap.put(RedissonScoredSortedSetReactive.class.getName(), RedissonScoredSortedSet.class.getName()); - reactiveMap.put(RedissonSetCacheReactive.class.getName(), RedissonSetCache.class.getName()); - reactiveMap.put(RedissonSetReactive.class.getName(), RedissonSet.class.getName()); + reactiveMap.put(RAtomicLongReactive.class.getName(), RAtomicLong.class.getName()); + reactiveMap.put(RBitSetReactive.class.getName(), RBitSet.class.getName()); + reactiveMap.put(RBlockingQueueReactive.class.getName(), RBlockingQueue.class.getName()); + reactiveMap.put(RBucketReactive.class.getName(), RBucket.class.getName()); + reactiveMap.put(RDequeReactive.class.getName(), RDeque.class.getName()); + reactiveMap.put(RHyperLogLogReactive.class.getName(), RHyperLogLog.class.getName()); + reactiveMap.put(RLexSortedSetReactive.class.getName(), RLexSortedSet.class.getName()); + reactiveMap.put(RListReactive.class.getName(), RList.class.getName()); + reactiveMap.put(RMapCacheReactive.class.getName(), RMapCache.class.getName()); + reactiveMap.put(RMapReactive.class.getName(), RMap.class.getName()); + reactiveMap.put(RQueueReactive.class.getName(), RQueue.class.getName()); + reactiveMap.put(RScoredSortedSetReactive.class.getName(), RScoredSortedSet.class.getName()); + reactiveMap.put(RSetCacheReactive.class.getName(), RSetCache.class.getName()); + reactiveMap.put(RSetReactive.class.getName(), RSet.class.getName()); reactiveMap.makeImmutable(); } @@ -73,11 +88,11 @@ public class RedissonReference implements Serializable { public RedissonReference() { } - public RedissonReference(Class type, String keyName) { + public RedissonReference(Class type, String keyName) { this(type, keyName, null); } - public RedissonReference(Class type, String keyName, Codec codec) { + public RedissonReference(Class type, String keyName, Codec codec) { if (!ClassUtils.isAnnotationPresent(type, REntity.class) && !RObject.class.isAssignableFrom(type) && !RObjectReactive.class.isAssignableFrom(type)) { throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject or RObjectReactive"); } @@ -131,7 +146,8 @@ public class RedissonReference implements Serializable { * @param type the type to set */ public void setType(Class type) { - if (!ClassUtils.isAnnotationPresent(type, REntity.class) && (!RObject.class.isAssignableFrom(type) || !RObjectReactive.class.isAssignableFrom(type))) { + if (!ClassUtils.isAnnotationPresent(type, REntity.class) + && (!RObject.class.isAssignableFrom(type) || !RObjectReactive.class.isAssignableFrom(type))) { throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject or RObjectReactive"); } this.type = type.getName(); diff --git a/redisson/src/main/java/org/redisson/api/RBatchReactive.java b/redisson/src/main/java/org/redisson/api/RBatchReactive.java index 9a26e8494..af2357241 100644 --- a/redisson/src/main/java/org/redisson/api/RBatchReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBatchReactive.java @@ -33,6 +33,32 @@ import org.redisson.client.codec.Codec; */ public interface RBatchReactive { + /** + * Returns stream instance by name + *

+ * Requires Redis 5.0.0 and higher. + * + * @param type of key + * @param type of value + * @param name of stream + * @return RStream object + */ + RStreamReactive getStream(String name); + + /** + * Returns stream instance by name + * using provided codec for entries. + *

+ * Requires Redis 5.0.0 and higher. + * + * @param type of key + * @param type of value + * @param name - name of stream + * @param codec - codec for entry + * @return RStream object + */ + RStreamReactive getStream(String name, Codec codec); + /** * Returns geospatial items holder instance by name. * diff --git a/redisson/src/main/java/org/redisson/api/RBitSetReactive.java b/redisson/src/main/java/org/redisson/api/RBitSetReactive.java index eac201faa..6ed7bd93b 100644 --- a/redisson/src/main/java/org/redisson/api/RBitSetReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBitSetReactive.java @@ -27,8 +27,6 @@ import org.reactivestreams.Publisher; */ public interface RBitSetReactive extends RExpirableReactive { - Publisher asBitSet(); - Publisher toByteArray(); /** diff --git a/redisson/src/main/java/org/redisson/codec/DefaultReferenceCodecProvider.java b/redisson/src/main/java/org/redisson/codec/DefaultReferenceCodecProvider.java index acf57db0f..715571ec1 100644 --- a/redisson/src/main/java/org/redisson/codec/DefaultReferenceCodecProvider.java +++ b/redisson/src/main/java/org/redisson/codec/DefaultReferenceCodecProvider.java @@ -95,45 +95,4 @@ public class DefaultReferenceCodecProvider implements ReferenceCodecProvider { codecCache.putIfAbsent(cls, codec); } - @Override - public void registerCodec(REntity anno, Class cls, T codec) { - if (!cls.isAnnotationPresent(anno.getClass())) { - throw new IllegalArgumentException("Annotation REntity does not present on type [" + cls.getCanonicalName() + "]"); - } - registerCodec((Class) anno.codec(), codec); - } - - @Override - public void registerCodec(RObjectField anno, Class cls, Class rObjectClass, String fieldName, T codec) { - try { - if (!cls.getField(fieldName).isAnnotationPresent(anno.getClass())) { - throw new IllegalArgumentException("Annotation RObjectField does not present on field " + fieldName + " of type [" + cls.getCanonicalName() + "]"); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - if (rObjectClass.isInterface()) { - throw new IllegalArgumentException("Cannot lookup an interface class of RObject [" + rObjectClass.getCanonicalName() + "]. Concrete class only."); - } - registerCodec((Class) anno.codec(), codec); - } - - @Override - public void registerCodec(Class codecClass, Class rObjectClass, T codec) { - if (rObjectClass.isInterface()) { - throw new IllegalArgumentException("Cannot register an interface class of RObject [" + rObjectClass.getCanonicalName() + "]. Concrete class only."); - } - registerCodec((Class) codecClass, codec); - } - - @Override - public void registerCodec(Class codecClass, Class rObjectClass, String name, T codec) { - registerCodec(codecClass, rObjectClass, codec); - } - - @Override - public void registerCodec(Class codecClass, RObject rObject, T codec) { - registerCodec(codecClass, rObject.getClass(), rObject.getName(), codec); - } - } diff --git a/redisson/src/main/java/org/redisson/codec/ReferenceCodecProvider.java b/redisson/src/main/java/org/redisson/codec/ReferenceCodecProvider.java index e6da443c1..3d561a84a 100644 --- a/redisson/src/main/java/org/redisson/codec/ReferenceCodecProvider.java +++ b/redisson/src/main/java/org/redisson/codec/ReferenceCodecProvider.java @@ -109,71 +109,4 @@ public interface ReferenceCodecProvider { */ void registerCodec(Class codecClass, T codec); - /** - * Register a codec by the REntity annotation and the class annotated with - * it. - * - * @param the codec type to register. - * @param anno REntity annotation used on the class. - * @param cls The class that has the REntity annotation. - * @param codec the codec instance. - */ - void registerCodec(REntity anno, Class cls, T codec); - - /** - * Register a codec by the RObjectField annotation, the class annotated with - * REntity, the implementation class of RObject the field is going to - * be transformed into and the name of the field with this RObjectField - * annotation. - * - * @param the codec type to register. - * @param the type of the RObject. - * @param anno RObjectField annotation used on the field. - * @param cls The class that has the REntity annotation. - * @param rObjectClass the implementation class of RObject the field is going - * to be transformed into. - * @param fieldName the name of the field with this RObjectField annotation. - * @param codec the codec instance. - */ - void registerCodec(RObjectField anno, Class cls, Class rObjectClass, String fieldName, T codec); - - /** - * Register a codec by its class or super class and the class of the RObject - * implementation. - * - * @param the codec type to register. - * @param the RObjectField type. - * @param codecClass the codec Class to register it can be a super class of - * the instance. - * @param rObjectClass the class of the RObject implementation. - * @param codec the codec instance. - */ - void registerCodec(Class codecClass, Class rObjectClass, T codec); - - /** - * Register a codec by its class or super class, the class of the RObject - * implementation and the name of RObject (the value returned by - * RObjectField.getName() method). - * - * @param the codec type to register. - * @param the RObjectField type. - * @param codecClass the codec Class to register it can be a super class of - * the instance. - * @param rObjectClass the class of the RObject implementation. - * @param name the name of RObject. - * @param codec the codec instance. - */ - void registerCodec(Class codecClass, Class rObjectClass, String name, T codec); - - /** - * Register a codec by its class or super class and an instance of the - * RObject. - * - * @param the codec type to register. - * @param codecClass the codec Class to register it can be a super class of - * the instance. - * @param rObject instance of the RObject implementation. - * @param codec the codec instance. - */ - void registerCodec(Class codecClass, RObject rObject, T codec); } diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java index 7079db5fc..c27aba5c6 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveExecutor.java @@ -34,8 +34,6 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor { Publisher reactive(Supplier> supplier); - Publisher writeReactive(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object ... params); - Publisher evalWriteReactive(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); Publisher writeReactive(String key, Codec codec, RedisCommand command, Object ... params); diff --git a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java b/redisson/src/main/java/org/redisson/command/CommandReactiveService.java index 92f5e5f89..785e7f8b7 100644 --- a/redisson/src/main/java/org/redisson/command/CommandReactiveService.java +++ b/redisson/src/main/java/org/redisson/command/CommandReactiveService.java @@ -22,7 +22,6 @@ import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; -import org.redisson.connection.MasterSlaveEntry; import org.redisson.reactive.NettyFuturePublisher; import reactor.fn.Supplier; @@ -52,16 +51,6 @@ public class CommandReactiveService extends CommandAsyncService implements Comma }); } - @Override - public Publisher writeReactive(final MasterSlaveEntry entry, final Codec codec, final RedisCommand command, final Object ... params) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return writeAsync(entry, codec, command, params); - }; - }); - } - @Override public Publisher readReactive(final String key, final Codec codec, final RedisCommand command, final Object ... params) { return reactive(new Supplier>() { diff --git a/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java b/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java index 0d100f424..c55587938 100644 --- a/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java +++ b/redisson/src/main/java/org/redisson/liveobject/core/RedissonObjectBuilder.java @@ -84,7 +84,7 @@ public class RedissonObjectBuilder { public void store(RObject ar, String fieldName, RMap liveMap) { Codec codec = ar.getCodec(); - codecProvider.registerCodec((Class) codec.getClass(), ar, codec); + codecProvider.registerCodec((Class) codec.getClass(), codec); liveMap.fastPut(fieldName, new RedissonReference(ar.getClass(), ar.getName(), codec)); } diff --git a/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java b/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java index a51df6bba..b0bc2096a 100644 --- a/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java +++ b/redisson/src/main/java/org/redisson/misc/RedissonObjectFactory.java @@ -114,14 +114,13 @@ public class RedissonObjectFactory { Object id = ns.resolveId(rr.getKeyName()); return (T) liveObjectService.createLiveObject(type, id); } - List> interfaces = Arrays.asList(type.getInterfaces()); - for (Class iType : interfaces) { - if (builders.containsKey(iType)) {// user cache to speed up things a little. - Method builder = builders.get(iType).get(isDefaultCodec(rr)); - return (T) (isDefaultCodec(rr) - ? builder.invoke(redisson, rr.getKeyName()) - : builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType()))); - } + + RedissonObjectBuilder b = builders.get(type); + if (b != null) { + Method builder = b.get(isDefaultCodec(rr)); + return (T) (isDefaultCodec(rr) + ? builder.invoke(redisson, rr.getKeyName()) + : builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType()))); } } throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodecName()); @@ -139,14 +138,12 @@ public class RedissonObjectFactory { * Live Object from reference in reactive client is not supported yet. */ if (type != null) { - List> interfaces = Arrays.asList(type.getInterfaces()); - for (Class iType : interfaces) { - if (builders.containsKey(iType)) {// user cache to speed up things a little. - Method builder = builders.get(iType).get(isDefaultCodec(rr)); + RedissonObjectBuilder b = builders.get(type); + if (b != null) { + Method builder = b.get(isDefaultCodec(rr)); return (T) (isDefaultCodec(rr) ? builder.invoke(redisson, rr.getKeyName()) : builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType()))); - } } } throw new ClassNotFoundException("No RObjectReactive is found to match class type of " + rr.getReactiveTypeName()+ " with codec type of " + rr.getCodecName()); @@ -158,14 +155,18 @@ public class RedissonObjectFactory { } if (object instanceof RObject && !(object instanceof RLiveObject)) { + Class clazz = object.getClass().getInterfaces()[0]; + RObject rObject = ((RObject) object); - config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec()); - return new RedissonReference(object.getClass(), ((RObject) object).getName(), ((RObject) object).getCodec()); + config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec()); + return new RedissonReference(clazz, rObject.getName(), rObject.getCodec()); } if (object instanceof RObjectReactive && !(object instanceof RLiveObject)) { + Class clazz = object.getClass().getInterfaces()[0]; + RObjectReactive rObject = ((RObjectReactive) object); - config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec()); - return new RedissonReference(object.getClass(), ((RObjectReactive) object).getName(), ((RObjectReactive) object).getCodec()); + config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec()); + return new RedissonReference(clazz, rObject.getName(), rObject.getCodec()); } try { @@ -179,6 +180,7 @@ public class RedissonObjectFactory { .getFieldsWithAnnotation(rEntity, RId.class) .getOnly().getName(); Class type = ClassUtils.getDeclaredField(rEntity, name).getType(); + return new RedissonReference(rEntity, ns.getName(rEntity, type, name, ((RLiveObject) object).getLiveObjectId())); } diff --git a/redisson/src/main/java/org/redisson/reactive/NettyFuturePublisher.java b/redisson/src/main/java/org/redisson/reactive/NettyFuturePublisher.java index 252f587f1..62566bd2f 100644 --- a/redisson/src/main/java/org/redisson/reactive/NettyFuturePublisher.java +++ b/redisson/src/main/java/org/redisson/reactive/NettyFuturePublisher.java @@ -46,20 +46,24 @@ public class NettyFuturePublisher extends Stream { @Override protected void onRequest(long n) { - supplier.get().addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - onError(future.cause()); - return; + try { + supplier.get().addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + onError(future.cause()); + return; + } + + if (future.getNow() != null) { + onNext(future.getNow()); + } + onComplete(); } - - if (future.getNow() != null) { - onNext(future.getNow()); - } - onComplete(); - } - }); + }); + } catch (Exception e) { + onError(e); + } } }); } catch (Throwable throwable) { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java deleted file mode 100644 index 5c1767890..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonAtomicDouble; -import org.redisson.api.RAtomicDoubleAsync; -import org.redisson.api.RAtomicDoubleReactive; -import org.redisson.api.RFuture; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; - -/** - * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} - * - * @author Nikita Koksharov - * - */ -public class RedissonAtomicDoubleReactive extends RedissonExpirableReactive implements RAtomicDoubleReactive { - - private final RAtomicDoubleAsync instance; - - public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) { - this(commandExecutor, name, new RedissonAtomicDouble(commandExecutor, name)); - } - - public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicDoubleAsync instance) { - super(commandExecutor, name, instance); - this.instance = instance; - } - - - @Override - public Publisher addAndGet(final double delta) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.addAndGetAsync(delta); - } - }); - } - - @Override - public Publisher compareAndSet(final double expect, final double update) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.compareAndSetAsync(expect, update); - } - }); - } - - @Override - public Publisher decrementAndGet() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.decrementAndGetAsync(); - } - }); - } - - @Override - public Publisher get() { - return addAndGet(0); - } - - @Override - public Publisher getAndAdd(final double delta) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAndAddAsync(delta); - } - }); - } - - - @Override - public Publisher getAndSet(final double newValue) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAndSetAsync(newValue); - } - }); - } - - @Override - public Publisher incrementAndGet() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.incrementAndGetAsync(); - } - }); - } - - @Override - public Publisher getAndIncrement() { - return getAndAdd(1); - } - - @Override - public Publisher getAndDecrement() { - return getAndAdd(-1); - } - - @Override - public Publisher set(final double newValue) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.setAsync(newValue); - } - }); - } - - @Override - public Publisher getAndDelete() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAndDeleteAsync(); - } - }); - } - - public String toString() { - return instance.toString(); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java deleted file mode 100644 index 36aae6e2e..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonAtomicLong; -import org.redisson.api.RAtomicLongAsync; -import org.redisson.api.RAtomicLongReactive; -import org.redisson.api.RFuture; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; - -/** - * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} - * - * @author Nikita Koksharov - * - */ -public class RedissonAtomicLongReactive extends RedissonExpirableReactive implements RAtomicLongReactive { - - private final RAtomicLongAsync instance; - - public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) { - this(commandExecutor, name, new RedissonAtomicLong(commandExecutor, name)); - } - - public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicLongAsync instance) { - super(commandExecutor, name, instance); - this.instance = instance; - } - - @Override - public Publisher addAndGet(final long delta) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.addAndGetAsync(delta); - } - }); - } - - @Override - public Publisher compareAndSet(final long expect, final long update) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.compareAndSetAsync(expect, update); - } - }); - } - - @Override - public Publisher decrementAndGet() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.decrementAndGetAsync(); - } - }); - } - - @Override - public Publisher get() { - return addAndGet(0); - } - - @Override - public Publisher getAndAdd(final long delta) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAndAddAsync(delta); - } - }); - } - - @Override - public Publisher getAndDelete() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAndDeleteAsync(); - } - }); - } - - @Override - public Publisher getAndSet(final long newValue) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAndSetAsync(newValue); - } - }); - } - - @Override - public Publisher incrementAndGet() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.incrementAndGetAsync(); - } - }); - } - - @Override - public Publisher getAndIncrement() { - return getAndAdd(1); - } - - @Override - public Publisher getAndDecrement() { - return getAndAdd(-1); - } - - @Override - public Publisher set(final long newValue) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.setAsync(newValue); - } - }); - } - - public String toString() { - return instance.toString(); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index bcb3ded23..7b4f751e4 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -18,7 +18,13 @@ package org.redisson.reactive; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; +import org.redisson.RedissonAtomicDouble; +import org.redisson.RedissonAtomicLong; +import org.redisson.RedissonBitSet; +import org.redisson.RedissonBucket; +import org.redisson.RedissonHyperLogLog; import org.redisson.RedissonScript; +import org.redisson.RedissonStream; import org.redisson.api.BatchOptions; import org.redisson.api.BatchResult; import org.redisson.api.RAtomicDoubleReactive; @@ -44,6 +50,7 @@ import org.redisson.api.RScriptReactive; import org.redisson.api.RSetCacheReactive; import org.redisson.api.RSetMultimapReactive; import org.redisson.api.RSetReactive; +import org.redisson.api.RStreamReactive; import org.redisson.api.RTopicReactive; import org.redisson.api.RedissonReactiveClient; import org.redisson.client.codec.Codec; @@ -70,24 +77,34 @@ public class RedissonBatchReactive implements RBatchReactive { this.options = options; } + @Override + public RStreamReactive getStream(String name) { + return ReactiveProxyBuilder.create(executorService, new RedissonStream(executorService, name), RStreamReactive.class); + } + + @Override + public RStreamReactive getStream(String name, Codec codec) { + return ReactiveProxyBuilder.create(executorService, new RedissonStream(codec, executorService, name), RStreamReactive.class); + } + @Override public RBucketReactive getBucket(String name) { - return new RedissonBucketReactive(executorService, name); + return ReactiveProxyBuilder.create(executorService, new RedissonBucket(executorService, name), RBucketReactive.class); } @Override public RBucketReactive getBucket(String name, Codec codec) { - return new RedissonBucketReactive(codec, executorService, name); + return ReactiveProxyBuilder.create(executorService, new RedissonBucket(codec, executorService, name), RBucketReactive.class); } @Override public RHyperLogLogReactive getHyperLogLog(String name) { - return new RedissonHyperLogLogReactive(executorService, name); + return ReactiveProxyBuilder.create(executorService, new RedissonHyperLogLog(executorService, name), RHyperLogLogReactive.class); } @Override public RHyperLogLogReactive getHyperLogLog(String name, Codec codec) { - return new RedissonHyperLogLogReactive(codec, executorService, name); + return ReactiveProxyBuilder.create(executorService, new RedissonHyperLogLog(codec, executorService, name), RHyperLogLogReactive.class); } @Override @@ -172,7 +189,7 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public RAtomicLongReactive getAtomicLongReactive(String name) { - return new RedissonAtomicLongReactive(executorService, name); + return ReactiveProxyBuilder.create(executorService, new RedissonAtomicLong(executorService, name), RAtomicLongReactive.class); } @Override @@ -202,7 +219,7 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public RBitSetReactive getBitSet(String name) { - return new RedissonBitSetReactive(executorService, name); + return ReactiveProxyBuilder.create(executorService, new RedissonBitSet(executorService, name), RBitSetReactive.class); } @Override @@ -296,7 +313,7 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public RAtomicDoubleReactive getAtomicDouble(String name) { - return new RedissonAtomicDoubleReactive(executorService, name); + return ReactiveProxyBuilder.create(executorService, new RedissonAtomicDouble(executorService, name), RAtomicDoubleReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java deleted file mode 100644 index 1a63e53ef..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.BitSet; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonBitSet; -import org.redisson.api.RBitSetAsync; -import org.redisson.api.RBitSetReactive; -import org.redisson.api.RFuture; -import org.redisson.client.codec.BitSetCodec; -import org.redisson.client.protocol.RedisCommands; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; -import reactor.rx.Streams; - -/** - * - * @author Nikita Koksharov - * - */ -public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive { - - private final RBitSetAsync instance; - - public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) { - this(connectionManager, name, new RedissonBitSet(connectionManager, name)); - } - - public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name, RBitSetAsync instance) { - super(connectionManager, name, instance); - this.instance = instance; - } - - public Publisher get(final long bitIndex) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAsync(bitIndex); - } - }); - } - - public Publisher set(final long bitIndex, final boolean value) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.setAsync(bitIndex, value); - } - }); - } - - public Publisher toByteArray() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.toByteArrayAsync(); - } - }); - } - - public Publisher asBitSet() { - return commandExecutor.readReactive(getName(), BitSetCodec.INSTANCE, RedisCommands.GET, getName()); - } - - @Override - public Publisher length() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.lengthAsync(); - } - }); - } - - @Override - public Publisher set(final long fromIndex, final long toIndex, final boolean value) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.setAsync(fromIndex, toIndex, value); - } - }); - } - - @Override - public Publisher clear(final long fromIndex, final long toIndex) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.clearAsync(fromIndex, toIndex); - } - }); - } - - @Override - public Publisher set(final BitSet bs) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.setAsync(bs); - } - }); - } - - @Override - public Publisher not() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.notAsync(); - } - }); - } - - @Override - public Publisher set(final long fromIndex, final long toIndex) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.setAsync(fromIndex, toIndex); - } - }); - } - - @Override - public Publisher size() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.sizeAsync(); - } - }); - } - - @Override - public Publisher set(final long bitIndex) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.setAsync(bitIndex); - } - }); - } - - @Override - public Publisher cardinality() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.cardinalityAsync(); - } - }); - } - - @Override - public Publisher clear(final long bitIndex) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.clearAsync(bitIndex); - } - }); - } - - @Override - public Publisher clear() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.clearAsync(); - } - }); - } - - @Override - public Publisher or(final String... bitSetNames) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.orAsync(bitSetNames); - } - }); - } - - @Override - public Publisher and(final String... bitSetNames) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.andAsync(bitSetNames); - } - }); - } - - @Override - public Publisher xor(final String... bitSetNames) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.xorAsync(bitSetNames); - } - }); - } - - @Override - public String toString() { - return Streams.create(asBitSet()).next().poll().toString(); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java deleted file mode 100644 index 22eeb6b5d..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.concurrent.TimeUnit; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonBucket; -import org.redisson.api.RBucketAsync; -import org.redisson.api.RBucketReactive; -import org.redisson.api.RFuture; -import org.redisson.client.codec.Codec; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; - -/** - * - * @author Nikita Koksharov - * - * @param value type - */ -public class RedissonBucketReactive extends RedissonExpirableReactive implements RBucketReactive { - - private final RBucketAsync instance; - - public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) { - this(connectionManager, name, new RedissonBucket(connectionManager, name)); - } - - public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name, RBucketAsync instance) { - super(connectionManager, name, instance); - this.instance = instance; - } - - public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) { - this(codec, connectionManager, name, new RedissonBucket(codec, connectionManager, name)); - } - - public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RBucketAsync instance) { - super(codec, connectionManager, name, instance); - this.instance = instance; - } - - @Override - public Publisher get() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAsync(); - } - }); - } - - @Override - public Publisher getAndDelete() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAndDeleteAsync(); - } - }); - } - - @Override - public Publisher set(final V value) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.setAsync(value); - } - }); - } - - @Override - public Publisher set(final V value, final long timeToLive, final TimeUnit timeUnit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.setAsync(value, timeToLive, timeUnit); - } - }); - } - - @Override - public Publisher size() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.sizeAsync(); - } - }); - } - - @Override - public Publisher trySet(final V value) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.trySetAsync(value); - } - }); - } - - @Override - public Publisher trySet(final V value, final long timeToLive, final TimeUnit timeUnit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.trySetAsync(value, timeToLive, timeUnit); - } - }); - } - - @Override - public Publisher compareAndSet(final V expect, final V update) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.compareAndSetAsync(expect, update); - } - }); - } - - @Override - public Publisher getAndSet(final V newValue) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.getAndSetAsync(newValue); - } - }); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java deleted file mode 100644 index e190f8e18..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonHyperLogLog; -import org.redisson.api.RFuture; -import org.redisson.api.RHyperLogLogAsync; -import org.redisson.api.RHyperLogLogReactive; -import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; - -/** - * - * @author Nikita Koksharov - * - * @param value type - */ -public class RedissonHyperLogLogReactive extends RedissonExpirableReactive implements RHyperLogLogReactive { - - private final RHyperLogLogAsync instance; - - public RedissonHyperLogLogReactive(CommandReactiveExecutor commandExecutor, String name) { - super(commandExecutor, name, new RedissonHyperLogLog(commandExecutor, name)); - this.instance = (RHyperLogLogAsync) super.instance; - } - - public RedissonHyperLogLogReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - super(codec, commandExecutor, name, new RedissonHyperLogLog(commandExecutor, name)); - this.instance = (RHyperLogLogAsync) super.instance; - } - - @Override - public Publisher add(final V obj) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.addAsync(obj); - } - }); - } - - @Override - public Publisher addAll(final Collection objects) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.addAllAsync(objects); - } - }); - } - - @Override - public Publisher count() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.countAsync(); - } - }); - } - - @Override - public Publisher countWith(final String... otherLogNames) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.countWithAsync(otherLogNames); - } - }); - } - - @Override - public Publisher mergeWith(final String... otherLogNames) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.mergeWithAsync(otherLogNames); - } - }); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java index c92f24b64..2f75499a1 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java @@ -22,17 +22,17 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import org.redisson.RedissonKeys; import org.redisson.api.RFuture; import org.redisson.api.RKeysReactive; import org.redisson.api.RType; -import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.RedisClient; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandReactiveService; import org.redisson.connection.MasterSlaveEntry; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import reactor.fn.Supplier; import reactor.rx.Stream; import reactor.rx.Streams; @@ -89,13 +89,6 @@ public class RedissonKeysReactive implements RKeysReactive { return getKeysByPattern(null, count); } - private Publisher> scanIterator(MasterSlaveEntry entry, long startPos, String pattern, int count) { - if (pattern == null) { - return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count); - } - return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count); - } - private Publisher createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) { return new Stream() { @@ -103,6 +96,7 @@ public class RedissonKeysReactive implements RKeysReactive { public void subscribe(final Subscriber t) { t.onSubscribe(new ReactiveSubscription(this, t) { + private RedisClient client; private List firstValues; private long nextIterPos; @@ -116,18 +110,18 @@ public class RedissonKeysReactive implements RKeysReactive { protected void nextValues() { final ReactiveSubscription m = this; - scanIterator(entry, nextIterPos, pattern, count).subscribe(new Subscriber>() { - + instance.scanIteratorAsync(client, entry, nextIterPos, pattern, count).addListener(new FutureListener>() { @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(ListScanResult res) { + public void operationComplete(Future> future) throws Exception { + if (!future.isSuccess()) { + m.onError(future.cause()); + return; + } + + ListScanResult res = future.get(); long prevIterPos = nextIterPos; if (nextIterPos == 0 && firstValues == null) { - firstValues = res.getValues(); + firstValues = (List)(Object)res.getValues(); } else if (res.getValues().equals(firstValues)) { m.onComplete(); currentIndex = 0; @@ -138,8 +132,8 @@ public class RedissonKeysReactive implements RKeysReactive { if (prevIterPos == nextIterPos) { nextIterPos = -1; } - for (String val : res.getValues()) { - m.onNext(val); + for (Object val : res.getValues()) { + m.onNext((String)val); currentIndex--; if (currentIndex == 0) { m.onComplete(); @@ -150,15 +144,7 @@ public class RedissonKeysReactive implements RKeysReactive { m.onComplete(); currentIndex = 0; } - } - - @Override - public void onError(Throwable error) { - m.onError(error); - } - - @Override - public void onComplete() { + if (currentIndex == 0) { return; } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java deleted file mode 100644 index 32c2ccc5c..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.concurrent.TimeUnit; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonLock; -import org.redisson.api.RFuture; -import org.redisson.api.RLockAsync; -import org.redisson.api.RLockReactive; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; - -/** - * - * @author Nikita Koksharov - * - */ -public class RedissonLockReactive extends RedissonExpirableReactive implements RLockReactive { - - private final RLockAsync instance; - - public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name) { - this(connectionManager, name, new RedissonLock(connectionManager, name)); - } - - public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, RLockAsync instance) { - super(connectionManager, name, instance); - this.instance = instance; - } - - @Override - public Publisher forceUnlock() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.forceUnlockAsync(); - } - }); - } - - @Override - public Publisher unlock() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.unlockAsync(); - } - }); - } - - @Override - public Publisher unlock(final long threadId) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.unlockAsync(threadId); - } - }); - } - - @Override - public Publisher tryLock() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryLockAsync(); - } - }); - } - - @Override - public Publisher lock() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.lockAsync(); - } - }); - } - - @Override - public Publisher lock(final long threadId) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.lockAsync(threadId); - } - }); - } - - @Override - public Publisher lock(final long leaseTime, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.lockAsync(leaseTime, unit); - } - }); - } - - @Override - public Publisher lock(final long leaseTime, final TimeUnit unit, final long threadId) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.lockAsync(leaseTime, unit, threadId); - } - }); - } - - @Override - public Publisher tryLock(final long threadId) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryLockAsync(threadId); - } - }); - } - - @Override - public Publisher tryLock(final long waitTime, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryLockAsync(waitTime, unit); - } - }); - } - - @Override - public Publisher tryLock(final long waitTime, final long leaseTime, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryLockAsync(waitTime, leaseTime, unit); - } - }); - } - - @Override - public Publisher tryLock(final long waitTime, final long leaseTime, final TimeUnit unit, final long threadId) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryLockAsync(waitTime, leaseTime, unit, threadId); - } - }); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java deleted file mode 100644 index edc3551e9..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.concurrent.TimeUnit; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonLock; -import org.redisson.RedissonPermitExpirableSemaphore; -import org.redisson.api.RFuture; -import org.redisson.api.RLockAsync; -import org.redisson.api.RPermitExpirableSemaphoreAsync; -import org.redisson.api.RPermitExpirableSemaphoreReactive; -import org.redisson.command.CommandAsyncExecutor; -import org.redisson.command.CommandReactiveExecutor; -import org.redisson.pubsub.SemaphorePubSub; - -import reactor.fn.Supplier; - -/** - * - * @author Nikita Koksharov - * - */ -public class RedissonPermitExpirableSemaphoreReactive extends RedissonExpirableReactive implements RPermitExpirableSemaphoreReactive { - - private final RPermitExpirableSemaphoreAsync instance; - - public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { - super(connectionManager, name, new RedissonPermitExpirableSemaphore(connectionManager, name, semaphorePubSub)); - instance = (RPermitExpirableSemaphoreAsync) super.instance; - } - - protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) { - return new RedissonLock(commandExecutor, name); - } - - @Override - public Publisher acquire() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.acquireAsync(); - } - }); - } - - @Override - public Publisher acquire(final long leaseTime, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.acquireAsync(leaseTime, unit); - } - }); - } - - @Override - public Publisher tryAcquire() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(); - } - }); - } - - @Override - public Publisher tryAcquire(final long waitTime, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(waitTime, unit); - } - }); - } - - @Override - public Publisher tryAcquire(final long waitTime, final long leaseTime, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(waitTime, leaseTime, unit); - } - }); - } - - @Override - public Publisher tryRelease(final String permitId) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryReleaseAsync(permitId); - } - }); - } - - @Override - public Publisher release(final String permitId) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.releaseAsync(permitId); - } - }); - } - - @Override - public Publisher availablePermits() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.availablePermitsAsync(); - } - }); - } - - @Override - public Publisher trySetPermits(final int permits) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.trySetPermitsAsync(permits); - } - }); - } - - @Override - public Publisher addPermits(final int permits) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.addPermitsAsync(permits); - } - }); - } - - @Override - public Publisher updateLeaseTime(final String permitId, final long leaseTime, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.updateLeaseTimeAsync(permitId, leaseTime, unit); - } - }); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java deleted file mode 100644 index 63911757a..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.concurrent.TimeUnit; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonRateLimiter; -import org.redisson.api.RFuture; -import org.redisson.api.RRateLimiterAsync; -import org.redisson.api.RRateLimiterReactive; -import org.redisson.api.RateIntervalUnit; -import org.redisson.api.RateType; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; - -/** - * - * @author Nikita Koksharov - * - */ -public class RedissonRateLimiterReactive extends RedissonObjectReactive implements RRateLimiterReactive { - - private final RRateLimiterAsync instance; - - public RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name) { - this(connectionManager, name, new RedissonRateLimiter(connectionManager, name)); - } - - private RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name, RRateLimiterAsync instance) { - super(connectionManager, name, instance); - this.instance = instance; - } - - @Override - public Publisher trySetRate(final RateType mode, final long rate, final long rateInterval, - final RateIntervalUnit rateIntervalUnit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.trySetRateAsync(mode, rate, rateInterval, rateIntervalUnit); - } - }); - } - - @Override - public Publisher tryAcquire() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(); - } - }); - } - - @Override - public Publisher tryAcquire(final long permits) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(permits); - } - }); - } - - @Override - public Publisher acquire() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.acquireAsync(); - } - }); - } - - @Override - public Publisher acquire(final long permits) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.acquireAsync(permits); - } - }); - } - - @Override - public Publisher tryAcquire(final long timeout, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(timeout, unit); - } - }); - } - - @Override - public Publisher tryAcquire(final long permits, final long timeout, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(permits, timeout, unit); - } - }); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java index 80d63d706..84a46fb77 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonReadWriteLockReactive.java @@ -37,12 +37,12 @@ public class RedissonReadWriteLockReactive extends RedissonExpirableReactive imp @Override public RLockReactive readLock() { - return new RedissonLockReactive(commandExecutor, getName(), instance.readLock()); + return ReactiveProxyBuilder.create(commandExecutor, instance.readLock(), RLockReactive.class); } @Override public RLockReactive writeLock() { - return new RedissonLockReactive(commandExecutor, getName(), instance.writeLock()); + return ReactiveProxyBuilder.create(commandExecutor, instance.writeLock(), RLockReactive.class); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java deleted file mode 100644 index 473819eaa..000000000 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Copyright 2018 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.reactive; - -import java.util.concurrent.TimeUnit; - -import org.reactivestreams.Publisher; -import org.redisson.RedissonSemaphore; -import org.redisson.api.RFuture; -import org.redisson.api.RSemaphoreAsync; -import org.redisson.api.RSemaphoreReactive; -import org.redisson.command.CommandReactiveExecutor; -import org.redisson.pubsub.SemaphorePubSub; - -import reactor.fn.Supplier; - -/** - * - * @author Nikita Koksharov - * - */ -public class RedissonSemaphoreReactive extends RedissonExpirableReactive implements RSemaphoreReactive { - - private final RSemaphoreAsync instance; - - public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) { - super(connectionManager, name, new RedissonSemaphore(connectionManager, name, semaphorePubSub)); - instance = (RSemaphoreAsync) super.instance; - } - - @Override - public Publisher tryAcquire() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(); - } - }); - } - - @Override - public Publisher tryAcquire(final int permits) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(permits); - } - }); - } - - @Override - public Publisher acquire() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.acquireAsync(); - } - }); - } - - @Override - public Publisher acquire(final int permits) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.acquireAsync(permits); - } - }); - } - - @Override - public Publisher release() { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.releaseAsync(); - } - }); - } - - @Override - public Publisher release(final int permits) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.releaseAsync(permits); - } - }); - } - - @Override - public Publisher trySetPermits(final int permits) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.trySetPermitsAsync(permits); - } - }); - } - - @Override - public Publisher tryAcquire(final long waitTime, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(waitTime, unit); - } - }); - } - - @Override - public Publisher tryAcquire(final int permits, final long waitTime, final TimeUnit unit) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.tryAcquireAsync(permits, waitTime, unit); - } - }); - } - - @Override - public Publisher reducePermits(final int permits) { - return reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.reducePermitsAsync(permits); - } - }); - } - -} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java index f234ebc3a..d74a26eac 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -48,12 +48,12 @@ public class RedissonTransactionReactive implements RTransactionReactive { @Override public RBucketReactive getBucket(String name) { - return new RedissonBucketReactive(executorService, name, transaction.getBucket(name)); + return ReactiveProxyBuilder.create(executorService, transaction.getBucket(name), RBucketReactive.class); } @Override public RBucketReactive getBucket(String name, Codec codec) { - return new RedissonBucketReactive(codec, executorService, name, transaction.getBucket(name, codec)); + return ReactiveProxyBuilder.create(executorService, transaction.getBucket(name, codec), RBucketReactive.class); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonBitSetReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonBitSetReactiveTest.java index 17ec1b051..5b3fc7e30 100644 --- a/redisson/src/test/java/org/redisson/RedissonBitSetReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBitSetReactiveTest.java @@ -93,19 +93,6 @@ public class RedissonBitSetReactiveTest extends BaseReactiveTest { Assert.assertEquals(16, sync(bs.size()).intValue()); } - @Test - public void testAsBitSet() { - RBitSetReactive bs = redisson.getBitSet("testbitset"); - sync(bs.set(3, true)); - sync(bs.set(41, true)); - Assert.assertEquals(48, sync(bs.size()).intValue()); - - BitSet bitset = sync(bs.asBitSet()); - Assert.assertTrue(bitset.get(3)); - Assert.assertTrue(bitset.get(41)); - Assert.assertEquals(2, bitset.cardinality()); - } - @Test public void testAnd() { RBitSetReactive bs1 = redisson.getBitSet("testbitset1"); diff --git a/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java index cac728d8a..93e0fe8ba 100644 --- a/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java @@ -1,16 +1,25 @@ package org.redisson; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.util.List; -import static org.junit.Assert.*; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Test; -import org.redisson.api.*; +import org.redisson.api.BatchOptions; +import org.redisson.api.RBatch; +import org.redisson.api.RBatchReactive; +import org.redisson.api.RBucket; +import org.redisson.api.RBucketReactive; +import org.redisson.api.RMapCacheReactive; +import org.redisson.api.RSetReactive; +import org.redisson.api.RedissonClient; +import org.redisson.api.RedissonReactiveClient; import org.redisson.codec.JsonJacksonCodec; import org.redisson.config.Config; -import org.redisson.reactive.RedissonBucketReactive; -import org.redisson.reactive.RedissonMapCacheReactive; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; /** * @@ -25,13 +34,13 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest { RBucketReactive b3 = redisson.getBucket("b3"); sync(b2.set(b3)); sync(b1.set(redisson.getBucket("b2"))); - assertTrue(sync(b1.get()).getClass().equals(RedissonBucketReactive.class)); + assertTrue(sync(b1.get()) instanceof RBucketReactive); assertEquals("b3", ((RBucketReactive) sync(((RBucketReactive) sync(b1.get())).get())).getName()); RBucketReactive b4 = redisson.getBucket("b4"); sync(b4.set(redisson.getMapCache("testCache"))); - assertTrue(sync(b4.get()) instanceof RedissonMapCacheReactive); - sync(((RedissonMapCacheReactive) sync(b4.get())).fastPut(b1, b2)); - assertEquals("b2", ((RBucketReactive) sync(((RedissonMapCacheReactive) sync(b4.get())).get(b1))).getName()); + assertTrue(sync(b4.get()) instanceof RMapCacheReactive); + sync(((RMapCacheReactive) sync(b4.get())).fastPut(b1, b2)); + assertEquals("b2", ((RBucketReactive) sync(((RMapCacheReactive) sync(b4.get())).get(b1))).getName()); } @Test