|
|
|
@ -15,12 +15,6 @@
|
|
|
|
|
*/
|
|
|
|
|
package org.redisson.reactive;
|
|
|
|
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
|
|
|
|
import org.reactivestreams.Publisher;
|
|
|
|
|
import org.redisson.RedissonSetCache;
|
|
|
|
|
import org.redisson.ScanIterator;
|
|
|
|
@ -34,9 +28,7 @@ import org.redisson.api.RedissonReactiveClient;
|
|
|
|
|
import org.redisson.client.RedisClient;
|
|
|
|
|
import org.redisson.client.protocol.decoder.ListScanResult;
|
|
|
|
|
|
|
|
|
|
import io.netty.buffer.ByteBuf;
|
|
|
|
|
import reactor.core.publisher.Flux;
|
|
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
*
|
|
|
|
@ -58,7 +50,7 @@ public class RedissonSetCacheReactive<V> {
|
|
|
|
|
return Flux.create(new SetReactiveIterator<V>() {
|
|
|
|
|
@Override
|
|
|
|
|
protected RFuture<ListScanResult<Object>> scanIterator(RedisClient client, long nextIterPos) {
|
|
|
|
|
return ((ScanIterator)instance).scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10);
|
|
|
|
|
return ((ScanIterator) instance).scanIteratorAsync(instance.getName(), client, nextIterPos, null, 10);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
@ -67,33 +59,33 @@ public class RedissonSetCacheReactive<V> {
|
|
|
|
|
return new PublisherAdder<V>() {
|
|
|
|
|
@Override
|
|
|
|
|
public RFuture<Boolean> add(Object o) {
|
|
|
|
|
return instance.addAsync((V)o);
|
|
|
|
|
return instance.addAsync((V) o);
|
|
|
|
|
}
|
|
|
|
|
}.addAll(c);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(V value) {
|
|
|
|
|
String name = ((RedissonSetCache<V>)instance).getLockName(value, "permitexpirablesemaphore");
|
|
|
|
|
String name = ((RedissonSetCache<V>) instance).getLockName(value, "permitexpirablesemaphore");
|
|
|
|
|
return redisson.getPermitExpirableSemaphore(name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RSemaphoreReactive getSemaphore(V value) {
|
|
|
|
|
String name = ((RedissonSetCache<V>)instance).getLockName(value, "semaphore");
|
|
|
|
|
String name = ((RedissonSetCache<V>) instance).getLockName(value, "semaphore");
|
|
|
|
|
return redisson.getSemaphore(name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RLockReactive getFairLock(V value) {
|
|
|
|
|
String name = ((RedissonSetCache<V>)instance).getLockName(value, "fairlock");
|
|
|
|
|
String name = ((RedissonSetCache<V>) instance).getLockName(value, "fairlock");
|
|
|
|
|
return redisson.getFairLock(name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RReadWriteLockReactive getReadWriteLock(V value) {
|
|
|
|
|
String name = ((RedissonSetCache<V>)instance).getLockName(value, "rw_lock");
|
|
|
|
|
String name = ((RedissonSetCache<V>) instance).getLockName(value, "rw_lock");
|
|
|
|
|
return redisson.getReadWriteLock(name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public RLockReactive getLock(V value) {
|
|
|
|
|
String name = ((RedissonSetCache<V>)instance).getLockName(value, "lock");
|
|
|
|
|
String name = ((RedissonSetCache<V>) instance).getLockName(value, "lock");
|
|
|
|
|
return redisson.getLock(name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|