From 0e78d16f82b637510a54867e8f2eaf5944e1e4cb Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 5 Oct 2018 14:04:30 +0300 Subject: [PATCH] refactoring --- .../main/java/org/redisson/RedissonList.java | 22 ++- .../redisson/RedissonListMultimapValues.java | 15 +- .../java/org/redisson/RedissonReactive.java | 14 +- .../org/redisson/api/RCollectionReactive.java | 6 +- .../src/main/java/org/redisson/api/RList.java | 2 + .../java/org/redisson/api/RListAsync.java | 8 +- .../java/org/redisson/api/RListReactive.java | 18 +- .../org/redisson/reactive/PublisherAdder.java | 39 ++-- .../reactive/ReactiveProxyBuilder.java | 1 + .../reactive/RedissonBatchReactive.java | 13 +- .../RedissonLexSortedSetReactive.java | 59 +++--- .../reactive/RedissonListReactive.java | 172 +++--------------- .../reactive/RedissonSetCacheReactive.java | 50 +---- .../reactive/RedissonSetMultimapReactive.java | 2 +- .../reactive/RedissonSetReactive.java | 61 +------ .../reactive/RedissonTransactionReactive.java | 8 +- .../RedissonLexSortedSetReactiveTest.java | 44 ++--- .../redisson/RedissonListReactiveTest.java | 32 ++-- .../RedissonReferenceReactiveTest.java | 2 +- .../RedissonSetCacheReactiveTest.java | 16 +- .../org/redisson/RedissonSetReactiveTest.java | 2 +- 21 files changed, 186 insertions(+), 400 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonList.java b/redisson/src/main/java/org/redisson/RedissonList.java index f0bf84810..4ac521f7f 100644 --- a/redisson/src/main/java/org/redisson/RedissonList.java +++ b/redisson/src/main/java/org/redisson/RedissonList.java @@ -148,11 +148,13 @@ public class RedissonList extends RedissonExpirable implements RList { return removeAsync(o, 1); } - protected RFuture removeAsync(Object o, int count) { + @Override + public RFuture removeAsync(Object o, int count) { return commandExecutor.writeAsync(getName(), codec, LREM_SINGLE, getName(), count, encode(o)); } - protected boolean remove(Object o, int count) { + @Override + public boolean remove(Object o, int count) { return get(removeAsync(o, count)); } @@ -391,17 +393,19 @@ public class RedissonList extends RedissonExpirable implements RList { public void add(int index, V element) { addAll(index, Collections.singleton(element)); } - + @Override - public V remove(int index) { - return remove((long) index); + public RFuture addAsync(int index, V element) { + return addAllAsync(index, Collections.singleton(element)); } - - public V remove(long index) { + + @Override + public V remove(int index) { return get(removeAsync(index)); } - public RFuture removeAsync(long index) { + @Override + public RFuture removeAsync(int index) { if (index == 0) { return commandExecutor.writeAsync(getName(), codec, LPOP, getName()); } @@ -421,7 +425,7 @@ public class RedissonList extends RedissonExpirable implements RList { } @Override - public RFuture fastRemoveAsync(long index) { + public RFuture fastRemoveAsync(int index) { return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, "redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" + "redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');", diff --git a/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java b/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java index 99385564c..4b0bbe617 100644 --- a/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java +++ b/redisson/src/main/java/org/redisson/RedissonListMultimapValues.java @@ -187,6 +187,11 @@ public class RedissonListMultimapValues extends RedissonExpirable implements public RFuture addAsync(V e) { return list.addAsync(e); } + + @Override + public RFuture addAsync(int index, V element) { + return list.addAsync(index, element); + } @Override public boolean remove(Object o) { @@ -198,7 +203,8 @@ public class RedissonListMultimapValues extends RedissonExpirable implements return removeAsync(o, 1); } - protected RFuture removeAsync(Object o, int count) { + @Override + public RFuture removeAsync(Object o, int count) { return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN, "local expireDate = 92233720368547758; " + "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " @@ -213,7 +219,8 @@ public class RedissonListMultimapValues extends RedissonExpirable implements System.currentTimeMillis(), count, encodeMapKey(key), encodeMapValue(o)); } - protected boolean remove(Object o, int count) { + @Override + public boolean remove(Object o, int count) { return get(removeAsync(o, count)); } @@ -438,7 +445,7 @@ public class RedissonListMultimapValues extends RedissonExpirable implements } @Override - public RFuture removeAsync(long index) { + public RFuture removeAsync(int index) { return list.removeAsync(index); } @@ -448,7 +455,7 @@ public class RedissonListMultimapValues extends RedissonExpirable implements } @Override - public RFuture fastRemoveAsync(long index) { + public RFuture fastRemoveAsync(int index) { return list.fastRemoveAsync(index); } diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index 45aac291a..f1820072b 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -60,7 +60,6 @@ import org.redisson.api.RTransactionReactive; import org.redisson.api.RedissonReactiveClient; import org.redisson.api.TransactionOptions; import org.redisson.client.codec.Codec; -import org.redisson.client.codec.StringCodec; import org.redisson.codec.ReferenceCodecProvider; import org.redisson.command.CommandReactiveService; import org.redisson.config.Config; @@ -266,14 +265,14 @@ public class RedissonReactive implements RedissonReactiveClient { public RSetReactive getSet(String name) { RedissonSet set = new RedissonSet(commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetReactive(commandExecutor, set), RSetReactive.class); + new RedissonSetReactive(set), RSetReactive.class); } @Override public RSetReactive getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetReactive(commandExecutor, set), RSetReactive.class); + new RedissonSetReactive(set), RSetReactive.class); } @Override @@ -290,8 +289,9 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RLexSortedSetReactive getLexSortedSet(String name) { - return ReactiveProxyBuilder.create(commandExecutor, new RedissonLexSortedSet(commandExecutor, name, null), - new RedissonLexSortedSetReactive(commandExecutor, new RedissonScoredSortedSetReactive(StringCodec.INSTANCE, commandExecutor, name)), + RedissonLexSortedSet set = new RedissonLexSortedSet(commandExecutor, name, null); + return ReactiveProxyBuilder.create(commandExecutor, set, + new RedissonLexSortedSetReactive(set), RLexSortedSetReactive.class); } @@ -355,14 +355,14 @@ public class RedissonReactive implements RedissonReactiveClient { public RSetCacheReactive getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheReactive(commandExecutor, set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, commandExecutor, name, null); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetCacheReactive(commandExecutor, set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set), RSetCacheReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RCollectionReactive.java b/redisson/src/main/java/org/redisson/api/RCollectionReactive.java index ece3607dc..b7e6b09c8 100644 --- a/redisson/src/main/java/org/redisson/api/RCollectionReactive.java +++ b/redisson/src/main/java/org/redisson/api/RCollectionReactive.java @@ -96,7 +96,7 @@ public interface RCollectionReactive extends RExpirableReactive { * @return true if an element was added * and false if it is already present */ - Publisher add(V e); + Publisher add(V e); /** * Adds all elements contained in the specified collection @@ -105,7 +105,7 @@ public interface RCollectionReactive extends RExpirableReactive { * @return true if at least one element was added * and false if all elements are already present */ - Publisher addAll(Publisher c); + Publisher addAll(Publisher c); /** * Adds all elements contained in the specified collection @@ -114,6 +114,6 @@ public interface RCollectionReactive extends RExpirableReactive { * @return true if at least one element was added * and false if all elements are already present */ - Publisher addAll(Collection c); + Publisher addAll(Collection c); } diff --git a/redisson/src/main/java/org/redisson/api/RList.java b/redisson/src/main/java/org/redisson/api/RList.java index dba152867..34233cc09 100644 --- a/redisson/src/main/java/org/redisson/api/RList.java +++ b/redisson/src/main/java/org/redisson/api/RList.java @@ -99,4 +99,6 @@ public interface RList extends List, RExpirable, RListAsync, RSortable< */ void fastRemove(int index); + boolean remove(Object o, int count); + } diff --git a/redisson/src/main/java/org/redisson/api/RListAsync.java b/redisson/src/main/java/org/redisson/api/RListAsync.java index c2993317f..8d4dddbe3 100644 --- a/redisson/src/main/java/org/redisson/api/RListAsync.java +++ b/redisson/src/main/java/org/redisson/api/RListAsync.java @@ -54,6 +54,8 @@ public interface RListAsync extends RCollectionAsync, RSortableAsync addBeforeAsync(V elementToFind, V element); + RFuture addAsync(int index, V element); + RFuture addAllAsync(int index, Collection coll); RFuture lastIndexOfAsync(Object o); @@ -92,8 +94,10 @@ public interface RListAsync extends RCollectionAsync, RSortableAsync trimAsync(int fromIndex, int toIndex); - RFuture fastRemoveAsync(long index); + RFuture fastRemoveAsync(int index); - RFuture removeAsync(long index); + RFuture removeAsync(int index); + + RFuture removeAsync(Object o, int count); } diff --git a/redisson/src/main/java/org/redisson/api/RListReactive.java b/redisson/src/main/java/org/redisson/api/RListReactive.java index d7dd15ba8..9de36703e 100644 --- a/redisson/src/main/java/org/redisson/api/RListReactive.java +++ b/redisson/src/main/java/org/redisson/api/RListReactive.java @@ -62,21 +62,21 @@ public interface RListReactive extends RCollectionReactive, RSortableReact Publisher iterator(int startIndex); - Publisher lastIndexOf(Object o); + Publisher lastIndexOf(Object o); - Publisher indexOf(Object o); + Publisher indexOf(Object o); - Publisher add(long index, V element); + Publisher add(int index, V element); - Publisher addAll(long index, Collection coll); + Publisher addAll(int index, Collection coll); - Publisher fastSet(long index, V element); + Publisher fastSet(int index, V element); - Publisher set(long index, V element); + Publisher set(int index, V element); - Publisher get(long index); + Publisher get(int index); - Publisher remove(long index); + Publisher remove(int index); /** * Read all elements at once @@ -101,6 +101,6 @@ public interface RListReactive extends RCollectionReactive, RSortableReact * @param index - index of object * @return void */ - Publisher fastRemove(long index); + Publisher fastRemove(int index); } diff --git a/redisson/src/main/java/org/redisson/reactive/PublisherAdder.java b/redisson/src/main/java/org/redisson/reactive/PublisherAdder.java index 5b543864b..0d4db0695 100644 --- a/redisson/src/main/java/org/redisson/reactive/PublisherAdder.java +++ b/redisson/src/main/java/org/redisson/reactive/PublisherAdder.java @@ -19,7 +19,10 @@ import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; +import org.redisson.api.RFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import reactor.rx.Promise; import reactor.rx.Promises; import reactor.rx.action.support.DefaultSubscriber; @@ -32,21 +35,17 @@ import reactor.rx.action.support.DefaultSubscriber; */ public abstract class PublisherAdder { - public abstract Publisher add(Object o); + public abstract RFuture add(Object o); - public Integer sum(Integer first, Integer second) { - return first + second; - } - - public Publisher addAll(Publisher c) { - final Promise promise = Promises.prepare(); + public Publisher addAll(Publisher c) { + final Promise promise = Promises.prepare(); c.subscribe(new DefaultSubscriber() { volatile boolean completed; AtomicLong values = new AtomicLong(); Subscription s; - Integer lastSize = 0; + Boolean lastSize = false; @Override public void onSubscribe(Subscription s) { @@ -57,21 +56,17 @@ public abstract class PublisherAdder { @Override public void onNext(V o) { values.getAndIncrement(); - add(o).subscribe(new DefaultSubscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } - - @Override - public void onError(Throwable t) { - promise.onError(t); - } - + add(o).addListener(new FutureListener() { @Override - public void onNext(Integer o) { - lastSize = sum(lastSize, o); + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + promise.onError(future.cause()); + return; + } + + if (future.getNow()) { + lastSize = true; + } s.request(1); if (values.decrementAndGet() == 0 && completed) { promise.onNext(lastSize); diff --git a/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java b/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java index 50845af92..34f11347d 100644 --- a/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java +++ b/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java @@ -69,6 +69,7 @@ public class ReactiveProxyBuilder { final Method mm = instanceMethod; if (instanceMethod.getName().endsWith("Async")) { return commandExecutor.reactive(new Supplier>() { + @SuppressWarnings("unchecked") @Override public RFuture get() { try { diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java index 6def46e3a..3d58fd83d 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBatchReactive.java @@ -169,14 +169,14 @@ public class RedissonBatchReactive implements RBatchReactive { public RSetReactive getSet(String name) { RedissonSet set = new RedissonSet(executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(executorService, set), RSetReactive.class); + new RedissonSetReactive(set), RSetReactive.class); } @Override public RSetReactive getSet(String name, Codec codec) { RedissonSet set = new RedissonSet(codec, executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(executorService, set), RSetReactive.class); + new RedissonSetReactive(set), RSetReactive.class); } @Override @@ -234,14 +234,14 @@ public class RedissonBatchReactive implements RBatchReactive { public RSetCacheReactive getSetCache(String name) { RSetCache set = new RedissonSetCache(evictionScheduler, executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(executorService, set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { RSetCache set = new RedissonSetCache(codec, evictionScheduler, executorService, name, null); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(executorService, set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set), RSetCacheReactive.class); } @Override @@ -258,8 +258,9 @@ public class RedissonBatchReactive implements RBatchReactive { @Override public RLexSortedSetReactive getLexSortedSet(String name) { - return ReactiveProxyBuilder.create(executorService, new RedissonLexSortedSet(executorService, name, null), - new RedissonLexSortedSetReactive(executorService, new RedissonScoredSortedSetReactive(StringCodec.INSTANCE, executorService, name)), + RedissonLexSortedSet set = new RedissonLexSortedSet(executorService, name, null); + return ReactiveProxyBuilder.create(executorService, set, + new RedissonLexSortedSetReactive(set), RLexSortedSetReactive.class); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java index 060cc0437..c1f8c7678 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java @@ -15,14 +15,12 @@ */ package org.redisson.reactive; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - import org.reactivestreams.Publisher; -import org.redisson.client.codec.StringCodec; -import org.redisson.client.protocol.RedisCommands; -import org.redisson.command.CommandReactiveExecutor; +import org.redisson.RedissonScoredSortedSet; +import org.redisson.api.RFuture; +import org.redisson.api.RLexSortedSet; +import org.redisson.client.RedisClient; +import org.redisson.client.protocol.decoder.ListScanResult; /** * @@ -31,51 +29,48 @@ import org.redisson.command.CommandReactiveExecutor; */ public class RedissonLexSortedSetReactive { - private final RedissonScoredSortedSetReactive instance; - private final CommandReactiveExecutor commandExecutor; + private final RLexSortedSet instance; - public RedissonLexSortedSetReactive(CommandReactiveExecutor commandExecutor, RedissonScoredSortedSetReactive instance) { - this.commandExecutor = commandExecutor; + public RedissonLexSortedSetReactive(RLexSortedSet instance) { this.instance = instance; } - public Publisher addAll(Publisher c) { + public Publisher addAll(Publisher c) { return new PublisherAdder() { @Override - public Publisher add(Object e) { - return RedissonLexSortedSetReactive.this.add(e); + public RFuture add(Object e) { + return instance.addAsync((String)e); } }.addAll(c); } + private Publisher scanIteratorReactive(final String pattern, final int count) { + return new SetReactiveIterator() { + @Override + protected RFuture> scanIterator(final RedisClient client, final long nextIterPos) { + return ((RedissonScoredSortedSet)instance).scanIteratorAsync(client, nextIterPos, pattern, count); + } + }; + } + + public String getName() { + return ((RedissonScoredSortedSet)instance).getName(); + } + public Publisher iterator() { - return instance.iterator(); + return scanIteratorReactive(null, 10); } public Publisher iterator(String pattern) { - return instance.iterator(pattern); + return scanIteratorReactive(pattern, 10); } public Publisher iterator(int count) { - return instance.iterator(count); + return scanIteratorReactive(null, count); } public Publisher iterator(String pattern, int count) { - return instance.iterator(pattern, count); - } - - public Publisher add(Object e) { - return commandExecutor.writeReactive(instance.getName(), StringCodec.INSTANCE, RedisCommands.ZADD_INT, instance.getName(), 0, e); - } - - public Publisher addAll(Collection c) { - List params = new ArrayList(2*c.size()); - params.add(instance.getName()); - for (Object param : c) { - params.add(0); - params.add(param); - } - return commandExecutor.writeReactive(instance.getName(), StringCodec.INSTANCE, RedisCommands.ZADD_INT, params.toArray()); + return scanIteratorReactive(pattern, count); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index a3f49f801..8e16cd11e 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -15,27 +15,15 @@ */ package org.redisson.reactive; -import static org.redisson.client.protocol.RedisCommands.LINDEX; -import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE; -import static org.redisson.client.protocol.RedisCommands.RPUSH; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import org.redisson.RedissonList; -import org.redisson.RedissonObject; import org.redisson.api.RFuture; import org.redisson.client.codec.Codec; -import org.redisson.client.protocol.RedisCommands; -import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.command.CommandReactiveExecutor; -import reactor.fn.Supplier; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -49,15 +37,12 @@ import reactor.rx.subscription.ReactiveSubscription; public class RedissonListReactive { private final RedissonList instance; - private final CommandReactiveExecutor commandExecutor; public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) { - this.commandExecutor = commandExecutor; this.instance = new RedissonList(commandExecutor, name, null); } public RedissonListReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) { - this.commandExecutor = commandExecutor; this.instance = new RedissonList(codec, commandExecutor, name, null); } @@ -89,33 +74,25 @@ public class RedissonListReactive { @Override protected void onRequest(final long n) { final ReactiveSubscription m = this; - get(currentIndex).subscribe(new Subscriber() { - V currValue; - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - + instance.getAsync(currentIndex).addListener(new FutureListener() { @Override - public void onNext(V value) { - currValue = value; - m.onNext(value); - if (forward) { - currentIndex++; - } else { - currentIndex--; + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + m.onError(future.cause()); + return; } - } - - @Override - public void onError(Throwable error) { - m.onError(error); - } - - @Override - public void onComplete() { - if (currValue == null) { + + V value = future.getNow(); + if (value != null) { + m.onNext(value); + if (forward) { + currentIndex++; + } else { + currentIndex--; + } + } + + if (value == null) { m.onComplete(); return; } @@ -132,120 +109,15 @@ public class RedissonListReactive { }; } - public Publisher add(V e) { - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RPUSH, instance.getName(), ((RedissonObject)instance).encode(e)); - } - - protected Publisher remove(Object o, int count) { - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), LREM_SINGLE, instance.getName(), count, ((RedissonObject)instance).encode(o)); - } - - public Publisher addAll(Publisher c) { + public Publisher addAll(Publisher c) { return new PublisherAdder() { @Override - public Integer sum(Integer first, Integer second) { - return second; - } - - @Override - public Publisher add(Object o) { - return RedissonListReactive.this.add((V)o); + public RFuture add(Object o) { + return instance.addAsync((V)o); } }.addAll(c); } - public Publisher addAll(Collection c) { - if (c.isEmpty()) { - return commandExecutor.reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.sizeAsync(); - } - }); - } - - List args = new ArrayList(c.size() + 1); - args.add(instance.getName()); - ((RedissonObject)instance).encode(args, c); - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RPUSH, args.toArray()); - } - - public Publisher addAll(long index, Collection coll) { - if (index < 0) { - throw new IndexOutOfBoundsException("index: " + index); - } - - if (coll.isEmpty()) { - return commandExecutor.reactive(new Supplier>() { - @Override - public RFuture get() { - return instance.sizeAsync(); - } - }); - } - - if (index == 0) { // prepend elements to list - List elements = new ArrayList(); - ((RedissonObject)instance).encode(elements, coll); - Collections.reverse(elements); - elements.add(0, instance.getName()); - - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.LPUSH, elements.toArray()); - } - - List args = new ArrayList(coll.size() + 1); - args.add(index); - ((RedissonObject)instance).encode(args, coll); - return commandExecutor.evalWriteReactive(instance.getName(), instance.getCodec(), RedisCommands.EVAL_INTEGER, - "local ind = table.remove(ARGV, 1); " + // index is the first parameter - "local size = redis.call('llen', KEYS[1]); " + - "assert(tonumber(ind) <= size, 'index: ' .. ind .. ' but current size: ' .. size); " + - "local tail = redis.call('lrange', KEYS[1], ind, -1); " + - "redis.call('ltrim', KEYS[1], 0, ind - 1); " + - "for i, v in ipairs(ARGV) do redis.call('rpush', KEYS[1], v) end;" + - "for i, v in ipairs(tail) do redis.call('rpush', KEYS[1], v) end;" + - "return redis.call('llen', KEYS[1]);", - Collections.singletonList(instance.getName()), args.toArray()); - } - - public Publisher get(long index) { - return commandExecutor.readReactive(instance.getName(), instance.getCodec(), LINDEX, instance.getName(), index); - } - - public Publisher set(long index, V element) { - return commandExecutor.evalWriteReactive(instance.getName(), instance.getCodec(), RedisCommands.EVAL_OBJECT, - "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + - "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " + - "return v", - Collections.singletonList(instance.getName()), index, ((RedissonObject)instance).encode(element)); - } - - public Publisher fastSet(long index, V element) { - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.LSET, instance.getName(), index, ((RedissonObject)instance).encode(element)); - } - - public Publisher add(long index, V element) { - return addAll(index, Collections.singleton(element)); - } - - public Publisher indexOf(final Object o) { - return commandExecutor.reactive(new Supplier>() { - @Override - public RFuture get() { - return ((RedissonList)instance).indexOfAsync(o, new LongReplayConvertor()); - } - }); - } - - public Publisher lastIndexOf(final Object o) { - return commandExecutor.reactive(new Supplier>() { - @Override - public RFuture get() { - return ((RedissonList)instance).lastIndexOfAsync(o, new LongReplayConvertor()); - } - }); - } - } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java index 0c155b8ec..cd09fed1f 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetCacheReactive.java @@ -15,23 +15,12 @@ */ package org.redisson.reactive; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - import org.reactivestreams.Publisher; -import org.redisson.RedissonSetCache; import org.redisson.ScanIterator; import org.redisson.api.RFuture; import org.redisson.api.RSetCache; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.command.CommandReactiveExecutor; - -import io.netty.buffer.ByteBuf; -import reactor.rx.Streams; /** * @@ -42,10 +31,8 @@ import reactor.rx.Streams; public class RedissonSetCacheReactive { private final RSetCache instance; - private final CommandReactiveExecutor commandExecutor; - public RedissonSetCacheReactive(CommandReactiveExecutor commandExecutor, RSetCache instance) { - this.commandExecutor = commandExecutor; + public RedissonSetCacheReactive(RSetCache instance) { this.instance = instance; } @@ -58,40 +45,11 @@ public class RedissonSetCacheReactive { }; } - public Publisher add(V value) { - long timeoutDate = 92233720368547758L; - return commandExecutor.evalWriteReactive(instance.getName(), instance.getCodec(), RedisCommands.EVAL_INTEGER, - "local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " - + "if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then " - + "return 0;" - + "end; " + - "redis.call('zadd', KEYS[1], ARGV[2], ARGV[3]); " + - "return 1; ", - Arrays.asList(instance.getName()), System.currentTimeMillis(), timeoutDate, ((RedissonSetCache)instance).encode(value)); - } - - public Publisher addAll(Collection c) { - if (c.isEmpty()) { - return Streams.just(0); - } - - long score = 92233720368547758L - System.currentTimeMillis(); - List params = new ArrayList(c.size()*2 + 1); - params.add(instance.getName()); - for (V value : c) { - ByteBuf objectState = ((RedissonSetCache)instance).encode(value); - params.add(score); - params.add(objectState); - } - - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.ZADD_RAW, params.toArray()); - } - - public Publisher addAll(Publisher c) { + public Publisher addAll(Publisher c) { return new PublisherAdder() { @Override - public Publisher add(Object o) { - return RedissonSetCacheReactive.this.add((V)o); + public RFuture add(Object o) { + return instance.addAsync((V)o); } }.addAll(c); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java index 55979a113..4ee892a5a 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetMultimapReactive.java @@ -45,7 +45,7 @@ public class RedissonSetMultimapReactive { public RSetReactive get(K key) { RSet set = ((RSetMultimap)instance).get(key); return ReactiveProxyBuilder.create(commandExecutor, set, - new RedissonSetReactive(commandExecutor, set), RSetReactive.class); + new RedissonSetReactive(set), RSetReactive.class); } } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java index a40522bae..cf4e5f9a5 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -15,22 +15,12 @@ */ package org.redisson.reactive; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - import org.reactivestreams.Publisher; -import org.redisson.RedissonObject; import org.redisson.RedissonSet; import org.redisson.api.RFuture; import org.redisson.api.RSet; import org.redisson.client.RedisClient; -import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.command.CommandReactiveExecutor; - -import reactor.fn.Supplier; /** * Distributed and concurrent implementation of {@link java.util.Set} @@ -42,63 +32,20 @@ import reactor.fn.Supplier; public class RedissonSetReactive { private final RSet instance; - private final CommandReactiveExecutor commandExecutor; - public RedissonSetReactive(CommandReactiveExecutor commandExecutor, RSet instance) { - this.commandExecutor = commandExecutor; + public RedissonSetReactive(RSet instance) { this.instance = instance; } - public Publisher addAll(Publisher c) { + public Publisher addAll(Publisher c) { return new PublisherAdder() { @Override - public Publisher add(Object e) { - return RedissonSetReactive.this.add((V)e); + public RFuture add(Object e) { + return instance.addAsync((V)e); } }.addAll(c); } - private Publisher> scanIteratorReactive(final RedisClient client, final long startPos, final String pattern, final int count) { - return commandExecutor.reactive(new Supplier>>() { - @Override - public RFuture> get() { - return ((RedissonSet)instance).scanIteratorAsync(instance.getName(), client, startPos, pattern, count); - } - }); - } - - public Publisher add(V e) { - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SADD, instance.getName(), ((RedissonObject)instance).encode(e)); - } - - public Publisher addAll(Collection c) { - List args = new ArrayList(c.size() + 1); - args.add(instance.getName()); - ((RedissonObject)instance).encode(args, c); - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SADD, args.toArray()); - } - - public Publisher intersection(String... names) { - List args = new ArrayList(names.length + 1); - args.add(instance.getName()); - args.addAll(Arrays.asList(names)); - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SINTERSTORE, args.toArray()); - } - - public Publisher diff(String... names) { - List args = new ArrayList(names.length + 1); - args.add(instance.getName()); - args.addAll(Arrays.asList(names)); - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SDIFFSTORE, args.toArray()); - } - - public Publisher union(String... names) { - List args = new ArrayList(names.length + 1); - args.add(instance.getName()); - args.addAll(Arrays.asList(names)); - return commandExecutor.writeReactive(instance.getName(), instance.getCodec(), RedisCommands.SUNIONSTORE, args.toArray()); - } - public Publisher iterator(int count) { return iterator(null, count); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java index 1fd984412..3071e261c 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonTransactionReactive.java @@ -92,28 +92,28 @@ public class RedissonTransactionReactive implements RTransactionReactive { public RSetReactive getSet(String name) { RSet set = transaction.getSet(name); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(executorService, set), RSetReactive.class); + new RedissonSetReactive(set), RSetReactive.class); } @Override public RSetReactive getSet(String name, Codec codec) { RSet set = transaction.getSet(name, codec); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetReactive(executorService, set), RSetReactive.class); + new RedissonSetReactive(set), RSetReactive.class); } @Override public RSetCacheReactive getSetCache(String name) { RSetCache set = transaction.getSetCache(name); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(executorService, set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set), RSetCacheReactive.class); } @Override public RSetCacheReactive getSetCache(String name, Codec codec) { RSetCache set = transaction.getSetCache(name, codec); return ReactiveProxyBuilder.create(executorService, set, - new RedissonSetCacheReactive(executorService, set), RSetCacheReactive.class); + new RedissonSetCacheReactive(set), RSetCacheReactive.class); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonLexSortedSetReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonLexSortedSetReactiveTest.java index 250d4f36d..692f026a8 100644 --- a/redisson/src/test/java/org/redisson/RedissonLexSortedSetReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonLexSortedSetReactiveTest.java @@ -18,28 +18,28 @@ public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest { @Test public void testAddAllReactive() { RLexSortedSetReactive list = redisson.getLexSortedSet("set"); - Assert.assertTrue(sync(list.add("1")) == 1); - Assert.assertTrue(sync(list.add("2")) == 1); - Assert.assertTrue(sync(list.add("3")) == 1); - Assert.assertTrue(sync(list.add("4")) == 1); - Assert.assertTrue(sync(list.add("5")) == 1); + Assert.assertTrue(sync(list.add("1"))); + Assert.assertTrue(sync(list.add("2"))); + Assert.assertTrue(sync(list.add("3"))); + Assert.assertTrue(sync(list.add("4"))); + Assert.assertTrue(sync(list.add("5"))); RLexSortedSetReactive list2 = redisson.getLexSortedSet("set2"); - Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue()); + Assert.assertEquals(true, sync(list2.addAll(list.iterator()))); Assert.assertEquals(5, sync(list2.size()).intValue()); } @Test public void testRemoveLexRangeTail() { RLexSortedSetReactive set = redisson.getLexSortedSet("simple"); - Assert.assertTrue(sync(set.add("a")) == 1); - Assert.assertFalse(sync(set.add("a")) == 1); - Assert.assertTrue(sync(set.add("b")) == 1); - Assert.assertTrue(sync(set.add("c")) == 1); - Assert.assertTrue(sync(set.add("d")) == 1); - Assert.assertTrue(sync(set.add("e")) == 1); - Assert.assertTrue(sync(set.add("f")) == 1); - Assert.assertTrue(sync(set.add("g")) == 1); + Assert.assertTrue(sync(set.add("a"))); + Assert.assertFalse(sync(set.add("a"))); + Assert.assertTrue(sync(set.add("b"))); + Assert.assertTrue(sync(set.add("c"))); + Assert.assertTrue(sync(set.add("d"))); + Assert.assertTrue(sync(set.add("e"))); + Assert.assertTrue(sync(set.add("f"))); + Assert.assertTrue(sync(set.add("g"))); Assert.assertEquals(0, sync(set.removeRangeTail("z", false)).intValue()); @@ -86,14 +86,14 @@ public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest { @Test public void testLexRangeTail() { RLexSortedSetReactive set = redisson.getLexSortedSet("simple"); - Assert.assertTrue(sync(set.add("a")) == 1); - Assert.assertFalse(sync(set.add("a")) == 1); - Assert.assertTrue(sync(set.add("b")) == 1); - Assert.assertTrue(sync(set.add("c")) == 1); - Assert.assertTrue(sync(set.add("d")) == 1); - Assert.assertTrue(sync(set.add("e")) == 1); - Assert.assertTrue(sync(set.add("f")) == 1); - Assert.assertTrue(sync(set.add("g")) == 1); + Assert.assertTrue(sync(set.add("a"))); + Assert.assertFalse(sync(set.add("a"))); + Assert.assertTrue(sync(set.add("b"))); + Assert.assertTrue(sync(set.add("c"))); + Assert.assertTrue(sync(set.add("d"))); + Assert.assertTrue(sync(set.add("e"))); + Assert.assertTrue(sync(set.add("f"))); + Assert.assertTrue(sync(set.add("g"))); assertThat(sync(set.rangeTail("c", false))).containsExactly("d", "e", "f", "g"); assertThat(sync(set.rangeTail("c", true))).containsExactly("c", "d", "e", "f", "g"); diff --git a/redisson/src/test/java/org/redisson/RedissonListReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonListReactiveTest.java index 3ee8c8563..d37828d8c 100644 --- a/redisson/src/test/java/org/redisson/RedissonListReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonListReactiveTest.java @@ -44,7 +44,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(5)); RListReactive list2 = redisson.getList("list2"); - Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue()); + Assert.assertEquals(true, sync(list2.addAll(list.iterator()))); Assert.assertEquals(5, sync(list2.size()).intValue()); } @@ -52,13 +52,13 @@ public class RedissonListReactiveTest extends BaseReactiveTest { public void testAddAllWithIndex() throws InterruptedException { final RListReactive list = redisson.getList("list"); final CountDownLatch latch = new CountDownLatch(1); - list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new Promise() { + list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new Promise() { @Override - public void onNext(Integer element) { - list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new Promise() { + public void onNext(Boolean element) { + list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new Promise() { @Override - public void onNext(Integer value) { + public void onNext(Boolean value) { latch.countDown(); } @@ -84,12 +84,12 @@ public class RedissonListReactiveTest extends BaseReactiveTest { public void testAdd() throws InterruptedException { final RListReactive list = redisson.getList("list"); final CountDownLatch latch = new CountDownLatch(1); - list.add(1L).subscribe(new Promise() { + list.add(1L).subscribe(new Promise() { @Override - public void onNext(Integer value) { - list.add(2L).subscribe(new Promise() { + public void onNext(Boolean value) { + list.add(2L).subscribe(new Promise() { @Override - public void onNext(Integer value) { + public void onNext(Boolean value) { latch.countDown(); } @@ -240,7 +240,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(3)); sync(list.add(10)); - long index = sync(list.lastIndexOf(3)); + int index = sync(list.lastIndexOf(3)); Assert.assertEquals(8, index); } @@ -286,7 +286,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { assertThat(sync(list)).containsExactly(1, 2, 3, 4, 6); } - @Test(expected = RedisException.class) + @Test(expected = IndexOutOfBoundsException.class) public void testSetFail() throws InterruptedException { RListReactive list = redisson.getList("list"); sync(list.add(1)); @@ -397,7 +397,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(4)); sync(list.add(5)); - Assert.assertEquals(8, sync(list.addAll(2, Arrays.asList(7, 8, 9))).longValue()); + Assert.assertEquals(true, sync(list.addAll(2, Arrays.asList(7, 8, 9)))); assertThat(sync(list)).containsExactly(1, 2, 7, 8, 9, 3, 4, 5); @@ -409,7 +409,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { assertThat(sync(list)).containsExactly(1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5); - Assert.assertEquals(15, sync(list.addAll(0, Arrays.asList(6, 7))).intValue()); + Assert.assertEquals(true, sync(list.addAll(0, Arrays.asList(6, 7)))); assertThat(sync(list)).containsExactly(6,7,1, 2, 7, 8, 9, 3, 4, 9, 1, 9, 5, 0, 5); } @@ -423,9 +423,9 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(4)); sync(list.add(5)); - Assert.assertEquals(8, sync(list.addAll(Arrays.asList(7, 8, 9))).intValue()); + Assert.assertEquals(true, sync(list.addAll(Arrays.asList(7, 8, 9)))); - Assert.assertEquals(11, sync(list.addAll(Arrays.asList(9, 1, 9))).intValue()); + Assert.assertEquals(true, sync(list.addAll(Arrays.asList(9, 1, 9)))); assertThat(sync(list)).containsExactly(1, 2, 3, 4, 5, 7, 8, 9, 9, 1, 9); } @@ -433,7 +433,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { @Test public void testAddAllEmpty() { RListReactive list = redisson.getList("list"); - Assert.assertEquals(0, sync(list.addAll(Collections.emptyList())).intValue()); + Assert.assertEquals(false, sync(list.addAll(Collections.emptyList()))); Assert.assertEquals(0, sync(list.size()).intValue()); } diff --git a/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java index 93e0fe8ba..fb03db643 100644 --- a/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java @@ -104,7 +104,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest { RBucketReactive b1 = reactive.getBucket("b1"); sync(b1.set(new MyObject())); RSetReactive s1 = reactive.getSet("s1"); - assertTrue(sync(s1.add(b1)) == 1); + assertTrue(sync(s1.add(b1))); assertTrue(codec == b1.getCodec()); Config config1 = new Config(); diff --git a/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java index 9f47cc72b..986c100a2 100644 --- a/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSetCacheReactiveTest.java @@ -189,14 +189,14 @@ public class RedissonSetCacheReactiveTest extends BaseReactiveTest { @Test public void testSize() { RSetCacheReactive set = redisson.getSetCache("set"); - Assert.assertEquals(1, sync(set.add(1)).intValue()); - Assert.assertEquals(1, sync(set.add(2)).intValue()); - Assert.assertEquals(1, sync(set.add(3)).intValue()); - Assert.assertEquals(0, sync(set.add(3)).intValue()); - Assert.assertEquals(0, sync(set.add(3)).intValue()); - Assert.assertEquals(1, sync(set.add(4)).intValue()); - Assert.assertEquals(1, sync(set.add(5)).intValue()); - Assert.assertEquals(0, sync(set.add(5)).intValue()); + Assert.assertEquals(true, sync(set.add(1))); + Assert.assertEquals(true, sync(set.add(2))); + Assert.assertEquals(true, sync(set.add(3))); + Assert.assertEquals(false, sync(set.add(3))); + Assert.assertEquals(false, sync(set.add(3))); + Assert.assertEquals(true, sync(set.add(4))); + Assert.assertEquals(true, sync(set.add(5))); + Assert.assertEquals(false, sync(set.add(5))); Assert.assertEquals(5, sync(set.size()).intValue()); } diff --git a/redisson/src/test/java/org/redisson/RedissonSetReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonSetReactiveTest.java index 3e4c32148..be1b42bff 100644 --- a/redisson/src/test/java/org/redisson/RedissonSetReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonSetReactiveTest.java @@ -39,7 +39,7 @@ public class RedissonSetReactiveTest extends BaseReactiveTest { sync(list.add(5)); RSetReactive list2 = redisson.getSet("set2"); - Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue()); + Assert.assertEquals(true, sync(list2.addAll(list.iterator()))); Assert.assertEquals(5, sync(list2.size()).intValue()); }