diff --git a/src/main/java/org/redisson/CommandReactiveExecutor.java b/src/main/java/org/redisson/CommandReactiveExecutor.java index 987eb7d28..87019d522 100644 --- a/src/main/java/org/redisson/CommandReactiveExecutor.java +++ b/src/main/java/org/redisson/CommandReactiveExecutor.java @@ -17,12 +17,11 @@ package org.redisson; import java.util.List; +import org.reactivestreams.Publisher; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; -import rx.Single; - /** * * @author Nikita Koksharov @@ -32,16 +31,16 @@ public interface CommandReactiveExecutor { ConnectionManager getConnectionManager(); - Single evalWriteObservable(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); + Publisher evalWriteObservable(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params); - Single evalReadObservable(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); + Publisher evalReadObservable(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object ... params); - Single writeObservable(String key, RedisCommand command, Object ... params); + Publisher writeObservable(String key, RedisCommand command, Object ... params); - Single writeObservable(String key, Codec codec, RedisCommand command, Object ... params); + Publisher writeObservable(String key, Codec codec, RedisCommand command, Object ... params); - Single readObservable(String key, RedisCommand command, Object ... params); + Publisher readObservable(String key, RedisCommand command, Object ... params); - Single readObservable(String key, Codec codec, RedisCommand command, Object ... params); + Publisher readObservable(String key, Codec codec, RedisCommand command, Object ... params); } diff --git a/src/main/java/org/redisson/CommandReactiveService.java b/src/main/java/org/redisson/CommandReactiveService.java index 8b63b4fdf..916eb592d 100644 --- a/src/main/java/org/redisson/CommandReactiveService.java +++ b/src/main/java/org/redisson/CommandReactiveService.java @@ -17,14 +17,18 @@ package org.redisson; import java.util.List; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.connection.ConnectionManager; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import rx.Single; -import rx.SingleSubscriber; +import reactor.core.support.Exceptions; +import reactor.rx.Stream; +import reactor.rx.action.Action; +import reactor.rx.subscription.ReactiveSubscription; /** * @@ -33,27 +37,45 @@ import rx.SingleSubscriber; */ public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor { - static class ToObservableFuture implements Single.OnSubscribe { + static class NettyFuturePublisher extends Stream { private final Future that; - public ToObservableFuture(Future that) { + public NettyFuturePublisher(Future that) { this.that = that; } @Override - public void call(final SingleSubscriber subscriber) { - that.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - subscriber.onError(future.cause()); - return; + public void subscribe(final Subscriber subscriber) { + try { + subscriber.onSubscribe(new ReactiveSubscription(this, subscriber) { + + @Override + public void request(long elements) { + Action.checkRequest(elements); + if (isComplete()) return; + + that.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + subscriber.onError(future.cause()); + return; + } + + if (future.getNow() != null) { + subscriber.onNext(future.getNow()); + } + onComplete(); + } + }); } - - subscriber.onSuccess(future.getNow()); - } - }); + }); + } catch (Throwable throwable) { + Exceptions.throwIfFatal(throwable); + subscriber.onError(throwable); + } } + } public CommandReactiveService(ConnectionManager connectionManager) { @@ -61,39 +83,39 @@ public class CommandReactiveService extends CommandAsyncService implements Comma } @Override - public Single writeObservable(String key, RedisCommand command, Object ... params) { + public Publisher writeObservable(String key, RedisCommand command, Object ... params) { return writeObservable(key, connectionManager.getCodec(), command, params); } @Override - public Single writeObservable(String key, Codec codec, RedisCommand command, Object ... params) { + public Publisher writeObservable(String key, Codec codec, RedisCommand command, Object ... params) { Future f = writeAsync(key, codec, command, params); - return Single.create(new ToObservableFuture(f)); + return new NettyFuturePublisher(f); } @Override - public Single readObservable(String key, RedisCommand command, Object ... params) { + public Publisher readObservable(String key, RedisCommand command, Object ... params) { return readObservable(key, connectionManager.getCodec(), command, params); } @Override - public Single readObservable(String key, Codec codec, RedisCommand command, Object ... params) { + public Publisher readObservable(String key, Codec codec, RedisCommand command, Object ... params) { Future f = readAsync(key, codec, command, params); - return Single.create(new ToObservableFuture(f)); + return new NettyFuturePublisher(f); } @Override - public Single evalReadObservable(String key, Codec codec, RedisCommand evalCommandType, + public Publisher evalReadObservable(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { Future f = evalReadAsync(key, codec, evalCommandType, script, keys, params); - return Single.create(new ToObservableFuture(f)); + return new NettyFuturePublisher(f); } @Override - public Single evalWriteObservable(String key, Codec codec, RedisCommand evalCommandType, + public Publisher evalWriteObservable(String key, Codec codec, RedisCommand evalCommandType, String script, List keys, Object... params) { Future f = evalWriteAsync(key, codec, evalCommandType, script, keys, params); - return Single.create(new ToObservableFuture(f)); + return new NettyFuturePublisher(f); } } diff --git a/src/main/java/org/redisson/RedissonBucketReactive.java b/src/main/java/org/redisson/RedissonBucketReactive.java index 109355ef3..55db31375 100644 --- a/src/main/java/org/redisson/RedissonBucketReactive.java +++ b/src/main/java/org/redisson/RedissonBucketReactive.java @@ -17,12 +17,11 @@ package org.redisson; import java.util.concurrent.TimeUnit; +import org.reactivestreams.Publisher; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.RBucketReactive; -import rx.Single; - public class RedissonBucketReactive extends RedissonExpirableReactive implements RBucketReactive { protected RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) { @@ -34,22 +33,22 @@ public class RedissonBucketReactive extends RedissonExpirableReactive impleme } @Override - public Single get() { + public Publisher get() { return commandExecutor.readObservable(getName(), codec, RedisCommands.GET, getName()); } @Override - public Single set(V value) { + public Publisher set(V value) { return commandExecutor.writeObservable(getName(), codec, RedisCommands.SET, getName(), value); } @Override - public Single set(V value, long timeToLive, TimeUnit timeUnit) { + public Publisher set(V value, long timeToLive, TimeUnit timeUnit) { return commandExecutor.writeObservable(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value); } @Override - public Single exists() { + public Publisher exists() { return commandExecutor.readObservable(getName(), codec, RedisCommands.EXISTS, getName()); } diff --git a/src/main/java/org/redisson/RedissonExpirableReactive.java b/src/main/java/org/redisson/RedissonExpirableReactive.java index 25a754073..ca258ef74 100644 --- a/src/main/java/org/redisson/RedissonExpirableReactive.java +++ b/src/main/java/org/redisson/RedissonExpirableReactive.java @@ -18,12 +18,12 @@ package org.redisson; import java.util.Date; import java.util.concurrent.TimeUnit; +import org.reactivestreams.Publisher; import org.redisson.client.codec.Codec; import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.RExpirableReactive; -import rx.Single; abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive { @@ -36,27 +36,27 @@ abstract class RedissonExpirableReactive extends RedissonObjectReactive implemen } @Override - public Single expire(long timeToLive, TimeUnit timeUnit) { + public Publisher expire(long timeToLive, TimeUnit timeUnit) { return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.EXPIRE, getName(), timeUnit.toSeconds(timeToLive)); } @Override - public Single expireAt(long timestamp) { + public Publisher expireAt(long timestamp) { return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.EXPIREAT, getName(), timestamp); } @Override - public Single expireAt(Date timestamp) { + public Publisher expireAt(Date timestamp) { return expireAt(timestamp.getTime() / 1000); } @Override - public Single clearExpire() { + public Publisher clearExpire() { return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.PERSIST, getName()); } @Override - public Single remainTimeToLive() { + public Publisher remainTimeToLive() { return commandExecutor.readObservable(getName(), StringCodec.INSTANCE, RedisCommands.TTL, getName()); } diff --git a/src/main/java/org/redisson/RedissonHyperLogLogReactive.java b/src/main/java/org/redisson/RedissonHyperLogLogReactive.java index 274f53969..900f81638 100644 --- a/src/main/java/org/redisson/RedissonHyperLogLogReactive.java +++ b/src/main/java/org/redisson/RedissonHyperLogLogReactive.java @@ -20,12 +20,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import org.reactivestreams.Publisher; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.RHyperLogLogReactive; -import rx.Single; - public class RedissonHyperLogLogReactive extends RedissonExpirableReactive implements RHyperLogLogReactive { protected RedissonHyperLogLogReactive(CommandReactiveExecutor commandExecutor, String name) { @@ -37,12 +36,12 @@ public class RedissonHyperLogLogReactive extends RedissonExpirableReactive im } @Override - public Single add(V obj) { + public Publisher add(V obj) { return commandExecutor.writeObservable(getName(), codec, RedisCommands.PFADD, getName(), obj); } @Override - public Single addAll(Collection objects) { + public Publisher addAll(Collection objects) { List args = new ArrayList(objects.size() + 1); args.add(getName()); args.addAll(objects); @@ -50,12 +49,12 @@ public class RedissonHyperLogLogReactive extends RedissonExpirableReactive im } @Override - public Single count() { + public Publisher count() { return commandExecutor.writeObservable(getName(), codec, RedisCommands.PFCOUNT, getName()); } @Override - public Single countWith(String... otherLogNames) { + public Publisher countWith(String... otherLogNames) { List args = new ArrayList(otherLogNames.length + 1); args.add(getName()); args.addAll(Arrays.asList(otherLogNames)); @@ -63,7 +62,7 @@ public class RedissonHyperLogLogReactive extends RedissonExpirableReactive im } @Override - public Single mergeWith(String... otherLogNames) { + public Publisher mergeWith(String... otherLogNames) { List args = new ArrayList(otherLogNames.length + 1); args.add(getName()); args.addAll(Arrays.asList(otherLogNames)); diff --git a/src/main/java/org/redisson/RedissonListReactive.java b/src/main/java/org/redisson/RedissonListReactive.java index 2859d5076..30eb39a68 100644 --- a/src/main/java/org/redisson/RedissonListReactive.java +++ b/src/main/java/org/redisson/RedissonListReactive.java @@ -19,7 +19,6 @@ import static org.redisson.client.protocol.RedisCommands.EVAL_OBJECT; import static org.redisson.client.protocol.RedisCommands.LINDEX; import static org.redisson.client.protocol.RedisCommands.LLEN; import static org.redisson.client.protocol.RedisCommands.LPOP; -import static org.redisson.client.protocol.RedisCommands.LPUSH; import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE; import static org.redisson.client.protocol.RedisCommands.RPUSH; @@ -28,6 +27,10 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; @@ -38,13 +41,9 @@ import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.core.RListReactive; -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Single; -import rx.SingleSubscriber; -import rx.Subscriber; -import rx.observables.ConnectableObservable; -import rx.subjects.PublishSubject; +import reactor.core.reactivestreams.SubscriberBarrier; +import reactor.rx.Stream; +import reactor.rx.subscription.ReactiveSubscription; /** * Distributed and concurrent implementation of {@link java.util.List} @@ -64,80 +63,101 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Single size() { + public Publisher size() { return commandExecutor.readObservable(getName(), codec, LLEN, getName()); } @Override - public Observable descendingIterator() { + public Publisher descendingIterator() { return iterator(-1, false); } @Override - public Observable iterator() { + public Publisher iterator() { return iterator(0, true); } @Override - public Observable descendingIterator(int startIndex) { + public Publisher descendingIterator(int startIndex) { return iterator(startIndex, false); } @Override - public Observable iterator(int startIndex) { + public Publisher iterator(int startIndex) { return iterator(startIndex, true); } - private Observable iterator(final int startIndex, final boolean forward) { - return Observable.create(new OnSubscribe() { - - private int currentIndex = startIndex; + private Publisher iterator(final int startIndex, final boolean forward) { + return new Stream() { @Override - public void call(final Subscriber t) { - get(currentIndex).subscribe(new SingleSubscriber() { + public void subscribe(final Subscriber t) { + t.onSubscribe(new ReactiveSubscription(this, t) { - @Override - public void onError(Throwable e) { - t.onError(e); - } + private int currentIndex = startIndex; @Override - public void onSuccess(V val) { - if (val == null) { - t.onCompleted(); - return; - } - t.onNext(val); - if (forward) { - currentIndex++; - } else { - currentIndex--; - } - call(t); + protected void onRequest(final long n) { + final ReactiveSubscription m = this; + get(currentIndex).subscribe(new Subscriber() { + V currValue; + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(V value) { + currValue = value; + m.onNext(value); + if (forward) { + currentIndex++; + } else { + currentIndex--; + } + } + + @Override + public void onError(Throwable error) { + m.onError(error); + } + + @Override + public void onComplete() { + if (currValue == null) { + m.onComplete(); + return; + } + if (n-1 == 0) { + return; + } + onRequest(n-1); + } + }); } }); } - }); + }; } @Override - public Single add(V e) { + public Publisher add(V e) { return commandExecutor.writeObservable(getName(), codec, RPUSH, getName(), e); } @Override - public Single remove(Object o) { + public Publisher remove(Object o) { return remove(o, 1); } - protected Single remove(Object o, int count) { + protected Publisher remove(Object o, int count) { return commandExecutor.writeObservable(getName(), codec, LREM_SINGLE, getName(), count, o); } @Override - public Single containsAll(Collection c) { + public Publisher containsAll(Collection c) { return commandExecutor.evalReadObservable(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), "local items = redis.call('lrange', KEYS[1], 0, -1) " + "for i=1, #items do " + @@ -152,48 +172,19 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Single addAll(final Collection c) { + public Publisher addAll(final Collection c) { if (c.isEmpty()) { return size(); } - final PublishSubject promise = newObservable(); - ConnectableObservable r = promise.replay(); - r.connect(); - Single sizeObservable = size(); - sizeObservable.subscribe(new SingleSubscriber() { - @Override - public void onSuccess(final Long listSize) { - List args = new ArrayList(c.size() + 1); - args.add(getName()); - args.addAll(c); - Single res = commandExecutor.writeObservable(getName(), codec, RPUSH, args.toArray()); - res.subscribe(new SingleSubscriber() { - - @Override - public void onSuccess(Long value) { - promise.onNext(value); - promise.onCompleted(); - } - - @Override - public void onError(Throwable error) { - promise.onError(error); - } - }); - } - - @Override - public void onError(Throwable error) { - promise.onError(error); - } - - }); - return r.toSingle(); + List args = new ArrayList(c.size() + 1); + args.add(getName()); + args.addAll(c); + return commandExecutor.writeObservable(getName(), codec, RPUSH, args.toArray()); } @Override - public Single addAll(final long index, final Collection coll) { + public Publisher addAll(final long index, final Collection coll) { if (coll.isEmpty()) { return size(); } @@ -203,15 +194,15 @@ public class RedissonListReactive extends RedissonExpirableReactive implement Collections.reverse(elements); elements.add(0, getName()); - return commandExecutor.writeObservable(getName(), codec, LPUSH, elements.toArray()); + return commandExecutor.writeObservable(getName(), codec, RedisCommands.LPUSH, elements.toArray()); } - final PublishSubject promise = newObservable(); + final Processor promise = newObservable(); - Single s = size(); - s.subscribe(new SingleSubscriber() { + Publisher s = size(); + s.subscribe(new SubscriberBarrier(promise) { @Override - public void onSuccess(Long size) { + public void doNext(Long size) { if (!isPositionInRange(index, size)) { IndexOutOfBoundsException e = new IndexOutOfBoundsException("index: " + index + " but current size: "+ size); promise.onError(e); @@ -228,7 +219,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement List args = new ArrayList(coll.size() + 1); args.add(index); args.addAll(coll); - Single f = commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand("EVAL", new LongReplayConvertor(), 5), + Publisher f = commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand("EVAL", new LongReplayConvertor(), 5), "local ind = table.remove(ARGV, 1); " + // index is the first parameter "local tail = redis.call('lrange', KEYS[1], ind, -1); " + "redis.call('ltrim', KEYS[1], 0, ind - 1); " + @@ -239,16 +230,12 @@ public class RedissonListReactive extends RedissonExpirableReactive implement f.subscribe(toSubscriber(promise)); } - @Override - public void onError(Throwable error) { - promise.onError(error); - } }); - return promise.toSingle(); + return promise; } @Override - public Single removeAll(Collection c) { + public Publisher removeAll(Collection c) { return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), "local v = false " + "for i = 0, table.getn(ARGV), 1 do " @@ -260,7 +247,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Single retainAll(Collection c) { + public Publisher retainAll(Collection c) { return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand("EVAL", new BooleanReplayConvertor(), 4), "local changed = false " + "local items = redis.call('lrange', KEYS[1], 0, -1) " @@ -286,7 +273,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Single get(long index) { + public Publisher get(long index) { return commandExecutor.readObservable(getName(), codec, LINDEX, getName(), index); } @@ -295,7 +282,7 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Single set(long index, V element) { + public Publisher set(long index, V element) { return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand("EVAL", 5), "local v = redis.call('lindex', KEYS[1], ARGV[1]); " + "redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " + @@ -304,17 +291,17 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Single fastSet(long index, V element) { + public Publisher fastSet(long index, V element) { return commandExecutor.writeObservable(getName(), codec, RedisCommands.LSET, getName(), index, element); } @Override - public Single add(long index, V element) { + public Publisher add(long index, V element) { return addAll(index, Collections.singleton(element)); } @Override - public Single remove(int index) { + public Publisher remove(int index) { if (index == 0) { return commandExecutor.writeObservable(getName(), codec, LPOP, getName()); } @@ -329,11 +316,11 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Single contains(Object o) { + public Publisher contains(Object o) { return indexOf(o, new BooleanNumberReplayConvertor()); } - private Single indexOf(Object o, Convertor convertor) { + private Publisher indexOf(Object o, Convertor convertor) { return commandExecutor.evalReadObservable(getName(), codec, new RedisCommand("EVAL", convertor, 4), "local key = KEYS[1] " + "local obj = ARGV[1] " + @@ -348,12 +335,12 @@ public class RedissonListReactive extends RedissonExpirableReactive implement } @Override - public Single indexOf(Object o) { + public Publisher indexOf(Object o) { return indexOf(o, new IntegerReplayConvertor()); } @Override - public Single lastIndexOf(Object o) { + public Publisher lastIndexOf(Object o) { return commandExecutor.evalReadObservable(getName(), codec, new RedisCommand("EVAL", new IntegerReplayConvertor(), 4), "local key = KEYS[1] " + "local obj = ARGV[1] " + diff --git a/src/main/java/org/redisson/RedissonObjectReactive.java b/src/main/java/org/redisson/RedissonObjectReactive.java index c99971d3f..b50053401 100644 --- a/src/main/java/org/redisson/RedissonObjectReactive.java +++ b/src/main/java/org/redisson/RedissonObjectReactive.java @@ -15,13 +15,18 @@ */ package org.redisson; +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommands; import org.redisson.core.RObjectReactive; -import rx.Single; -import rx.SingleSubscriber; -import rx.subjects.PublishSubject; +import reactor.core.reactivestreams.SubscriberBarrier; +import reactor.rx.Stream; +import reactor.rx.Streams; +import reactor.rx.broadcast.Broadcaster; /** * Base Redisson object @@ -45,27 +50,21 @@ abstract class RedissonObjectReactive implements RObjectReactive { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } - protected SingleSubscriber toSubscriber(final PublishSubject promise) { - return new SingleSubscriber() { + protected Subscriber toSubscriber(final Subscriber promise) { + return new SubscriberBarrier(promise) { @Override - public void onSuccess(V value) { - promise.onNext(value); - promise.onCompleted(); - } - - @Override - public void onError(Throwable error) { - promise.onError(error); + protected void doSubscribe(Subscription subscription) { + subscription.request(1); } }; } - protected PublishSubject newObservable() { - return PublishSubject.create(); + protected Processor newObservable() { + return Broadcaster.create(); } - protected Single newSucceededObservable(V result) { - return Single.just(result); + protected Stream newSucceededObservable(V result) { + return Streams.just(result); } @Override @@ -74,27 +73,27 @@ abstract class RedissonObjectReactive implements RObjectReactive { } @Override - public Single rename(String newName) { + public Publisher rename(String newName) { return commandExecutor.writeObservable(getName(), RedisCommands.RENAME, getName(), newName); } @Override - public Single migrate(String host, int port, int database) { + public Publisher migrate(String host, int port, int database) { return commandExecutor.writeObservable(getName(), RedisCommands.MIGRATE, host, port, getName(), database); } @Override - public Single move(int database) { + public Publisher move(int database) { return commandExecutor.writeObservable(getName(), RedisCommands.MOVE, getName(), database); } @Override - public Single renamenx(String newName) { + public Publisher renamenx(String newName) { return commandExecutor.writeObservable(getName(), RedisCommands.RENAMENX, getName(), newName); } @Override - public Single delete() { + public Publisher delete() { return commandExecutor.writeObservable(getName(), RedisCommands.DEL_SINGLE, getName()); } diff --git a/src/main/java/org/redisson/core/RBucketReactive.java b/src/main/java/org/redisson/core/RBucketReactive.java index da4b537ef..fccbd4e18 100644 --- a/src/main/java/org/redisson/core/RBucketReactive.java +++ b/src/main/java/org/redisson/core/RBucketReactive.java @@ -17,7 +17,8 @@ package org.redisson.core; import java.util.concurrent.TimeUnit; -import rx.Single; +import org.reactivestreams.Publisher; + /** * object functions @@ -28,12 +29,12 @@ import rx.Single; */ public interface RBucketReactive extends RExpirableReactive { - Single get(); + Publisher get(); - Single set(V value); + Publisher set(V value); - Single set(V value, long timeToLive, TimeUnit timeUnit); + Publisher set(V value, long timeToLive, TimeUnit timeUnit); - Single exists(); + Publisher exists(); } diff --git a/src/main/java/org/redisson/core/RCollectionReactive.java b/src/main/java/org/redisson/core/RCollectionReactive.java index c22aae0de..291bbddf2 100644 --- a/src/main/java/org/redisson/core/RCollectionReactive.java +++ b/src/main/java/org/redisson/core/RCollectionReactive.java @@ -17,27 +17,26 @@ package org.redisson.core; import java.util.Collection; -import rx.Observable; -import rx.Single; +import org.reactivestreams.Publisher; public interface RCollectionReactive extends RExpirableReactive { - Observable iterator(); + Publisher iterator(); - Single retainAll(Collection c); + Publisher retainAll(Collection c); - Single removeAll(Collection c); + Publisher removeAll(Collection c); - Single contains(Object o); + Publisher contains(Object o); - Single containsAll(Collection c); + Publisher containsAll(Collection c); - Single remove(Object o); + Publisher remove(Object o); - Single size(); + Publisher size(); - Single add(V e); + Publisher add(V e); - Single addAll(Collection c); + Publisher addAll(Collection c); } diff --git a/src/main/java/org/redisson/core/RExpirableReactive.java b/src/main/java/org/redisson/core/RExpirableReactive.java index 6277d0b9f..7a28f9ace 100644 --- a/src/main/java/org/redisson/core/RExpirableReactive.java +++ b/src/main/java/org/redisson/core/RExpirableReactive.java @@ -18,8 +18,7 @@ package org.redisson.core; import java.util.Date; import java.util.concurrent.TimeUnit; -import rx.Observable; -import rx.Single; +import org.reactivestreams.Publisher; /** * Base interface for all Redisson objects @@ -38,7 +37,7 @@ public interface RExpirableReactive extends RObjectReactive { * @param timeUnit - timeout time unit * @return true if the timeout was set and false if not */ - Single expire(long timeToLive, TimeUnit timeUnit); + Publisher expire(long timeToLive, TimeUnit timeUnit); /** * Set an expire date for object in mode. When expire date comes @@ -47,7 +46,7 @@ public interface RExpirableReactive extends RObjectReactive { * @param timestamp - expire date * @return true if the timeout was set and false if not */ - Single expireAt(Date timestamp); + Publisher expireAt(Date timestamp); /** * Set an expire date for object in mode. When expire date comes @@ -56,7 +55,7 @@ public interface RExpirableReactive extends RObjectReactive { * @param timestamp - expire date in seconds (Unix timestamp) * @return true if the timeout was set and false if not */ - Single expireAt(long timestamp); + Publisher expireAt(long timestamp); /** * Clear an expire timeout or expire date for object in mode. @@ -64,13 +63,13 @@ public interface RExpirableReactive extends RObjectReactive { * * @return true if the timeout was cleared and false if not */ - Single clearExpire(); + Publisher clearExpire(); /** * Get remaining time to live of object in seconds. * * @return -1 if object does not exist or time in seconds */ - Single remainTimeToLive(); + Publisher remainTimeToLive(); } diff --git a/src/main/java/org/redisson/core/RHyperLogLogReactive.java b/src/main/java/org/redisson/core/RHyperLogLogReactive.java index f0915330e..12de71c75 100644 --- a/src/main/java/org/redisson/core/RHyperLogLogReactive.java +++ b/src/main/java/org/redisson/core/RHyperLogLogReactive.java @@ -17,18 +17,18 @@ package org.redisson.core; import java.util.Collection; -import rx.Single; +import org.reactivestreams.Publisher; public interface RHyperLogLogReactive extends RExpirableReactive { - Single add(V obj); + Publisher add(V obj); - Single addAll(Collection objects); + Publisher addAll(Collection objects); - Single count(); + Publisher count(); - Single countWith(String ... otherLogNames); + Publisher countWith(String ... otherLogNames); - Single mergeWith(String ... otherLogNames); + Publisher mergeWith(String ... otherLogNames); } diff --git a/src/main/java/org/redisson/core/RListReactive.java b/src/main/java/org/redisson/core/RListReactive.java index 80873c7e0..8113cfeef 100644 --- a/src/main/java/org/redisson/core/RListReactive.java +++ b/src/main/java/org/redisson/core/RListReactive.java @@ -17,8 +17,7 @@ package org.redisson.core; import java.util.Collection; -import rx.Observable; -import rx.Single; +import org.reactivestreams.Publisher; /** * list functions @@ -30,26 +29,26 @@ import rx.Single; // TODO add sublist support public interface RListReactive extends RCollectionReactive { - Observable descendingIterator(); + Publisher descendingIterator(); - Observable descendingIterator(int startIndex); + Publisher descendingIterator(int startIndex); - Observable iterator(int startIndex); + Publisher iterator(int startIndex); - Single lastIndexOf(Object o); + Publisher lastIndexOf(Object o); - Single indexOf(Object o); + Publisher indexOf(Object o); - Single add(long index, V element); + Publisher add(long index, V element); - Single addAll(long index, Collection coll); + Publisher addAll(long index, Collection coll); - Single fastSet(long index, V element); + Publisher fastSet(long index, V element); - Single set(long index, V element); + Publisher set(long index, V element); - Single get(long index); + Publisher get(long index); - Single remove(int index); + Publisher remove(int index); } diff --git a/src/main/java/org/redisson/core/RObjectReactive.java b/src/main/java/org/redisson/core/RObjectReactive.java index 00de9a4df..14785936c 100644 --- a/src/main/java/org/redisson/core/RObjectReactive.java +++ b/src/main/java/org/redisson/core/RObjectReactive.java @@ -15,7 +15,7 @@ */ package org.redisson.core; -import rx.Single; +import org.reactivestreams.Publisher; /** * Base interface for all Redisson objects @@ -36,7 +36,7 @@ public interface RObjectReactive { * @param database - destination database * @return */ - Single migrate(String host, int port, int database); + Publisher migrate(String host, int port, int database); /** * Move object to another database in mode @@ -44,14 +44,14 @@ public interface RObjectReactive { * @param database * @return true if key was moved false if not */ - Single move(int database); + Publisher move(int database); /** * Delete object in mode * * @return true if object was deleted false if not */ - Single delete(); + Publisher delete(); /** * Rename current object key to newName @@ -60,7 +60,7 @@ public interface RObjectReactive { * @param newName * @return */ - Single rename(String newName); + Publisher rename(String newName); /** * Rename current object key to newName @@ -69,6 +69,6 @@ public interface RObjectReactive { * @param newName * @return */ - Single renamenx(String newName); + Publisher renamenx(String newName); } diff --git a/src/test/java/org/redisson/BaseReactiveTest.java b/src/test/java/org/redisson/BaseReactiveTest.java index 9e971d311..bfe654d3b 100644 --- a/src/test/java/org/redisson/BaseReactiveTest.java +++ b/src/test/java/org/redisson/BaseReactiveTest.java @@ -1,11 +1,17 @@ package org.redisson; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.reactivestreams.Publisher; -import rx.Observable; -import rx.Single; +import reactor.fn.Consumer; +import reactor.rx.Streams; public abstract class BaseReactiveTest { @@ -21,8 +27,16 @@ public abstract class BaseReactiveTest { redisson.shutdown(); } - public V sync(Single ob) { - return ob.toBlocking().value(); + public Iterator toIterator(Publisher pub) { + return Streams.create(pub).toList().poll().iterator(); + } + + public V sync(Publisher ob) { + List t = Streams.create(ob).toList().poll(); + if (t == null) { + return null; + } + return t.iterator().next(); } public static Config createConfig() { diff --git a/src/test/java/org/redisson/RedissonListReactiveTest.java b/src/test/java/org/redisson/RedissonListReactiveTest.java index f0e9cffb4..045c79e79 100644 --- a/src/test/java/org/redisson/RedissonListReactiveTest.java +++ b/src/test/java/org/redisson/RedissonListReactiveTest.java @@ -8,19 +8,24 @@ import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; +import org.reactivestreams.Subscriber; import org.redisson.client.RedisException; import org.redisson.core.RListReactive; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import rx.Observable; -import rx.Single; -import rx.SingleSubscriber; +import reactor.core.queue.CompletableBlockingQueue; +import reactor.fn.Consumer; +import reactor.rx.Promise; +import reactor.rx.Stream; +import reactor.rx.Streams; public class RedissonListReactiveTest extends BaseReactiveTest { @@ -30,19 +35,20 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(test2.add("foo")); sync(test2.add(0, "bar")); - MatcherAssert.assertThat(test2.iterator().toBlocking().toIterable(), Matchers.contains("bar", "foo")); + MatcherAssert.assertThat(sync(test2), Matchers.contains("bar", "foo")); } @Test public void testAddAllWithIndex() throws InterruptedException { final RListReactive list = redisson.getList("list"); final CountDownLatch latch = new CountDownLatch(1); - list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new SingleSubscriber() { + list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new Promise() { + @Override - public void onSuccess(Long value) { - list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new SingleSubscriber() { + public void onNext(Long element) { + list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new Promise() { @Override - public void onSuccess(Long value) { + public void onNext(Long value) { latch.countDown(); } @@ -68,12 +74,12 @@ public class RedissonListReactiveTest extends BaseReactiveTest { public void testAdd() throws InterruptedException { final RListReactive list = redisson.getList("list"); final CountDownLatch latch = new CountDownLatch(1); - list.add(1L).subscribe(new SingleSubscriber() { + list.add(1L).subscribe(new Promise() { @Override - public void onSuccess(Long value) { - list.add(2L).subscribe(new SingleSubscriber() { + public void onNext(Long value) { + list.add(2L).subscribe(new Promise() { @Override - public void onSuccess(Long value) { + public void onNext(Long value) { latch.countDown(); } @@ -96,7 +102,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { } private Iterable sync(RListReactive list) { - return list.iterator().toBlocking().toIterable(); + return Streams.create(list.iterator()).toList().poll(); } @Test @@ -122,7 +128,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(0)); sync(list.add(10)); - Iterator iterator = list.iterator().toBlocking().getIterator(); + Iterator iterator = toIterator(list.iterator()); Assert.assertTrue(1 == iterator.next()); Assert.assertTrue(2 == iterator.next()); @@ -151,7 +157,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(0)); sync(list.add(10)); - Iterator iterator = list.descendingIterator().toBlocking().getIterator(); + Iterator iterator = toIterator(list.descendingIterator()); Assert.assertTrue(10 == iterator.next()); Assert.assertTrue(0 == iterator.next()); @@ -175,7 +181,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(4)); sync(list.add(5)); - Assert.assertEquals(-1, list.lastIndexOf(10).toBlocking().value().intValue()); + Assert.assertEquals(-1, sync(list.lastIndexOf(10)).intValue()); } @Test @@ -192,7 +198,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(0)); sync(list.add(10)); - int index = list.lastIndexOf(3).toBlocking().value(); + int index = sync(list.lastIndexOf(3)); Assert.assertEquals(2, index); } @@ -210,7 +216,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(0)); sync(list.add(10)); - int index = list.lastIndexOf(3).toBlocking().value(); + int index = sync(list.lastIndexOf(3)); Assert.assertEquals(5, index); } @@ -228,7 +234,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(3)); sync(list.add(10)); - int index = list.lastIndexOf(3).toBlocking().value(); + int index = sync(list.lastIndexOf(3)); Assert.assertEquals(8, index); } @@ -239,10 +245,10 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(i)); } - Assert.assertTrue(55 == list.indexOf(56).toBlocking().value()); - Assert.assertTrue(99 == list.indexOf(100).toBlocking().value()); - Assert.assertTrue(-1 == list.indexOf(200).toBlocking().value()); - Assert.assertTrue(-1 == list.indexOf(0).toBlocking().value()); + Assert.assertTrue(55 == sync(list.indexOf(56))); + Assert.assertTrue(99 == sync(list.indexOf(100))); + Assert.assertTrue(-1 == sync(list.indexOf(200))); + Assert.assertTrue(-1 == sync(list.indexOf(0))); } @Test @@ -254,7 +260,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { sync(list.add(4)); sync(list.add(5)); - Integer val = list.remove(0).toBlocking().value(); + Integer val = sync(list.remove(0)); Assert.assertTrue(1 == val); Assert.assertThat(sync(list), Matchers.contains(2, 3, 4, 5)); @@ -435,7 +441,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { public void testContainsAllEmpty() { RListReactive list = redisson.getList("list"); for (int i = 0; i < 200; i++) { - list.add(i); + sync(list.add(i)); } Assert.assertTrue(sync(list.containsAll(Collections.emptyList()))); @@ -458,7 +464,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest { private void checkIterator(RListReactive list) { int iteration = 0; - for (Iterator iterator = list.iterator().toBlocking().getIterator(); iterator.hasNext();) { + for (Iterator iterator = toIterator(list.iterator()); iterator.hasNext();) { String value = iterator.next(); String val = sync(list.get(iteration)); Assert.assertEquals(val, value);