Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/command/CommandReactiveService.java
#	redisson/src/main/java/org/redisson/reactive/NettyFuturePublisher.java
#	redisson/src/main/java/org/redisson/reactive/RedissonAtomicDoubleReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonAtomicLongReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonBitSetReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonBucketReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonHyperLogLogReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonKeysReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonLockReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonPermitExpirableSemaphoreReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonRateLimiterReactive.java
#	redisson/src/main/java/org/redisson/reactive/RedissonSemaphoreReactive.java
pull/1821/head
Nikita 6 years ago
commit bf7b04ab87

@ -111,13 +111,11 @@ public class RedissonKeys implements RKeys {
return getKeysByPattern(null, count);
}
private ListScanResult<Object> scanIterator(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
public RFuture<ListScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) {
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
return commandExecutor.get(f);
return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
}
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
return commandExecutor.get(f);
return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
}
private Iterator<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) {
@ -125,7 +123,7 @@ public class RedissonKeys implements RKeys {
@Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return RedissonKeys.this.scanIterator(client, entry, nextIterPos, pattern, count);
return commandExecutor.get(RedissonKeys.this.scanIteratorAsync(client, entry, nextIterPos, pattern, count));
}
@Override

@ -66,30 +66,21 @@ import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.reactive.ReactiveProxyBuilder;
import org.redisson.reactive.RedissonAtomicDoubleReactive;
import org.redisson.reactive.RedissonAtomicLongReactive;
import org.redisson.reactive.RedissonBatchReactive;
import org.redisson.reactive.RedissonBitSetReactive;
import org.redisson.reactive.RedissonBlockingDequeReactive;
import org.redisson.reactive.RedissonBlockingQueueReactive;
import org.redisson.reactive.RedissonBucketReactive;
import org.redisson.reactive.RedissonDequeReactive;
import org.redisson.reactive.RedissonGeoReactive;
import org.redisson.reactive.RedissonHyperLogLogReactive;
import org.redisson.reactive.RedissonKeysReactive;
import org.redisson.reactive.RedissonLexSortedSetReactive;
import org.redisson.reactive.RedissonListMultimapReactive;
import org.redisson.reactive.RedissonListReactive;
import org.redisson.reactive.RedissonLockReactive;
import org.redisson.reactive.RedissonMapCacheReactive;
import org.redisson.reactive.RedissonMapReactive;
import org.redisson.reactive.RedissonPatternTopicReactive;
import org.redisson.reactive.RedissonPermitExpirableSemaphoreReactive;
import org.redisson.reactive.RedissonQueueReactive;
import org.redisson.reactive.RedissonRateLimiterReactive;
import org.redisson.reactive.RedissonReadWriteLockReactive;
import org.redisson.reactive.RedissonScoredSortedSetReactive;
import org.redisson.reactive.RedissonSemaphoreReactive;
import org.redisson.reactive.RedissonSetCacheReactive;
import org.redisson.reactive.RedissonSetMultimapReactive;
import org.redisson.reactive.RedissonSetReactive;
@ -145,22 +136,22 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RLockReactive getFairLock(String name) {
return new RedissonLockReactive(commandExecutor, name, new RedissonFairLock(commandExecutor, name));
return ReactiveProxyBuilder.create(commandExecutor, new RedissonFairLock(commandExecutor, name), RLockReactive.class);
}
@Override
public RRateLimiterReactive getRateLimiter(String name) {
return new RedissonRateLimiterReactive(commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonRateLimiter(commandExecutor, name), RRateLimiterReactive.class);
}
@Override
public RSemaphoreReactive getSemaphore(String name) {
return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonSemaphore(commandExecutor, name, semaphorePubSub), RSemaphoreReactive.class);
}
@Override
public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name) {
return new RedissonPermitExpirableSemaphoreReactive(commandExecutor, name, semaphorePubSub);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonPermitExpirableSemaphore(commandExecutor, name, semaphorePubSub), RPermitExpirableSemaphoreReactive.class);
}
@Override
@ -170,7 +161,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RLockReactive getLock(String name) {
return new RedissonLockReactive(commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonLock(commandExecutor, name), RLockReactive.class);
}
@Override
@ -185,12 +176,12 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RBucketReactive<V> getBucket(String name) {
return new RedissonBucketReactive<V>(commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBucket<V>(commandExecutor, name), RBucketReactive.class);
}
@Override
public <V> RBucketReactive<V> getBucket(String name, Codec codec) {
return new RedissonBucketReactive<V>(codec, commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBucket<V>(codec, commandExecutor, name), RBucketReactive.class);
}
@Override
@ -211,12 +202,12 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(String name) {
return new RedissonHyperLogLogReactive<V>(commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonHyperLogLog<V>(commandExecutor, name), RHyperLogLogReactive.class);
}
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(String name, Codec codec) {
return new RedissonHyperLogLogReactive<V>(codec, commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonHyperLogLog<V>(codec, commandExecutor, name), RHyperLogLogReactive.class);
}
@Override
@ -346,17 +337,17 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RAtomicLongReactive getAtomicLong(String name) {
return new RedissonAtomicLongReactive(commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicLong(commandExecutor, name), RAtomicLongReactive.class);
}
@Override
public RAtomicDoubleReactive getAtomicDouble(String name) {
return new RedissonAtomicDoubleReactive(commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(commandExecutor, name), RAtomicDoubleReactive.class);
}
@Override
public RBitSetReactive getBitSet(String name) {
return new RedissonBitSetReactive(commandExecutor, name);
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBitSet(commandExecutor, name), RBitSetReactive.class);
}
@Override

