refactoring

pull/1705/head
Nikita 7 years ago
parent 3de7f75b06
commit 0e78d16f82

@ -148,11 +148,13 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return removeAsync(o, 1);
}
protected RFuture<Boolean> removeAsync(Object o, int count) {
@Override
public RFuture<Boolean> removeAsync(Object o, int count) {
return commandExecutor.writeAsync(getName(), codec, LREM_SINGLE, getName(), count, encode(o));
}
protected boolean remove(Object o, int count) {
@Override
public boolean remove(Object o, int count) {
return get(removeAsync(o, count));
}
@ -391,17 +393,19 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
public void add(int index, V element) {
addAll(index, Collections.singleton(element));
}
@Override
public V remove(int index) {
return remove((long) index);
public RFuture<Boolean> addAsync(int index, V element) {
return addAllAsync(index, Collections.singleton(element));
}
public V remove(long index) {
@Override
public V remove(int index) {
return get(removeAsync(index));
}
public RFuture<V> removeAsync(long index) {
@Override
public RFuture<V> removeAsync(int index) {
if (index == 0) {
return commandExecutor.writeAsync(getName(), codec, LPOP, getName());
}
@ -421,7 +425,7 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
}
@Override
public RFuture<Void> fastRemoveAsync(long index) {
public RFuture<Void> fastRemoveAsync(int index) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');",

@ -187,6 +187,11 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
public RFuture<Boolean> addAsync(V e) {
return list.addAsync(e);
}
@Override
public RFuture<Boolean> addAsync(int index, V element) {
return list.addAsync(index, element);
}
@Override
public boolean remove(Object o) {
@ -198,7 +203,8 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
return removeAsync(o, 1);
}
protected RFuture<Boolean> removeAsync(Object o, int count) {
@Override
public RFuture<Boolean> removeAsync(Object o, int count) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
@ -213,7 +219,8 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
System.currentTimeMillis(), count, encodeMapKey(key), encodeMapValue(o));
}
protected boolean remove(Object o, int count) {
@Override
public boolean remove(Object o, int count) {
return get(removeAsync(o, count));
}
@ -438,7 +445,7 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
}
@Override
public RFuture<V> removeAsync(long index) {
public RFuture<V> removeAsync(int index) {
return list.removeAsync(index);
}
@ -448,7 +455,7 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
}
@Override
public RFuture<Void> fastRemoveAsync(long index) {
public RFuture<Void> fastRemoveAsync(int index) {
return list.fastRemoveAsync(index);
}

@ -60,7 +60,6 @@ import org.redisson.api.RTransactionReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.command.CommandReactiveService;
import org.redisson.config.Config;
@ -266,14 +265,14 @@ public class RedissonReactive implements RedissonReactiveClient {
public <V> RSetReactive<V> getSet(String name) {
RedissonSet<V> set = new RedissonSet<V>(commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(commandExecutor, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}
@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
RedissonSet<V> set = new RedissonSet<V>(codec, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(commandExecutor, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}
@Override
@ -290,8 +289,9 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RLexSortedSetReactive getLexSortedSet(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonLexSortedSet(commandExecutor, name, null),
new RedissonLexSortedSetReactive(commandExecutor, new RedissonScoredSortedSetReactive<String>(StringCodec.INSTANCE, commandExecutor, name)),
RedissonLexSortedSet set = new RedissonLexSortedSet(commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonLexSortedSetReactive(set),
RLexSortedSetReactive.class);
}
@ -355,14 +355,14 @@ public class RedissonReactive implements RedissonReactiveClient {
public <V> RSetCacheReactive<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(commandExecutor, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(commandExecutor, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}
@Override

@ -96,7 +96,7 @@ public interface RCollectionReactive<V> extends RExpirableReactive {
* @return <code>true</code> if an element was added
* and <code>false</code> if it is already present
*/
Publisher<Integer> add(V e);
Publisher<Boolean> add(V e);
/**
* Adds all elements contained in the specified collection
@ -105,7 +105,7 @@ public interface RCollectionReactive<V> extends RExpirableReactive {
* @return <code>true</code> if at least one element was added
* and <code>false</code> if all elements are already present
*/
Publisher<Integer> addAll(Publisher<? extends V> c);
Publisher<Boolean> addAll(Publisher<? extends V> c);
/**
* Adds all elements contained in the specified collection
@ -114,6 +114,6 @@ public interface RCollectionReactive<V> extends RExpirableReactive {
* @return <code>true</code> if at least one element was added
* and <code>false</code> if all elements are already present
*/
Publisher<Integer> addAll(Collection<? extends V> c);
Publisher<Boolean> addAll(Collection<? extends V> c);
}

@ -99,4 +99,6 @@ public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<
*/
void fastRemove(int index);
boolean remove(Object o, int count);
}

@ -54,6 +54,8 @@ public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<
*/
RFuture<Integer> addBeforeAsync(V elementToFind, V element);
RFuture<Boolean> addAsync(int index, V element);
RFuture<Boolean> addAllAsync(int index, Collection<? extends V> coll);
RFuture<Integer> lastIndexOfAsync(Object o);
@ -92,8 +94,10 @@ public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<
*/
RFuture<Void> trimAsync(int fromIndex, int toIndex);
RFuture<Void> fastRemoveAsync(long index);
RFuture<Void> fastRemoveAsync(int index);
RFuture<V> removeAsync(long index);
RFuture<V> removeAsync(int index);
RFuture<Boolean> removeAsync(Object o, int count);
}

@ -62,21 +62,21 @@ public interface RListReactive<V> extends RCollectionReactive<V>, RSortableReact
Publisher<V> iterator(int startIndex);
Publisher<Long> lastIndexOf(Object o);
Publisher<Integer> lastIndexOf(Object o);
Publisher<Long> indexOf(Object o);
Publisher<Integer> indexOf(Object o);
Publisher<Integer> add(long index, V element);
Publisher<Void> add(int index, V element);
Publisher<Integer> addAll(long index, Collection<? extends V> coll);
Publisher<Boolean> addAll(int index, Collection<? extends V> coll);
Publisher<Void> fastSet(long index, V element);
Publisher<Void> fastSet(int index, V element);
Publisher<V> set(long index, V element);
Publisher<V> set(int index, V element);
Publisher<V> get(long index);
Publisher<V> get(int index);
Publisher<V> remove(long index);
Publisher<V> remove(int index);
/**
* Read all elements at once
@ -101,6 +101,6 @@ public interface RListReactive<V> extends RCollectionReactive<V>, RSortableReact
* @param index - index of object
* @return void
*/
Publisher<Void> fastRemove(long index);
Publisher<Void> fastRemove(int index);
}

@ -19,7 +19,10 @@ import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.api.RFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.action.support.DefaultSubscriber;
@ -32,21 +35,17 @@ import reactor.rx.action.support.DefaultSubscriber;
*/
public abstract class PublisherAdder<V> {
public abstract Publisher<Integer> add(Object o);
public abstract RFuture<Boolean> add(Object o);
public Integer sum(Integer first, Integer second) {
return first + second;
}
public Publisher<Integer> addAll(Publisher<? extends V> c) {
final Promise<Integer> promise = Promises.prepare();
public Publisher<Boolean> addAll(Publisher<? extends V> c) {
final Promise<Boolean> promise = Promises.prepare();
c.subscribe(new DefaultSubscriber<V>() {
volatile boolean completed;
AtomicLong values = new AtomicLong();
Subscription s;
Integer lastSize = 0;
Boolean lastSize = false;
@Override
public void onSubscribe(Subscription s) {
@ -57,21 +56,17 @@ public abstract class PublisherAdder<V> {
@Override
public void onNext(V o) {
values.getAndIncrement();
add(o).subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onError(Throwable t) {
promise.onError(t);
}
add(o).addListener(new FutureListener<Boolean>() {
@Override
public void onNext(Integer o) {
lastSize = sum(lastSize, o);
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.onError(future.cause());
return;
}
if (future.getNow()) {
lastSize = true;
}
s.request(1);
if (values.decrementAndGet() == 0 && completed) {
promise.onNext(lastSize);

@ -69,6 +69,7 @@ public class ReactiveProxyBuilder {
final Method mm = instanceMethod;
if (instanceMethod.getName().endsWith("Async")) {
return commandExecutor.reactive(new Supplier<RFuture<Object>>() {
@SuppressWarnings("unchecked")
@Override
public RFuture<Object> get() {
try {

@ -169,14 +169,14 @@ public class RedissonBatchReactive implements RBatchReactive {
public <V> RSetReactive<V> getSet(String name) {
RedissonSet<V> set = new RedissonSet<V>(executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(executorService, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}
@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
RedissonSet<V> set = new RedissonSet<V>(codec, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(executorService, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}
@Override
@ -234,14 +234,14 @@ public class RedissonBatchReactive implements RBatchReactive {
public <V> RSetCacheReactive<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}
@Override
@ -258,8 +258,9 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public RLexSortedSetReactive getLexSortedSet(String name) {
return ReactiveProxyBuilder.create(executorService, new RedissonLexSortedSet(executorService, name, null),
new RedissonLexSortedSetReactive(executorService, new RedissonScoredSortedSetReactive<String>(StringCodec.INSTANCE, executorService, name)),
RedissonLexSortedSet set = new RedissonLexSortedSet(executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonLexSortedSetReactive(set),
RLexSortedSetReactive.class);
}

@ -15,14 +15,12 @@
*/
package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.api.RFuture;
import org.redisson.api.RLexSortedSet;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
/**
*
@ -31,51 +29,48 @@ import org.redisson.command.CommandReactiveExecutor;
*/
public class RedissonLexSortedSetReactive {
private final RedissonScoredSortedSetReactive<String> instance;
private final CommandReactiveExecutor commandExecutor;
private final RLexSortedSet instance;
public RedissonLexSortedSetReactive(CommandReactiveExecutor commandExecutor, RedissonScoredSortedSetReactive<String> instance) {
this.commandExecutor = commandExecutor;
public RedissonLexSortedSetReactive(RLexSortedSet instance) {
this.instance = instance;
}
public Publisher<Integer> addAll(Publisher<? extends String> c) {
public Publisher<Boolean> addAll(Publisher<? extends String> c) {
return new PublisherAdder<String>() {
@Override
public Publisher<Integer> add(Object e) {
return RedissonLexSortedSetReactive.this.add(e);
public RFuture<Boolean> add(Object e) {
return instance.addAsync((String)e);
}
}.addAll(c);
}
private Publisher<String> scanIteratorReactive(final String pattern, final int count) {
return new SetReactiveIterator<String>() {
@Override
protected RFuture<ListScanResult<Object>> scanIterator(final RedisClient client, final long nextIterPos) {
return ((RedissonScoredSortedSet<String>)instance).scanIteratorAsync(client, nextIterPos, pattern, count);
}
};
}
public String getName() {
return ((RedissonScoredSortedSet)instance).getName();
}
public Publisher<String> iterator() {
return instance.iterator();
return scanIteratorReactive(null, 10);
}
public Publisher<String> iterator(String pattern) {
return instance.iterator(pattern);
return scanIteratorReactive(pattern, 10);
}
public Publisher<String> iterator(int count) {
return instance.iterator(count);
return scanIteratorReactive(null, count);
}
public Publisher<String> iterator(String pattern, int count) {
return instance.iterator(pattern, count);
}
public Publisher<Integer> add(Object e) {
return commandExecutor.writeReactive(instance.getName(), StringCodec.INSTANCE, RedisCommands.ZADD_INT, instance.getName(), 0, e);
}
public Publisher<Integer> addAll(Collection<? extends String> c) {
List<Object> params = new ArrayList<Object>(2*c.size());
params.add(instance.getName());
for (Object param : c) {
params.add(0);
params.add(param);
}
return commandExecutor.writeReactive(instance.getName(), StringCodec.INSTANCE, RedisCommands.ZADD_INT, params.toArray());
return scanIteratorReactive(pattern, count);
}
}

@ -15,27 +15,15 @@
*/
package org.redisson.reactive;
import static org.redisson.client.protocol.RedisCommands.LINDEX;
import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE;
import static org.redisson.client.protocol.RedisCommands.RPUSH;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.RedissonList;
import org.redisson.RedissonObject;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
@ -49,15 +37,12 @@ import reactor.rx.subscription.ReactiveSubscription;
public class RedissonListReactive<V> {
private final RedissonList<V> instance;
private final CommandReactiveExecutor commandExecutor;
public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
this.instance = new RedissonList<V>(commandExecutor, name, null);
}
public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
this.commandExecutor = commandExecutor;
this.instance = new RedissonList<V>(codec, commandExecutor, name, null);
}
@ -89,33 +74,25 @@ public class RedissonListReactive<V> {
@Override
protected void onRequest(final long n) {
final ReactiveSubscription<V> m = this;
get(currentIndex).subscribe(new Subscriber<V>() {
V currValue;
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
instance.getAsync(currentIndex).addListener(new FutureListener<V>() {
@Override
public void onNext(V value) {
currValue = value;
m.onNext(value);
if (forward) {
currentIndex++;
} else {
currentIndex--;
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
m.onError(future.cause());
return;
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
}
@Override
public void onComplete() {
if (currValue == null) {
V value = future.getNow();
if (value != null) {
m.onNext(value);
if (forward) {
currentIndex++;
} else {
currentIndex--;
}
}
if (value == null) {
m.onComplete();
return;
}
@ -132,120 +109,15 @@ public class RedissonListReactive<V> {
};
}
public Publisher<Integer> add(V e) {
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RPUSH, instance.getName(), ((RedissonObject)instance).encode(e));
}
protected Publisher<Boolean> remove(Object o, int count) {
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), LREM_SINGLE, instance.getName(), count, ((RedissonObject)instance).encode(o));
}
public Publisher<Integer> addAll(Publisher<? extends V> c) {
public Publisher<Boolean> addAll(Publisher<? extends V> c) {
return new PublisherAdder<V>() {
@Override
public Integer sum(Integer first, Integer second) {
return second;
}
@Override
public Publisher<Integer> add(Object o) {
return RedissonListReactive.this.add((V)o);
public RFuture<Boolean> add(Object o) {
return instance.addAsync((V)o);
}
}.addAll(c);
}
public Publisher<Integer> addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return commandExecutor.reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sizeAsync();
}
});
}
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(instance.getName());
((RedissonObject)instance).encode(args, c);
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RPUSH, args.toArray());
}
public Publisher<Integer> addAll(long index, Collection<? extends V> coll) {
if (index < 0) {
throw new IndexOutOfBoundsException("index: " + index);
}
if (coll.isEmpty()) {
return commandExecutor.reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
return instance.sizeAsync();
}
});
}
if (index == 0) { // prepend elements to list
List<Object> elements = new ArrayList<Object>();
((RedissonObject)instance).encode(elements, coll);
Collections.reverse(elements);
elements.add(0, instance.getName());
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.LPUSH, elements.toArray());
}
List<Object> args = new ArrayList<Object>(coll.size() + 1);
args.add(index);
((RedissonObject)instance).encode(args, coll);
return commandExecutor.evalWriteReactive(instance.getName(), instance.getCodec(), RedisCommands.EVAL_INTEGER,
"local ind = table.remove(ARGV, 1); " + // index is the first parameter
"local size = redis.call('llen', KEYS[1]); " +
"assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " +
"local tail = redis.call('lrange', KEYS[1], ind, -1); " +
"redis.call('ltrim', KEYS[1], 0, ind - 1); " +
"for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" +
"for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" +
"return redis.call('llen', KEYS[1]);",
Collections.<Object>singletonList(instance.getName()), args.toArray());
}
public Publisher<V> get(long index) {
return commandExecutor.readReactive(instance.getName(), instance.getCodec(), LINDEX, instance.getName(), index);
}
public Publisher<V> set(long index, V element) {
return commandExecutor.evalWriteReactive(instance.getName(), instance.getCodec(), RedisCommands.EVAL_OBJECT,
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " +
"return v",
Collections.<Object>singletonList(instance.getName()), index, ((RedissonObject)instance).encode(element));
}
public Publisher<Void> fastSet(long index, V element) {
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.LSET, instance.getName(), index, ((RedissonObject)instance).encode(element));
}
public Publisher<Integer> add(long index, V element) {
return addAll(index, Collections.singleton(element));
}
public Publisher<Long> indexOf(final Object o) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return ((RedissonList)instance).indexOfAsync(o, new LongReplayConvertor());
}
});
}
public Publisher<Long> lastIndexOf(final Object o) {
return commandExecutor.reactive(new Supplier<RFuture<Long>>() {
@Override
public RFuture<Long> get() {
return ((RedissonList)instance).lastIndexOfAsync(o, new LongReplayConvertor());
}
});
}
}

@ -15,23 +15,12 @@
*/
package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache;
import org.redisson.ScanIterator;
import org.redisson.api.RFuture;
import org.redisson.api.RSetCache;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
import io.netty.buffer.ByteBuf;
import reactor.rx.Streams;
/**
*
@ -42,10 +31,8 @@ import reactor.rx.Streams;
public class RedissonSetCacheReactive<V> {
private final RSetCache<V> instance;
private final CommandReactiveExecutor commandExecutor;
public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, RSetCache<V> instance) {
this.commandExecutor = commandExecutor;
public RedissonSetCacheReactive(RSetCache<V> instance) {
this.instance = instance;
}
@ -58,40 +45,11 @@ public class RedissonSetCacheReactive<V> {
};
}
public Publisher<Integer> add(V value) {
long timeoutDate = 92233720368547758L;
return commandExecutor.evalWriteReactive(instance.getName(), instance.getCodec(), RedisCommands.EVAL_INTEGER,
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
+ "if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then "
+ "return 0;"
+ "end; " +
"redis.call('zadd', KEYS[1], ARGV[2], ARGV[3]); " +
"return 1; ",
Arrays.<Object>asList(instance.getName()), System.currentTimeMillis(), timeoutDate, ((RedissonSetCache)instance).encode(value));
}
public Publisher<Integer> addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return Streams.just(0);
}
long score = 92233720368547758L - System.currentTimeMillis();
List<Object> params = new ArrayList<Object>(c.size()*2 + 1);
params.add(instance.getName());
for (V value : c) {
ByteBuf objectState = ((RedissonSetCache)instance).encode(value);
params.add(score);
params.add(objectState);
}
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.ZADD_RAW, params.toArray());
}
public Publisher<Integer> addAll(Publisher<? extends V> c) {
public Publisher<Boolean> addAll(Publisher<? extends V> c) {
return new PublisherAdder<V>() {
@Override
public Publisher<Integer> add(Object o) {
return RedissonSetCacheReactive.this.add((V)o);
public RFuture<Boolean> add(Object o) {
return instance.addAsync((V)o);
}
}.addAll(c);
}

@ -45,7 +45,7 @@ public class RedissonSetMultimapReactive<K, V> {
public RSetReactive<V> get(K key) {
RSet<V> set = ((RSetMultimap<K, V>)instance).get(key);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(commandExecutor, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}
}

@ -15,22 +15,12 @@
*/
package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.RedissonObject;
import org.redisson.RedissonSet;
import org.redisson.api.RFuture;
import org.redisson.api.RSet;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
@ -42,63 +32,20 @@ import reactor.fn.Supplier;
public class RedissonSetReactive<V> {
private final RSet<V> instance;
private final CommandReactiveExecutor commandExecutor;
public RedissonSetReactive(CommandReactiveExecutor commandExecutor, RSet<V> instance) {
this.commandExecutor = commandExecutor;
public RedissonSetReactive(RSet<V> instance) {
this.instance = instance;
}
public Publisher<Integer> addAll(Publisher<? extends V> c) {
public Publisher<Boolean> addAll(Publisher<? extends V> c) {
return new PublisherAdder<Object>() {
@Override
public Publisher<Integer> add(Object e) {
return RedissonSetReactive.this.add((V)e);
public RFuture<Boolean> add(Object e) {
return instance.addAsync((V)e);
}
}.addAll(c);
}
private Publisher<ListScanResult<Object>> scanIteratorReactive(final RedisClient client, final long startPos, final String pattern, final int count) {
return commandExecutor.reactive(new Supplier<RFuture<ListScanResult<Object>>>() {
@Override
public RFuture<ListScanResult<Object>> get() {
return ((RedissonSet)instance).scanIteratorAsync(instance.getName(), client, startPos, pattern, count);
}
});
}
public Publisher<Integer> add(V e) {
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SADD, instance.getName(), ((RedissonObject)instance).encode(e));
}
public Publisher<Integer> addAll(Collection<? extends V> c) {
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(instance.getName());
((RedissonObject)instance).encode(args, c);
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SADD, args.toArray());
}
public Publisher<Long> intersection(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 1);
args.add(instance.getName());
args.addAll(Arrays.asList(names));
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SINTERSTORE, args.toArray());
}
public Publisher<Long> diff(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 1);
args.add(instance.getName());
args.addAll(Arrays.asList(names));
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SDIFFSTORE, args.toArray());
}
public Publisher<Long> union(String... names) {
List<Object> args = new ArrayList<Object>(names.length + 1);
args.add(instance.getName());
args.addAll(Arrays.asList(names));
return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SUNIONSTORE, args.toArray());
}
public Publisher<V> iterator(int count) {
return iterator(null, count);
}

@ -92,28 +92,28 @@ public class RedissonTransactionReactive implements RTransactionReactive {
public <V> RSetReactive<V> getSet(String name) {
RSet<V> set = transaction.<V>getSet(name);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(executorService, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}
@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
RSet<V> set = transaction.<V>getSet(name, codec);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(executorService, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name) {
RSetCache<V> set = transaction.<V>getSetCache(name);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}
@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = transaction.<V>getSetCache(name, codec);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}
@Override

@ -18,28 +18,28 @@ public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest {
@Test
public void testAddAllReactive() {
RLexSortedSetReactive list = redisson.getLexSortedSet("set");
Assert.assertTrue(sync(list.add("1")) == 1);
Assert.assertTrue(sync(list.add("2")) == 1);
Assert.assertTrue(sync(list.add("3")) == 1);
Assert.assertTrue(sync(list.add("4")) == 1);
Assert.assertTrue(sync(list.add("5")) == 1);
Assert.assertTrue(sync(list.add("1")));
Assert.assertTrue(sync(list.add("2")));
Assert.assertTrue(sync(list.add("3")));
Assert.assertTrue(sync(list.add("4")));
Assert.assertTrue(sync(list.add("5")));
RLexSortedSetReactive list2 = redisson.getLexSortedSet("set2");
Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue());
Assert.assertEquals(true, sync(list2.addAll(list.iterator())));
Assert.assertEquals(5, sync(list2.size()).intValue());
}
@Test
public void testRemoveLexRangeTail() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");
Assert.assertTrue(sync(set.add("a")) == 1);
Assert.assertFalse(sync(set.add("a")) == 1);
Assert.assertTrue(sync(set.add("b")) == 1);
Assert.assertTrue(sync(set.add("c")) == 1);
Assert.assertTrue(sync(set.add("d")) == 1);
Assert.assertTrue(sync(set.add("e")) == 1);
Assert.assertTrue(sync(set.add("f")) == 1);
Assert.assertTrue(sync(set.add("g")) == 1);
Assert.assertTrue(sync(set.add("a")));
Assert.assertFalse(sync(set.add("a")));
Assert.assertTrue(sync(set.add("b")));
Assert.assertTrue(sync(set.add("c")));
Assert.assertTrue(sync(set.add("d")));
Assert.assertTrue(sync(set.add("e")));
Assert.assertTrue(sync(set.add("f")));
Assert.assertTrue(sync(set.add("g")));
Assert.assertEquals(0, sync(set.removeRangeTail("z", false)).intValue());
@ -86,14 +86,14 @@ public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest {
@Test
public void testLexRangeTail() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");
Assert.assertTrue(sync(set.add("a")) == 1);
Assert.assertFalse(sync(set.add("a")) == 1);
Assert.assertTrue(sync(set.add("b")) == 1);
Assert.assertTrue(sync(set.add("c")) == 1);
Assert.assertTrue(sync(set.add("d")) == 1);
Assert.assertTrue(sync(set.add("e")) == 1);
Assert.assertTrue(sync(set.add("f")) == 1);
Assert.assertTrue(sync(set.add("g")) == 1);
Assert.assertTrue(sync(set.add("a")));
Assert.assertFalse(sync(set.add("a")));
Assert.assertTrue(sync(set.add("b")));
Assert.assertTrue(sync(set.add("c")));
Assert.assertTrue(sync(set.add("d")));
Assert.assertTrue(sync(set.add("e")));
Assert.assertTrue(sync(set.add("f")));
Assert.assertTrue(sync(set.add("g")));
assertThat(sync(set.rangeTail("c", false))).containsExactly("d", "e", "f", "g");
assertThat(sync(set.rangeTail("c", true))).containsExactly("c", "d", "e", "f", "g");

@ -44,7 +44,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(5));
RListReactive<Integer> list2 = redisson.getList("list2");
Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue());
Assert.assertEquals(true, sync(list2.addAll(list.iterator())));
Assert.assertEquals(5, sync(list2.size()).intValue());
}
@ -52,13 +52,13 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
public void testAddAllWithIndex() throws InterruptedException {
final RListReactive<Long> list = redisson.getList("list");
final CountDownLatch latch = new CountDownLatch(1);
list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new Promise<Integer>() {
list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new Promise<Boolean>() {
@Override
public void onNext(Integer element) {
list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new Promise<Integer>() {
public void onNext(Boolean element) {
list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new Promise<Boolean>() {
@Override
public void onNext(Integer value) {
public void onNext(Boolean value) {
latch.countDown();
}
@ -84,12 +84,12 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
public void testAdd() throws InterruptedException {
final RListReactive<Long> list = redisson.getList("list");
final CountDownLatch latch = new CountDownLatch(1);
list.add(1L).subscribe(new Promise<Integer>() {
list.add(1L).subscribe(new Promise<Boolean>() {
@Override
public void onNext(Integer value) {
list.add(2L).subscribe(new Promise<Integer>() {
public void onNext(Boolean value) {
list.add(2L).subscribe(new Promise<Boolean>() {
@Override
public void onNext(Integer value) {
public void onNext(Boolean value) {
latch.countDown();
}
@ -240,7 +240,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(3));
sync(list.add(10));
long index = sync(list.lastIndexOf(3));
int index = sync(list.lastIndexOf(3));
Assert.assertEquals(8, index);
}
@ -286,7 +286,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
assertThat(sync(list)).containsExactly(1, 2, 3, 4, 6);
}
@Test(expected = RedisException.class)
@Test(expected = IndexOutOfBoundsException.class)
public void testSetFail() throws InterruptedException {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
@ -397,7 +397,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(4));
sync(list.add(5));
Assert.assertEquals(8, sync(list.addAll(2, Arrays.asList(7, 8, 9))).longValue());
Assert.assertEquals(true, sync(list.addAll(2, Arrays.asList(7, 8, 9))));
assertThat(sync(list)).containsExactly(1, 2, 7, 8, 9, 3, 4, 5);
@ -409,7 +409,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
assertThat(sync(list)).containsExactly(1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5);
Assert.assertEquals(15, sync(list.addAll(0, Arrays.asList(6, 7))).intValue());
Assert.assertEquals(true, sync(list.addAll(0, Arrays.asList(6, 7))));
assertThat(sync(list)).containsExactly(6,7,1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5);
}
@ -423,9 +423,9 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(4));
sync(list.add(5));
Assert.assertEquals(8, sync(list.addAll(Arrays.asList(7, 8, 9))).intValue());
Assert.assertEquals(true, sync(list.addAll(Arrays.asList(7, 8, 9))));
Assert.assertEquals(11, sync(list.addAll(Arrays.asList(9, 1, 9))).intValue());
Assert.assertEquals(true, sync(list.addAll(Arrays.asList(9, 1, 9))));
assertThat(sync(list)).containsExactly(1, 2, 3, 4, 5, 7, 8, 9, 9, 1, 9);
}
@ -433,7 +433,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
@Test
public void testAddAllEmpty() {
RListReactive<Integer> list = redisson.getList("list");
Assert.assertEquals(0, sync(list.addAll(Collections.<Integer>emptyList())).intValue());
Assert.assertEquals(false, sync(list.addAll(Collections.<Integer>emptyList())));
Assert.assertEquals(0, sync(list.size()).intValue());
}

@ -104,7 +104,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
RBucketReactive<Object> b1 = reactive.getBucket("b1");
sync(b1.set(new MyObject()));
RSetReactive<Object> s1 = reactive.getSet("s1");
assertTrue(sync(s1.add(b1)) == 1);
assertTrue(sync(s1.add(b1)));
assertTrue(codec == b1.getCodec());
Config config1 = new Config();

@ -189,14 +189,14 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest {
@Test
public void testSize() {
RSetCacheReactive<Integer> set = redisson.getSetCache("set");
Assert.assertEquals(1, sync(set.add(1)).intValue());
Assert.assertEquals(1, sync(set.add(2)).intValue());
Assert.assertEquals(1, sync(set.add(3)).intValue());
Assert.assertEquals(0, sync(set.add(3)).intValue());
Assert.assertEquals(0, sync(set.add(3)).intValue());
Assert.assertEquals(1, sync(set.add(4)).intValue());
Assert.assertEquals(1, sync(set.add(5)).intValue());
Assert.assertEquals(0, sync(set.add(5)).intValue());
Assert.assertEquals(true, sync(set.add(1)));
Assert.assertEquals(true, sync(set.add(2)));
Assert.assertEquals(true, sync(set.add(3)));
Assert.assertEquals(false, sync(set.add(3)));
Assert.assertEquals(false, sync(set.add(3)));
Assert.assertEquals(true, sync(set.add(4)));
Assert.assertEquals(true, sync(set.add(5)));
Assert.assertEquals(false, sync(set.add(5)));
Assert.assertEquals(5, sync(set.size()).intValue());
}

@ -39,7 +39,7 @@ public class RedissonSetReactiveTest extends BaseReactiveTest {
sync(list.add(5));
RSetReactive<Integer> list2 = redisson.getSet("set2");
Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue());
Assert.assertEquals(true, sync(list2.addAll(list.iterator())));
Assert.assertEquals(5, sync(list2.size()).intValue());
}

Loading…
Cancel
Save