@ -16,26 +16,41 @@
package org.redisson;
import java.io.Serializable;
import org.redisson.client.codec.Codec;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RBitSet;
import org.redisson.api.RBitSetReactive;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RBlockingQueueReactive;
import org.redisson.api.RBucket;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RDeque;
import org.redisson.api.RDequeReactive;
import org.redisson.api.RHyperLogLog;
import org.redisson.api.RHyperLogLogReactive;
import org.redisson.api.RLexSortedSet;
import org.redisson.api.RLexSortedSetReactive;
import org.redisson.api.RList;
import org.redisson.api.RListReactive;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RMapReactive;
import org.redisson.api.RObject;
import org.redisson.api.RObjectReactive;
import org.redisson.api.RQueue;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.annotation.REntity;
import org.redisson.client.codec.Codec;
import org.redisson.liveobject.misc.ClassUtils;
import org.redisson.misc.BiHashMap;
import org.redisson.reactive.RedissonAtomicLongReactive;
import org.redisson.reactive.RedissonBitSetReactive;
import org.redisson.reactive.RedissonBlockingQueueReactive;
import org.redisson.reactive.RedissonBucketReactive;
import org.redisson.reactive.RedissonDequeReactive;
import org.redisson.reactive.RedissonHyperLogLogReactive;
import org.redisson.reactive.RedissonLexSortedSetReactive;
import org.redisson.reactive.RedissonListReactive;
import org.redisson.reactive.RedissonMapCacheReactive;
import org.redisson.reactive.RedissonMapReactive;
import org.redisson.reactive.RedissonQueueReactive;
import org.redisson.reactive.RedissonScoredSortedSetReactive;
import org.redisson.reactive.RedissonSetCacheReactive;
import org.redisson.reactive.RedissonSetReactive;
/**
*
@ -46,20 +61,20 @@ public class RedissonReference implements Serializable {
private static final BiHashMap<String, String> reactiveMap = new BiHashMap<String, String>();
static {
reactiveMap.put(RedissonAtomicLongReactive.class.getName(), RedissonAtomicLong.class.getName());
reactiveMap.put(RedissonBitSetReactive.class.getName(), RedissonBitSet.class.getName());
reactiveMap.put(RedissonBlockingQueueReactive.class.getName(), RedissonBlockingQueue.class.getName());
reactiveMap.put(RedissonBucketReactive.class.getName(), RedissonBucket.class.getName());
reactiveMap.put(RedissonDequeReactive.class.getName(), RedissonDeque.class.getName());
reactiveMap.put(RedissonHyperLogLogReactive.class.getName(), RedissonHyperLogLog.class.getName());
reactiveMap.put(RedissonLexSortedSetReactive.class.getName(), RedissonLexSortedSet.class.getName());
reactiveMap.put(RedissonListReactive.class.getName(), RedissonList.class.getName());
reactiveMap.put(RedissonMapCacheReactive.class.getName(), RedissonMapCache.class.getName());
reactiveMap.put(RedissonMapReactive.class.getName(), RedissonMap.class.getName());
reactiveMap.put(RedissonQueueReactive.class.getName(), RedissonQueue.class.getName());
reactiveMap.put(RedissonScoredSortedSetReactive.class.getName(), RedissonScoredSortedSet.class.getName());
reactiveMap.put(RedissonSetCacheReactive.class.getName(), RedissonSetCache.class.getName());
reactiveMap.put(RedissonSetReactive.class.getName(), RedissonSet.class.getName());
reactiveMap.put(RAtomicLongReactive.class.getName(), RAtomicLong.class.getName());
reactiveMap.put(RBitSetReactive.class.getName(), RBitSet.class.getName());
reactiveMap.put(RBlockingQueueReactive.class.getName(), RBlockingQueue.class.getName());
reactiveMap.put(RBucketReactive.class.getName(), RBucket.class.getName());
reactiveMap.put(RDequeReactive.class.getName(), RDeque.class.getName());
reactiveMap.put(RHyperLogLogReactive.class.getName(), RHyperLogLog.class.getName());
reactiveMap.put(RLexSortedSetReactive.class.getName(), RLexSortedSet.class.getName());
reactiveMap.put(RListReactive.class.getName(), RList.class.getName());
reactiveMap.put(RMapCacheReactive.class.getName(), RMapCache.class.getName());
reactiveMap.put(RMapReactive.class.getName(), RMap.class.getName());
reactiveMap.put(RQueueReactive.class.getName(), RQueue.class.getName());
reactiveMap.put(RScoredSortedSetReactive.class.getName(), RScoredSortedSet.class.getName());
reactiveMap.put(RSetCacheReactive.class.getName(), RSetCache.class.getName());
reactiveMap.put(RSetReactive.class.getName(), RSet.class.getName());
reactiveMap.makeImmutable();
}
@ -73,11 +88,11 @@ public class RedissonReference implements Serializable {
public RedissonReference() {
}
public RedissonReference(Class type, String keyName) {
public RedissonReference(Class<?> type, String keyName) {
this(type, keyName, null);
}
public RedissonReference(Class type, String keyName, Codec codec) {
public RedissonReference(Class<?> type, String keyName, Codec codec) {
if (!ClassUtils.isAnnotationPresent(type, REntity.class) && !RObject.class.isAssignableFrom(type) && !RObjectReactive.class.isAssignableFrom(type)) {
throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject or RObjectReactive");
}
@ -131,7 +146,8 @@ public class RedissonReference implements Serializable {
* @param type the type to set
*/
public void setType(Class<?> type) {
if (!ClassUtils.isAnnotationPresent(type, REntity.class) && (!RObject.class.isAssignableFrom(type) || !RObjectReactive.class.isAssignableFrom(type))) {
if (!ClassUtils.isAnnotationPresent(type, REntity.class)
&& (!RObject.class.isAssignableFrom(type) || !RObjectReactive.class.isAssignableFrom(type))) {
throw new IllegalArgumentException("Class reference has to be a type of either RObject or RLiveObject or RObjectReactive");
}
this.type = type.getName();

@ -33,6 +33,32 @@ import org.redisson.client.codec.Codec;
*/
public interface RBatchReactive {
/**
* Returns stream instance by <code>name</code>
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param <K> type of key
* @param <V> type of value
* @param name of stream
* @return RStream object
*/
<K, V> RStreamReactive<K, V> getStream(String name);
/**
* Returns stream instance by <code>name</code>
* using provided <code>codec</code> for entries.
* <p>
* Requires <b>Redis 5.0.0 and higher.</b>
*
* @param <K> type of key
* @param <V> type of value
* @param name - name of stream
* @param codec - codec for entry
* @return RStream object
*/
<K, V> RStreamReactive<K, V> getStream(String name, Codec codec);
/**
* Returns geospatial items holder instance by <code>name</code>.
*

@ -27,8 +27,6 @@ import org.reactivestreams.Publisher;
*/
public interface RBitSetReactive extends RExpirableReactive {
Publisher<BitSet> asBitSet();
Publisher<byte[]> toByteArray();
/**

@ -95,45 +95,4 @@ public class DefaultReferenceCodecProvider implements ReferenceCodecProvider {
codecCache.putIfAbsent(cls, codec);
}
@Override
public <T extends Codec> void registerCodec(REntity anno, Class<?> cls, T codec) {
if (!cls.isAnnotationPresent(anno.getClass())) {
throw new IllegalArgumentException("Annotation REntity does not present on type [" + cls.getCanonicalName() + "]");
}
registerCodec((Class<Codec>) anno.codec(), codec);
}
@Override
public <T extends Codec, K extends RObject> void registerCodec(RObjectField anno, Class<?> cls, Class<K> rObjectClass, String fieldName, T codec) {
try {
if (!cls.getField(fieldName).isAnnotationPresent(anno.getClass())) {
throw new IllegalArgumentException("Annotation RObjectField does not present on field " + fieldName + " of type [" + cls.getCanonicalName() + "]");
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
if (rObjectClass.isInterface()) {
throw new IllegalArgumentException("Cannot lookup an interface class of RObject [" + rObjectClass.getCanonicalName() + "]. Concrete class only.");
}
registerCodec((Class<Codec>) anno.codec(), codec);
}
@Override
public <T extends Codec, K extends RObject> void registerCodec(Class<T> codecClass, Class<K> rObjectClass, T codec) {
if (rObjectClass.isInterface()) {
throw new IllegalArgumentException("Cannot register an interface class of RObject [" + rObjectClass.getCanonicalName() + "]. Concrete class only.");
}
registerCodec((Class<Codec>) codecClass, codec);
}
@Override
public <T extends Codec, K extends RObject> void registerCodec(Class<T> codecClass, Class<K> rObjectClass, String name, T codec) {
registerCodec(codecClass, rObjectClass, codec);
}
@Override
public <T extends Codec> void registerCodec(Class<T> codecClass, RObject rObject, T codec) {
registerCodec(codecClass, rObject.getClass(), rObject.getName(), codec);
}
}

@ -109,71 +109,4 @@ public interface ReferenceCodecProvider {
*/
<T extends Codec> void registerCodec(Class<T> codecClass, T codec);
/**
* Register a codec by the REntity annotation and the class annotated with
* it.
*
* @param <T> the codec type to register.
* @param anno REntity annotation used on the class.
* @param cls The class that has the REntity annotation.
* @param codec the codec instance.
*/
<T extends Codec> void registerCodec(REntity anno, Class<?> cls, T codec);
/**
* Register a codec by the RObjectField annotation, the class annotated with
* REntity, the implementation class of RObject the field is going to
* be transformed into and the name of the field with this RObjectField
* annotation.
*
* @param <T> the codec type to register.
* @param <K> the type of the RObject.
* @param anno RObjectField annotation used on the field.
* @param cls The class that has the REntity annotation.
* @param rObjectClass the implementation class of RObject the field is going
* to be transformed into.
* @param fieldName the name of the field with this RObjectField annotation.
* @param codec the codec instance.
*/
<T extends Codec, K extends RObject> void registerCodec(RObjectField anno, Class<?> cls, Class<K> rObjectClass, String fieldName, T codec);
/**
* Register a codec by its class or super class and the class of the RObject
* implementation.
*
* @param <T> the codec type to register.
* @param <K> the RObjectField type.
* @param codecClass the codec Class to register it can be a super class of
* the instance.
* @param rObjectClass the class of the RObject implementation.
* @param codec the codec instance.
*/
<T extends Codec, K extends RObject> void registerCodec(Class<T> codecClass, Class<K> rObjectClass, T codec);
/**
* Register a codec by its class or super class, the class of the RObject
* implementation and the name of RObject (the value returned by
* RObjectField.getName() method).
*
* @param <T> the codec type to register.
* @param <K> the RObjectField type.
* @param codecClass the codec Class to register it can be a super class of
* the instance.
* @param rObjectClass the class of the RObject implementation.
* @param name the name of RObject.
* @param codec the codec instance.
*/
<T extends Codec, K extends RObject> void registerCodec(Class<T> codecClass, Class<K> rObjectClass, String name, T codec);
/**
* Register a codec by its class or super class and an instance of the
* RObject.
*
* @param <T> the codec type to register.
* @param codecClass the codec Class to register it can be a super class of
* the instance.
* @param rObject instance of the RObject implementation.
* @param codec the codec instance.
*/
<T extends Codec> void registerCodec(Class<T> codecClass, RObject rObject, T codec);
}

@ -33,8 +33,6 @@ public interface CommandReactiveExecutor extends CommandAsyncExecutor {
<R> Publisher<R> reactive(Supplier<RFuture<R>> supplier);
<T, R> Publisher<R> writeReactive(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> evalWriteReactive(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> Publisher<R> writeReactive(String key, Codec codec, RedisCommand<T> command, Object ... params);

@ -23,7 +23,6 @@ import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import reactor.core.publisher.Flux;
@ -66,16 +65,6 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
});
}
@Override
public <T, R> Publisher<R> writeReactive(final MasterSlaveEntry entry, final Codec codec, final RedisCommand<T> command, final Object ... params) {
return reactive(new Supplier<RFuture<R>>() {
@Override
public RFuture<R> get() {
return writeAsync(entry, codec, command, params);
};
});
}
@Override
public <T, R> Publisher<R> readReactive(final String key, final Codec codec, final RedisCommand<T> command, final Object ... params) {
return reactive(new Supplier<RFuture<R>>() {

@ -84,7 +84,7 @@ public class RedissonObjectBuilder {
public void store(RObject ar, String fieldName, RMap<String, Object> liveMap) {
Codec codec = ar.getCodec();
codecProvider.registerCodec((Class) codec.getClass(), ar, codec);
codecProvider.registerCodec((Class) codec.getClass(), codec);
liveMap.fastPut(fieldName,
new RedissonReference(ar.getClass(), ar.getName(), codec));
}

@ -114,14 +114,13 @@ public class RedissonObjectFactory {
Object id = ns.resolveId(rr.getKeyName());
return (T) liveObjectService.createLiveObject(type, id);
}
List<Class<?>> interfaces = Arrays.asList(type.getInterfaces());
for (Class<?> iType : interfaces) {
if (builders.containsKey(iType)) {// user cache to speed up things a little.
Method builder = builders.get(iType).get(isDefaultCodec(rr));
return (T) (isDefaultCodec(rr)
? builder.invoke(redisson, rr.getKeyName())
: builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType())));
}
RedissonObjectBuilder b = builders.get(type);
if (b != null) {
Method builder = b.get(isDefaultCodec(rr));
return (T) (isDefaultCodec(rr)
? builder.invoke(redisson, rr.getKeyName())
: builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType())));
}
}
throw new ClassNotFoundException("No RObject is found to match class type of " + rr.getTypeName() + " with codec type of " + rr.getCodecName());
@ -139,14 +138,12 @@ public class RedissonObjectFactory {
* Live Object from reference in reactive client is not supported yet.
*/
if (type != null) {
List<Class<?>> interfaces = Arrays.asList(type.getInterfaces());
for (Class<?> iType : interfaces) {
if (builders.containsKey(iType)) {// user cache to speed up things a little.
Method builder = builders.get(iType).get(isDefaultCodec(rr));
RedissonObjectBuilder b = builders.get(type);
if (b != null) {
Method builder = b.get(isDefaultCodec(rr));
return (T) (isDefaultCodec(rr)
? builder.invoke(redisson, rr.getKeyName())
: builder.invoke(redisson, rr.getKeyName(), codecProvider.getCodec(rr.getCodecType())));
}
}
}
throw new ClassNotFoundException("No RObjectReactive is found to match class type of " + rr.getReactiveTypeName()+ " with codec type of " + rr.getCodecName());
@ -158,14 +155,18 @@ public class RedissonObjectFactory {
}
if (object instanceof RObject && !(object instanceof RLiveObject)) {
Class<?> clazz = object.getClass().getInterfaces()[0];
RObject rObject = ((RObject) object);
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec());
return new RedissonReference(object.getClass(), ((RObject) object).getName(), ((RObject) object).getCodec());
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
return new RedissonReference(clazz, rObject.getName(), rObject.getCodec());
}
if (object instanceof RObjectReactive && !(object instanceof RLiveObject)) {
Class<?> clazz = object.getClass().getInterfaces()[0];
RObjectReactive rObject = ((RObjectReactive) object);
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), (Class) rObject.getClass(), rObject.getName(), rObject.getCodec());
return new RedissonReference(object.getClass(), ((RObjectReactive) object).getName(), ((RObjectReactive) object).getCodec());
config.getReferenceCodecProvider().registerCodec((Class) rObject.getCodec().getClass(), rObject.getCodec());
return new RedissonReference(clazz, rObject.getName(), rObject.getCodec());
}
try {
@ -179,6 +180,7 @@ public class RedissonObjectFactory {
.getFieldsWithAnnotation(rEntity, RId.class)
.getOnly().getName();
Class<?> type = ClassUtils.getDeclaredField(rEntity, name).getType();
return new RedissonReference(rEntity,
ns.getName(rEntity, type, name, ((RLiveObject) object).getLiveObjectId()));
}

@ -1,147 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.redisson.RedissonAtomicDouble;
import org.redisson.api.RAtomicDoubleAsync;
import org.redisson.api.RAtomicDoubleReactive;
import org.redisson.api.RFuture;
import org.redisson.command.CommandReactiveExecutor;
import java.util.function.Supplier;
/**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
*
* @author Nikita Koksharov
*
*/
public class RedissonAtomicDoubleReactive extends RedissonExpirableReactive implements RAtomicDoubleReactive {
private final RAtomicDoubleAsync instance;
public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor, name, new RedissonAtomicDouble(commandExecutor, name));
}
public RedissonAtomicDoubleReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicDoubleAsync instance) {
super(commandExecutor, name, instance);
this.instance = instance;
}
@Override
public Publisher<Double> addAndGet(final double delta) {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.addAndGetAsync(delta);
}
});
}
@Override
public Publisher<Boolean> compareAndSet(final double expect, final double update) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.compareAndSetAsync(expect, update);
}
});
}
@Override
public Publisher<Double> decrementAndGet() {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.decrementAndGetAsync();
}
});
}
@Override
public Publisher<Double> get() {
return addAndGet(0);
}
@Override
public Publisher<Double> getAndAdd(final double delta) {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.getAndAddAsync(delta);
}
});
}
@Override
public Publisher<Double> getAndSet(final double newValue) {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.getAndSetAsync(newValue);
}
});
}
@Override
public Publisher<Double> incrementAndGet() {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.incrementAndGetAsync();
}
});
}
@Override
public Publisher<Double> getAndIncrement() {
return getAndAdd(1);
}
@Override
public Publisher<Double> getAndDecrement() {
return getAndAdd(-1);
}
@Override
public Publisher<Void> set(final double newValue) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(newValue);
}
});
}
@Override
public Publisher<Double> getAndDelete() {
return reactive(new Supplier<RFuture<Double>>() {
@Override
public RFuture<Double> get() {
return instance.getAndDeleteAsync();
}
});
}
public String toString() {
return instance.toString();
}
}

@ -1,146 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonAtomicLong;
import org.redisson.api.RAtomicLongAsync;
import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RFuture;
import org.redisson.command.CommandReactiveExecutor;
/**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
*
* @author Nikita Koksharov
*
*/
public class RedissonAtomicLongReactive extends RedissonExpirableReactive implements RAtomicLongReactive {
private final RAtomicLongAsync instance;
public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name) {
this(commandExecutor, name, new RedissonAtomicLong(commandExecutor, name));
}
public RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, String name, RAtomicLongAsync instance) {
super(commandExecutor, name, instance);
this.instance = instance;
}
@Override
public Publisher<Long> addAndGet(final long delta) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.addAndGetAsync(delta);
}
});
}
@Override
public Publisher<Boolean> compareAndSet(final long expect, final long update) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.compareAndSetAsync(expect, update);
}
});
}
@Override
public Publisher<Long> decrementAndGet() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.decrementAndGetAsync();
}
});
}
@Override
public Publisher<Long> get() {
return addAndGet(0);
}
@Override
public Publisher<Long> getAndAdd(final long delta) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.getAndAddAsync(delta);
}
});
}
@Override
public Publisher<Long> getAndDelete() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.getAndDeleteAsync();
}
});
}
@Override
public Publisher<Long> getAndSet(final long newValue) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.getAndSetAsync(newValue);
}
});
}
@Override
public Publisher<Long> incrementAndGet() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.incrementAndGetAsync();
}
});
}
@Override
public Publisher<Long> getAndIncrement() {
return getAndAdd(1);
}
@Override
public Publisher<Long> getAndDecrement() {
return getAndAdd(-1);
}
@Override
public Publisher<Void> set(final long newValue) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(newValue);
}
});
}
public String toString() {
return instance.toString();
}
}

@ -19,7 +19,13 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonAtomicDouble;
import org.redisson.RedissonAtomicLong;
import org.redisson.RedissonBitSet;
import org.redisson.RedissonBucket;
import org.redisson.RedissonHyperLogLog;
import org.redisson.RedissonScript;
import org.redisson.RedissonStream;
import org.redisson.api.BatchOptions;
import org.redisson.api.BatchResult;
import org.redisson.api.RAtomicDoubleReactive;
@ -45,6 +51,7 @@ import org.redisson.api.RScriptReactive;
import org.redisson.api.RSetCacheReactive;
import org.redisson.api.RSetMultimapReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RStreamReactive;
import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
@ -72,24 +79,34 @@ public class RedissonBatchReactive implements RBatchReactive {
this.options = options;
}
@Override
public <K, V> RStreamReactive<K, V> getStream(String name) {
return ReactiveProxyBuilder.create(executorService, new RedissonStream<K, V>(executorService, name), RStreamReactive.class);
}
@Override
public <K, V> RStreamReactive<K, V> getStream(String name, Codec codec) {
return ReactiveProxyBuilder.create(executorService, new RedissonStream<K, V>(codec, executorService, name), RStreamReactive.class);
}
@Override
public <V> RBucketReactive<V> getBucket(String name) {
return new RedissonBucketReactive<V>(executorService, name);
return ReactiveProxyBuilder.create(executorService, new RedissonBucket<V>(executorService, name), RBucketReactive.class);
}
@Override
public <V> RBucketReactive<V> getBucket(String name, Codec codec) {
return new RedissonBucketReactive<V>(codec, executorService, name);
return ReactiveProxyBuilder.create(executorService, new RedissonBucket<V>(codec, executorService, name), RBucketReactive.class);
}
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(String name) {
return new RedissonHyperLogLogReactive<V>(executorService, name);
return ReactiveProxyBuilder.create(executorService, new RedissonHyperLogLog<V>(executorService, name), RHyperLogLogReactive.class);
}
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(String name, Codec codec) {
return new RedissonHyperLogLogReactive<V>(codec, executorService, name);
return ReactiveProxyBuilder.create(executorService, new RedissonHyperLogLog<V>(codec, executorService, name), RHyperLogLogReactive.class);
}
@Override
@ -174,7 +191,7 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public RAtomicLongReactive getAtomicLongReactive(String name) {
return new RedissonAtomicLongReactive(executorService, name);
return ReactiveProxyBuilder.create(executorService, new RedissonAtomicLong(executorService, name), RAtomicLongReactive.class);
}
@Override
@ -204,7 +221,7 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public RBitSetReactive getBitSet(String name) {
return new RedissonBitSetReactive(executorService, name);
return ReactiveProxyBuilder.create(executorService, new RedissonBitSet(executorService, name), RBitSetReactive.class);
}
@Override
@ -298,7 +315,7 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public RAtomicDoubleReactive getAtomicDouble(String name) {
return new RedissonAtomicDoubleReactive(executorService, name);
return ReactiveProxyBuilder.create(executorService, new RedissonAtomicDouble(executorService, name), RAtomicDoubleReactive.class);
}
@Override

@ -1,226 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.BitSet;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonBitSet;
import org.redisson.api.RBitSetAsync;
import org.redisson.api.RBitSetReactive;
import org.redisson.api.RFuture;
import org.redisson.client.codec.BitSetCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.publisher.Mono;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonBitSetReactive extends RedissonExpirableReactive implements RBitSetReactive {
private final RBitSetAsync instance;
public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name) {
this(connectionManager, name, new RedissonBitSet(connectionManager, name));
}
public RedissonBitSetReactive(CommandReactiveExecutor connectionManager, String name, RBitSetAsync instance) {
super(connectionManager, name, instance);
this.instance = instance;
}
public Publisher<Boolean> get(final long bitIndex) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.getAsync(bitIndex);
}
});
}
public Publisher<Boolean> set(final long bitIndex, final boolean value) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.setAsync(bitIndex, value);
}
});
}
public Publisher<byte[]> toByteArray() {
return reactive(new Supplier<RFuture<byte[]>>() {
@Override
public RFuture<byte[]> get() {
return instance.toByteArrayAsync();
}
});
}
public Publisher<BitSet> asBitSet() {
return commandExecutor.readReactive(getName(), BitSetCodec.INSTANCE, RedisCommands.GET, getName());
}
@Override
public Publisher<Long> length() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.lengthAsync();
}
});
}
@Override
public Publisher<Void> set(final long fromIndex, final long toIndex, final boolean value) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(fromIndex, toIndex, value);
}
});
}
@Override
public Publisher<Void> clear(final long fromIndex, final long toIndex) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.clearAsync(fromIndex, toIndex);
}
});
}
@Override
public Publisher<Void> set(final BitSet bs) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(bs);
}
});
}
@Override
public Publisher<Void> not() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.notAsync();
}
});
}
@Override
public Publisher<Void> set(final long fromIndex, final long toIndex) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(fromIndex, toIndex);
}
});
}
@Override
public Publisher<Long> size() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.sizeAsync();
}
});
}
@Override
public Publisher<Boolean> set(final long bitIndex) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.setAsync(bitIndex);
}
});
}
@Override
public Publisher<Long> cardinality() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.cardinalityAsync();
}
});
}
@Override
public Publisher<Boolean> clear(final long bitIndex) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.clearAsync(bitIndex);
}
});
}
@Override
public Publisher<Void> clear() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.clearAsync();
}
});
}
@Override
public Publisher<Void> or(final String... bitSetNames) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.orAsync(bitSetNames);
}
});
}
@Override
public Publisher<Void> and(final String... bitSetNames) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.andAsync(bitSetNames);
}
});
}
@Override
public Publisher<Void> xor(final String... bitSetNames) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.xorAsync(bitSetNames);
}
});
}
@Override
public String toString() {
return Mono.from(asBitSet()).block().toString();
}
}

@ -1,147 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonBucket;
import org.redisson.api.RBucketAsync;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public class RedissonBucketReactive<V> extends RedissonExpirableReactive implements RBucketReactive<V> {
private final RBucketAsync<V> instance;
public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) {
this(connectionManager, name, new RedissonBucket<V>(connectionManager, name));
}
public RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name, RBucketAsync<V> instance) {
super(connectionManager, name, instance);
this.instance = instance;
}
public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) {
this(codec, connectionManager, name, new RedissonBucket<V>(codec, connectionManager, name));
}
public RedissonBucketReactive(Codec codec, CommandReactiveExecutor connectionManager, String name, RBucketAsync<V> instance) {
super(codec, connectionManager, name, instance);
this.instance = instance;
}
@Override
public Publisher<V> get() {
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.getAsync();
}
});
}
@Override
public Publisher<V> getAndDelete() {
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.getAndDeleteAsync();
}
});
}
@Override
public Publisher<Void> set(final V value) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(value);
}
});
}
@Override
public Publisher<Void> set(final V value, final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.setAsync(value, timeToLive, timeUnit);
}
});
}
@Override
public Publisher<Long> size() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.sizeAsync();
}
});
}
@Override
public Publisher<Boolean> trySet(final V value) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetAsync(value);
}
});
}
@Override
public Publisher<Boolean> trySet(final V value, final long timeToLive, final TimeUnit timeUnit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetAsync(value, timeToLive, timeUnit);
}
});
}
@Override
public Publisher<Boolean> compareAndSet(final V expect, final V update) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.compareAndSetAsync(expect, update);
}
});
}
@Override
public Publisher<V> getAndSet(final V newValue) {
return reactive(new Supplier<RFuture<V>>() {
@Override
public RFuture<V> get() {
return instance.getAndSetAsync(newValue);
}
});
}
}

@ -1,99 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.Collection;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonHyperLogLog;
import org.redisson.api.RFuture;
import org.redisson.api.RHyperLogLogAsync;
import org.redisson.api.RHyperLogLogReactive;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
/**
*
* @author Nikita Koksharov
*
* @param <V> value type
*/
public class RedissonHyperLogLogReactive<V> extends RedissonExpirableReactive implements RHyperLogLogReactive<V> {
private final RHyperLogLogAsync<V> instance;
public RedissonHyperLogLogReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name, new RedissonHyperLogLog<V>(commandExecutor, name));
this.instance = (RHyperLogLogAsync<V>) super.instance;
}
public RedissonHyperLogLogReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name, new RedissonHyperLogLog<V>(commandExecutor, name));
this.instance = (RHyperLogLogAsync<V>) super.instance;
}
@Override
public Publisher<Boolean> add(final V obj) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.addAsync(obj);
}
});
}
@Override
public Publisher<Boolean> addAll(final Collection<V> objects) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.addAllAsync(objects);
}
});
}
@Override
public Publisher<Long> count() {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.countAsync();
}
});
}
@Override
public Publisher<Long> countWith(final String... otherLogNames) {
return reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return instance.countWithAsync(otherLogNames);
}
});
}
@Override
public Publisher<Void> mergeWith(final String... otherLogNames) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.mergeWithAsync(otherLogNames);
}
});
}
}

@ -30,12 +30,15 @@ import org.redisson.RedissonKeys;
import org.redisson.api.RFuture;
import org.redisson.api.RKeysReactive;
import org.redisson.api.RType;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveService;
import org.redisson.connection.MasterSlaveEntry;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
@ -90,19 +93,14 @@ public class RedissonKeysReactive implements RKeysReactive {
return getKeysByPattern(null, count);
}
private Publisher<ListScanResult<String>> scanIterator(MasterSlaveEntry entry, long startPos, String pattern, int count) {
if (pattern == null) {
return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "COUNT", count);
}
return commandExecutor.writeReactive(entry, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern, "COUNT", count);
}
private Publisher<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern, final int count) {
return Flux.create(new Consumer<FluxSink<String>>() {
@Override
public void accept(FluxSink<String> emitter) {
emitter.onRequest(new LongConsumer() {
private RedisClient client;
private List<String> firstValues;
private long nextIterPos;
@ -115,18 +113,19 @@ public class RedissonKeysReactive implements RKeysReactive {
}
protected void nextValues(FluxSink<String> emitter) {
scanIterator(entry, nextIterPos, pattern, count).subscribe(new Subscriber<ListScanResult<String>>() {
instance.scanIteratorAsync(client, entry, nextIterPos, pattern, count).addListener(new FutureListener<ListScanResult<Object>>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(ListScanResult<String> res) {
public void operationComplete(Future<ListScanResult<Object>> future) throws Exception {
if (!future.isSuccess()) {
emitter.error(future.cause());
return;
}
ListScanResult<Object> res = future.get();
long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
firstValues = (List<String>)(Object)res.getValues();
} else if (res.getValues().equals(firstValues)) {
emitter.complete();
currentIndex = 0;
@ -137,8 +136,8 @@ public class RedissonKeysReactive implements RKeysReactive {
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
for (String val : res.getValues()) {
emitter.next(val);
for (Object val : res.getValues()) {
emitter.next((String)val);
currentIndex--;
if (currentIndex == 0) {
emitter.complete();
@ -149,20 +148,13 @@ public class RedissonKeysReactive implements RKeysReactive {
emitter.complete();
currentIndex = 0;
}
}
@Override
public void onError(Throwable error) {
emitter.error(error);
}
@Override
public void onComplete() {
if (currentIndex == 0) {
return;
}
nextValues(emitter);
}
});
}

@ -1,166 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonLock;
import org.redisson.api.RFuture;
import org.redisson.api.RLockAsync;
import org.redisson.api.RLockReactive;
import org.redisson.command.CommandReactiveExecutor;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonLockReactive extends RedissonExpirableReactive implements RLockReactive {
private final RLockAsync instance;
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name) {
this(connectionManager, name, new RedissonLock(connectionManager, name));
}
public RedissonLockReactive(CommandReactiveExecutor connectionManager, String name, RLockAsync instance) {
super(connectionManager, name, instance);
this.instance = instance;
}
@Override
public Publisher<Boolean> forceUnlock() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.forceUnlockAsync();
}
});
}
@Override
public Publisher<Void> unlock() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.unlockAsync();
}
});
}
@Override
public Publisher<Void> unlock(final long threadId) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.unlockAsync(threadId);
}
});
}
@Override
public Publisher<Boolean> tryLock() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync();
}
});
}
@Override
public Publisher<Void> lock() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.lockAsync();
}
});
}
@Override
public Publisher<Void> lock(final long threadId) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.lockAsync(threadId);
}
});
}
@Override
public Publisher<Void> lock(final long leaseTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.lockAsync(leaseTime, unit);
}
});
}
@Override
public Publisher<Void> lock(final long leaseTime, final TimeUnit unit, final long threadId) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.lockAsync(leaseTime, unit, threadId);
}
});
}
@Override
public Publisher<Boolean> tryLock(final long threadId) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync(threadId);
}
});
}
@Override
public Publisher<Boolean> tryLock(final long waitTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync(waitTime, unit);
}
});
}
@Override
public Publisher<Boolean> tryLock(final long waitTime, final long leaseTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync(waitTime, leaseTime, unit);
}
});
}
@Override
public Publisher<Boolean> tryLock(final long waitTime, final long leaseTime, final TimeUnit unit, final long threadId) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryLockAsync(waitTime, leaseTime, unit, threadId);
}
});
}
}

@ -1,160 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonLock;
import org.redisson.RedissonPermitExpirableSemaphore;
import org.redisson.api.RFuture;
import org.redisson.api.RLockAsync;
import org.redisson.api.RPermitExpirableSemaphoreAsync;
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.pubsub.SemaphorePubSub;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonPermitExpirableSemaphoreReactive extends RedissonExpirableReactive implements RPermitExpirableSemaphoreReactive {
private final RPermitExpirableSemaphoreAsync instance;
public RedissonPermitExpirableSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
super(connectionManager, name, new RedissonPermitExpirableSemaphore(connectionManager, name, semaphorePubSub));
instance = (RPermitExpirableSemaphoreAsync) super.instance;
}
protected RLockAsync createLock(CommandAsyncExecutor connectionManager, String name) {
return new RedissonLock(commandExecutor, name);
}
@Override
public Publisher<String> acquire() {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.acquireAsync();
}
});
}
@Override
public Publisher<String> acquire(final long leaseTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.acquireAsync(leaseTime, unit);
}
});
}
@Override
public Publisher<String> tryAcquire() {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.tryAcquireAsync();
}
});
}
@Override
public Publisher<String> tryAcquire(final long waitTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.tryAcquireAsync(waitTime, unit);
}
});
}
@Override
public Publisher<String> tryAcquire(final long waitTime, final long leaseTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<String>>() {
@Override
public RFuture<String> get() {
return instance.tryAcquireAsync(waitTime, leaseTime, unit);
}
});
}
@Override
public Publisher<Boolean> tryRelease(final String permitId) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryReleaseAsync(permitId);
}
});
}
@Override
public Publisher<Void> release(final String permitId) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.releaseAsync(permitId);
}
});
}
@Override
public Publisher<Integer> availablePermits() {
return reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.availablePermitsAsync();
}
});
}
@Override
public Publisher<Boolean> trySetPermits(final int permits) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetPermitsAsync(permits);
}
});
}
@Override
public Publisher<Void> addPermits(final int permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.addPermitsAsync(permits);
}
});
}
@Override
public Publisher<Boolean> updateLeaseTime(final String permitId, final long leaseTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.updateLeaseTimeAsync(permitId, leaseTime, unit);
}
});
}
}

@ -1,119 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonRateLimiter;
import org.redisson.api.RFuture;
import org.redisson.api.RRateLimiterAsync;
import org.redisson.api.RRateLimiterReactive;
import org.redisson.api.RateIntervalUnit;
import org.redisson.api.RateType;
import org.redisson.command.CommandReactiveExecutor;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonRateLimiterReactive extends RedissonObjectReactive implements RRateLimiterReactive {
private final RRateLimiterAsync instance;
public RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name) {
this(connectionManager, name, new RedissonRateLimiter(connectionManager, name));
}
private RedissonRateLimiterReactive(CommandReactiveExecutor connectionManager, String name, RRateLimiterAsync instance) {
super(connectionManager, name, instance);
this.instance = instance;
}
@Override
public Publisher<Boolean> trySetRate(final RateType mode, final long rate, final long rateInterval,
final RateIntervalUnit rateIntervalUnit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetRateAsync(mode, rate, rateInterval, rateIntervalUnit);
}
});
}
@Override
public Publisher<Boolean> tryAcquire() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync();
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final long permits) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(permits);
}
});
}
@Override
public Publisher<Void> acquire() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.acquireAsync();
}
});
}
@Override
public Publisher<Void> acquire(final long permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.acquireAsync(permits);
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final long timeout, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(timeout, unit);
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final long permits, final long timeout, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(permits, timeout, unit);
}
});
}
}

@ -37,12 +37,12 @@ public class RedissonReadWriteLockReactive extends RedissonExpirableReactive imp
@Override
public RLockReactive readLock() {
return new RedissonLockReactive(commandExecutor, getName(), instance.readLock());
return ReactiveProxyBuilder.create(commandExecutor, instance.readLock(), RLockReactive.class);
}
@Override
public RLockReactive writeLock() {
return new RedissonLockReactive(commandExecutor, getName(), instance.writeLock());
return ReactiveProxyBuilder.create(commandExecutor, instance.writeLock(), RLockReactive.class);
}

@ -1,143 +0,0 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSemaphore;
import org.redisson.api.RFuture;
import org.redisson.api.RSemaphoreAsync;
import org.redisson.api.RSemaphoreReactive;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.pubsub.SemaphorePubSub;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonSemaphoreReactive extends RedissonExpirableReactive implements RSemaphoreReactive {
private final RSemaphoreAsync instance;
public RedissonSemaphoreReactive(CommandReactiveExecutor connectionManager, String name, SemaphorePubSub semaphorePubSub) {
super(connectionManager, name, new RedissonSemaphore(connectionManager, name, semaphorePubSub));
instance = (RSemaphoreAsync) super.instance;
}
@Override
public Publisher<Boolean> tryAcquire() {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync();
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final int permits) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(permits);
}
});
}
@Override
public Publisher<Void> acquire() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.acquireAsync();
}
});
}
@Override
public Publisher<Void> acquire(final int permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.acquireAsync(permits);
}
});
}
@Override
public Publisher<Void> release() {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.releaseAsync();
}
});
}
@Override
public Publisher<Void> release(final int permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.releaseAsync(permits);
}
});
}
@Override
public Publisher<Boolean> trySetPermits(final int permits) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.trySetPermitsAsync(permits);
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final long waitTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(waitTime, unit);
}
});
}
@Override
public Publisher<Boolean> tryAcquire(final int permits, final long waitTime, final TimeUnit unit) {
return reactive(new Supplier<RFuture<Boolean>>() {
@Override
public RFuture<Boolean> get() {
return instance.tryAcquireAsync(permits, waitTime, unit);
}
});
}
@Override
public Publisher<Void> reducePermits(final int permits) {
return reactive(new Supplier<RFuture<Void>>() {
@Override
public RFuture<Void> get() {
return instance.reducePermitsAsync(permits);
}
});
}
}

@ -48,12 +48,12 @@ public class RedissonTransactionReactive implements RTransactionReactive {
@Override
public <V> RBucketReactive<V> getBucket(String name) {
return new RedissonBucketReactive<V>(executorService, name, transaction.<V>getBucket(name));
return ReactiveProxyBuilder.create(executorService, transaction.<V>getBucket(name), RBucketReactive.class);
}
@Override
public <V> RBucketReactive<V> getBucket(String name, Codec codec) {
return new RedissonBucketReactive<V>(codec, executorService, name, transaction.<V>getBucket(name, codec));
return ReactiveProxyBuilder.create(executorService, transaction.<V>getBucket(name, codec), RBucketReactive.class);
}
@Override

@ -93,19 +93,6 @@ public class RedissonBitSetReactiveTest extends BaseReactiveTest {
Assert.assertEquals(16, sync(bs.size()).intValue());
}
@Test
public void testAsBitSet() {
RBitSetReactive bs = redisson.getBitSet("testbitset");
sync(bs.set(3, true));
sync(bs.set(41, true));
Assert.assertEquals(48, sync(bs.size()).intValue());
BitSet bitset = sync(bs.asBitSet());
Assert.assertTrue(bitset.get(3));
Assert.assertTrue(bitset.get(41));
Assert.assertEquals(2, bitset.cardinality());
}
@Test
public void testAnd() {
RBitSetReactive bs1 = redisson.getBitSet("testbitset1");

@ -1,16 +1,25 @@
package org.redisson;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.List;
import static org.junit.Assert.*;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.redisson.api.*;
import org.redisson.api.BatchOptions;
import org.redisson.api.RBatch;
import org.redisson.api.RBatchReactive;
import org.redisson.api.RBucket;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RSetReactive;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.redisson.reactive.RedissonBucketReactive;
import org.redisson.reactive.RedissonMapCacheReactive;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
*
@ -25,13 +34,13 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
RBucketReactive<Object> b3 = redisson.getBucket("b3");
sync(b2.set(b3));
sync(b1.set(redisson.getBucket("b2")));
assertTrue(sync(b1.get()).getClass().equals(RedissonBucketReactive.class));
assertTrue(sync(b1.get()) instanceof RBucketReactive);
assertEquals("b3", ((RBucketReactive) sync(((RBucketReactive) sync(b1.get())).get())).getName());
RBucketReactive<Object> b4 = redisson.getBucket("b4");
sync(b4.set(redisson.getMapCache("testCache")));
assertTrue(sync(b4.get()) instanceof RedissonMapCacheReactive);
sync(((RedissonMapCacheReactive) sync(b4.get())).fastPut(b1, b2));
assertEquals("b2", ((RBucketReactive) sync(((RedissonMapCacheReactive) sync(b4.get())).get(b1))).getName());
assertTrue(sync(b4.get()) instanceof RMapCacheReactive);
sync(((RMapCacheReactive) sync(b4.get())).fastPut(b1, b2));
assertEquals("b2", ((RBucketReactive) sync(((RMapCacheReactive) sync(b4.get())).get(b1))).getName());
}
@Test

Loading…
Cancel
Save