Migrated from RxJava to ReactiveStreams. #210

pull/337/head
Nikita 9 years ago
parent 2904f8e5e8
commit 029f5dca02

@ -17,12 +17,11 @@ package org.redisson;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import rx.Single;
/**
*
* @author Nikita Koksharov
@ -32,16 +31,16 @@ public interface CommandReactiveExecutor {
ConnectionManager getConnectionManager();
<T, R> Single<R> evalWriteObservable(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> Publisher<R> evalWriteObservable(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
<T, R> Single<R> evalReadObservable(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Publisher<R> evalReadObservable(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
<T, R> Single<R> writeObservable(String key, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> writeObservable(String key, RedisCommand<T> command, Object ... params);
<T, R> Single<R> writeObservable(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> writeObservable(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Single<R> readObservable(String key, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> readObservable(String key, RedisCommand<T> command, Object ... params);
<T, R> Single<R> readObservable(String key, Codec codec, RedisCommand<T> command, Object ... params);
<T, R> Publisher<R> readObservable(String key, Codec codec, RedisCommand<T> command, Object ... params);
}

@ -17,14 +17,18 @@ package org.redisson;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import rx.Single;
import rx.SingleSubscriber;
import reactor.core.support.Exceptions;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.subscription.ReactiveSubscription;
/**
*
@ -33,27 +37,45 @@ import rx.SingleSubscriber;
*/
public class CommandReactiveService extends CommandAsyncService implements CommandReactiveExecutor {
static class ToObservableFuture<T> implements Single.OnSubscribe<T> {
static class NettyFuturePublisher<T> extends Stream<T> {
private final Future<? extends T> that;
public ToObservableFuture(Future<? extends T> that) {
public NettyFuturePublisher(Future<? extends T> that) {
this.that = that;
}
@Override
public void call(final SingleSubscriber<? super T> subscriber) {
that.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
subscriber.onError(future.cause());
return;
public void subscribe(final Subscriber<? super T> subscriber) {
try {
subscriber.onSubscribe(new ReactiveSubscription<T>(this, subscriber) {
@Override
public void request(long elements) {
Action.checkRequest(elements);
if (isComplete()) return;
that.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
subscriber.onError(future.cause());
return;
}
if (future.getNow() != null) {
subscriber.onNext(future.getNow());
}
onComplete();
}
});
}
subscriber.onSuccess(future.getNow());
}
});
});
} catch (Throwable throwable) {
Exceptions.throwIfFatal(throwable);
subscriber.onError(throwable);
}
}
}
public CommandReactiveService(ConnectionManager connectionManager) {
@ -61,39 +83,39 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
}
@Override
public <T, R> Single<R> writeObservable(String key, RedisCommand<T> command, Object ... params) {
public <T, R> Publisher<R> writeObservable(String key, RedisCommand<T> command, Object ... params) {
return writeObservable(key, connectionManager.getCodec(), command, params);
}
@Override
public <T, R> Single<R> writeObservable(String key, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> Publisher<R> writeObservable(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> f = writeAsync(key, codec, command, params);
return Single.create(new ToObservableFuture<R>(f));
return new NettyFuturePublisher<R>(f);
}
@Override
public <T, R> Single<R> readObservable(String key, RedisCommand<T> command, Object ... params) {
public <T, R> Publisher<R> readObservable(String key, RedisCommand<T> command, Object ... params) {
return readObservable(key, connectionManager.getCodec(), command, params);
}
@Override
public <T, R> Single<R> readObservable(String key, Codec codec, RedisCommand<T> command, Object ... params) {
public <T, R> Publisher<R> readObservable(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Future<R> f = readAsync(key, codec, command, params);
return Single.create(new ToObservableFuture<R>(f));
return new NettyFuturePublisher<R>(f);
}
@Override
public <T, R> Single<R> evalReadObservable(String key, Codec codec, RedisCommand<T> evalCommandType,
public <T, R> Publisher<R> evalReadObservable(String key, Codec codec, RedisCommand<T> evalCommandType,
String script, List<Object> keys, Object... params) {
Future<R> f = evalReadAsync(key, codec, evalCommandType, script, keys, params);
return Single.create(new ToObservableFuture<R>(f));
return new NettyFuturePublisher<R>(f);
}
@Override
public <T, R> Single<R> evalWriteObservable(String key, Codec codec, RedisCommand<T> evalCommandType,
public <T, R> Publisher<R> evalWriteObservable(String key, Codec codec, RedisCommand<T> evalCommandType,
String script, List<Object> keys, Object... params) {
Future<R> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return Single.create(new ToObservableFuture<R>(f));
return new NettyFuturePublisher<R>(f);
}
}

@ -17,12 +17,11 @@ package org.redisson;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RBucketReactive;
import rx.Single;
public class RedissonBucketReactive<V> extends RedissonExpirableReactive implements RBucketReactive<V> {
protected RedissonBucketReactive(CommandReactiveExecutor connectionManager, String name) {
@ -34,22 +33,22 @@ public class RedissonBucketReactive<V> extends RedissonExpirableReactive impleme
}
@Override
public Single<V> get() {
public Publisher<V> get() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.GET, getName());
}
@Override
public Single<Void> set(V value) {
public Publisher<Void> set(V value) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.SET, getName(), value);
}
@Override
public Single<Void> set(V value, long timeToLive, TimeUnit timeUnit) {
public Publisher<Void> set(V value, long timeToLive, TimeUnit timeUnit) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value);
}
@Override
public Single<Boolean> exists() {
public Publisher<Boolean> exists() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.EXISTS, getName());
}

@ -18,12 +18,12 @@ package org.redisson;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RExpirableReactive;
import rx.Single;
abstract class RedissonExpirableReactive extends RedissonObjectReactive implements RExpirableReactive {
@ -36,27 +36,27 @@ abstract class RedissonExpirableReactive extends RedissonObjectReactive implemen
}
@Override
public Single<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.EXPIRE, getName(), timeUnit.toSeconds(timeToLive));
}
@Override
public Single<Boolean> expireAt(long timestamp) {
public Publisher<Boolean> expireAt(long timestamp) {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.EXPIREAT, getName(), timestamp);
}
@Override
public Single<Boolean> expireAt(Date timestamp) {
public Publisher<Boolean> expireAt(Date timestamp) {
return expireAt(timestamp.getTime() / 1000);
}
@Override
public Single<Boolean> clearExpire() {
public Publisher<Boolean> clearExpire() {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.PERSIST, getName());
}
@Override
public Single<Long> remainTimeToLive() {
public Publisher<Long> remainTimeToLive() {
return commandExecutor.readObservable(getName(), StringCodec.INSTANCE, RedisCommands.TTL, getName());
}

@ -20,12 +20,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RHyperLogLogReactive;
import rx.Single;
public class RedissonHyperLogLogReactive<V> extends RedissonExpirableReactive implements RHyperLogLogReactive<V> {
protected RedissonHyperLogLogReactive(CommandReactiveExecutor commandExecutor, String name) {
@ -37,12 +36,12 @@ public class RedissonHyperLogLogReactive<V> extends RedissonExpirableReactive im
}
@Override
public Single<Boolean> add(V obj) {
public Publisher<Boolean> add(V obj) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.PFADD, getName(), obj);
}
@Override
public Single<Boolean> addAll(Collection<V> objects) {
public Publisher<Boolean> addAll(Collection<V> objects) {
List<Object> args = new ArrayList<Object>(objects.size() + 1);
args.add(getName());
args.addAll(objects);
@ -50,12 +49,12 @@ public class RedissonHyperLogLogReactive<V> extends RedissonExpirableReactive im
}
@Override
public Single<Long> count() {
public Publisher<Long> count() {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.PFCOUNT, getName());
}
@Override
public Single<Long> countWith(String... otherLogNames) {
public Publisher<Long> countWith(String... otherLogNames) {
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1);
args.add(getName());
args.addAll(Arrays.asList(otherLogNames));
@ -63,7 +62,7 @@ public class RedissonHyperLogLogReactive<V> extends RedissonExpirableReactive im
}
@Override
public Single<Void> mergeWith(String... otherLogNames) {
public Publisher<Void> mergeWith(String... otherLogNames) {
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1);
args.add(getName());
args.addAll(Arrays.asList(otherLogNames));

@ -19,7 +19,6 @@ import static org.redisson.client.protocol.RedisCommands.EVAL_OBJECT;
import static org.redisson.client.protocol.RedisCommands.LINDEX;
import static org.redisson.client.protocol.RedisCommands.LLEN;
import static org.redisson.client.protocol.RedisCommands.LPOP;
import static org.redisson.client.protocol.RedisCommands.LPUSH;
import static org.redisson.client.protocol.RedisCommands.LREM_SINGLE;
import static org.redisson.client.protocol.RedisCommands.RPUSH;
@ -28,6 +27,10 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
@ -38,13 +41,9 @@ import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.core.RListReactive;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscriber;
import rx.observables.ConnectableObservable;
import rx.subjects.PublishSubject;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
/**
* Distributed and concurrent implementation of {@link java.util.List}
@ -64,80 +63,101 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Single<Long> size() {
public Publisher<Long> size() {
return commandExecutor.readObservable(getName(), codec, LLEN, getName());
}
@Override
public Observable<V> descendingIterator() {
public Publisher<V> descendingIterator() {
return iterator(-1, false);
}
@Override
public Observable<V> iterator() {
public Publisher<V> iterator() {
return iterator(0, true);
}
@Override
public Observable<V> descendingIterator(int startIndex) {
public Publisher<V> descendingIterator(int startIndex) {
return iterator(startIndex, false);
}
@Override
public Observable<V> iterator(int startIndex) {
public Publisher<V> iterator(int startIndex) {
return iterator(startIndex, true);
}
private Observable<V> iterator(final int startIndex, final boolean forward) {
return Observable.create(new OnSubscribe<V>() {
private int currentIndex = startIndex;
private Publisher<V> iterator(final int startIndex, final boolean forward) {
return new Stream<V>() {
@Override
public void call(final Subscriber<? super V> t) {
get(currentIndex).subscribe(new SingleSubscriber<V>() {
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
@Override
public void onError(Throwable e) {
t.onError(e);
}
private int currentIndex = startIndex;
@Override
public void onSuccess(V val) {
if (val == null) {
t.onCompleted();
return;
}
t.onNext(val);
if (forward) {
currentIndex++;
} else {
currentIndex--;
}
call(t);
protected void onRequest(final long n) {
final ReactiveSubscription<V> m = this;
get(currentIndex).subscribe(new Subscriber<V>() {
V currValue;
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onNext(V value) {
currValue = value;
m.onNext(value);
if (forward) {
currentIndex++;
} else {
currentIndex--;
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
}
@Override
public void onComplete() {
if (currValue == null) {
m.onComplete();
return;
}
if (n-1 == 0) {
return;
}
onRequest(n-1);
}
});
}
});
}
});
};
}
@Override
public Single<Long> add(V e) {
public Publisher<Long> add(V e) {
return commandExecutor.writeObservable(getName(), codec, RPUSH, getName(), e);
}
@Override
public Single<Boolean> remove(Object o) {
public Publisher<Boolean> remove(Object o) {
return remove(o, 1);
}
protected Single<Boolean> remove(Object o, int count) {
protected Publisher<Boolean> remove(Object o, int count) {
return commandExecutor.writeObservable(getName(), codec, LREM_SINGLE, getName(), count, o);
}
@Override
public Single<Boolean> containsAll(Collection<?> c) {
public Publisher<Boolean> containsAll(Collection<?> c) {
return commandExecutor.evalReadObservable(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local items = redis.call('lrange', KEYS[1], 0, -1) " +
"for i=1, #items do " +
@ -152,48 +172,19 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Single<Long> addAll(final Collection<? extends V> c) {
public Publisher<Long> addAll(final Collection<? extends V> c) {
if (c.isEmpty()) {
return size();
}
final PublishSubject<Long> promise = newObservable();
ConnectableObservable<Long> r = promise.replay();
r.connect();
Single<Long> sizeObservable = size();
sizeObservable.subscribe(new SingleSubscriber<Long>() {
@Override
public void onSuccess(final Long listSize) {
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
Single<Long> res = commandExecutor.writeObservable(getName(), codec, RPUSH, args.toArray());
res.subscribe(new SingleSubscriber<Long>() {
@Override
public void onSuccess(Long value) {
promise.onNext(value);
promise.onCompleted();
}
@Override
public void onError(Throwable error) {
promise.onError(error);
}
});
}
@Override
public void onError(Throwable error) {
promise.onError(error);
}
});
return r.toSingle();
List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);
return commandExecutor.writeObservable(getName(), codec, RPUSH, args.toArray());
}
@Override
public Single<Long> addAll(final long index, final Collection<? extends V> coll) {
public Publisher<Long> addAll(final long index, final Collection<? extends V> coll) {
if (coll.isEmpty()) {
return size();
}
@ -203,15 +194,15 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
Collections.reverse(elements);
elements.add(0, getName());
return commandExecutor.writeObservable(getName(), codec, LPUSH, elements.toArray());
return commandExecutor.writeObservable(getName(), codec, RedisCommands.LPUSH, elements.toArray());
}
final PublishSubject<Long> promise = newObservable();
final Processor<Long, Long> promise = newObservable();
Single<Long> s = size();
s.subscribe(new SingleSubscriber<Long>() {
Publisher<Long> s = size();
s.subscribe(new SubscriberBarrier<Long, Long>(promise) {
@Override
public void onSuccess(Long size) {
public void doNext(Long size) {
if (!isPositionInRange(index, size)) {
IndexOutOfBoundsException e = new IndexOutOfBoundsException("index: " + index + " but current size: "+ size);
promise.onError(e);
@ -228,7 +219,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
List<Object> args = new ArrayList<Object>(coll.size() + 1);
args.add(index);
args.addAll(coll);
Single<Long> f = commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5),
Publisher<Long> f = commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5),
"local ind = table.remove(ARGV, 1); " + // index is the first parameter
"local tail = redis.call('lrange', KEYS[1], ind, -1); " +
"redis.call('ltrim', KEYS[1], 0, ind - 1); " +
@ -239,16 +230,12 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
f.subscribe(toSubscriber(promise));
}
@Override
public void onError(Throwable error) {
promise.onError(error);
}
});
return promise.toSingle();
return promise;
}
@Override
public Single<Boolean> removeAll(Collection<?> c) {
public Publisher<Boolean> removeAll(Collection<?> c) {
return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local v = false " +
"for i = 0, table.getn(ARGV), 1 do "
@ -260,7 +247,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Single<Boolean> retainAll(Collection<?> c) {
public Publisher<Boolean> retainAll(Collection<?> c) {
return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4),
"local changed = false " +
"local items = redis.call('lrange', KEYS[1], 0, -1) "
@ -286,7 +273,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Single<V> get(long index) {
public Publisher<V> get(long index) {
return commandExecutor.readObservable(getName(), codec, LINDEX, getName(), index);
}
@ -295,7 +282,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Single<V> set(long index, V element) {
public Publisher<V> set(long index, V element) {
return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Object>("EVAL", 5),
"local v = redis.call('lindex', KEYS[1], ARGV[1]); " +
"redis.call('lset', KEYS[1], ARGV[1], ARGV[2]); " +
@ -304,17 +291,17 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Single<Void> fastSet(long index, V element) {
public Publisher<Void> fastSet(long index, V element) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.LSET, getName(), index, element);
}
@Override
public Single<Long> add(long index, V element) {
public Publisher<Long> add(long index, V element) {
return addAll(index, Collections.singleton(element));
}
@Override
public Single<V> remove(int index) {
public Publisher<V> remove(int index) {
if (index == 0) {
return commandExecutor.writeObservable(getName(), codec, LPOP, getName());
}
@ -329,11 +316,11 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Single<Boolean> contains(Object o) {
public Publisher<Boolean> contains(Object o) {
return indexOf(o, new BooleanNumberReplayConvertor());
}
private <R> Single<R> indexOf(Object o, Convertor<R> convertor) {
private <R> Publisher<R> indexOf(Object o, Convertor<R> convertor) {
return commandExecutor.evalReadObservable(getName(), codec, new RedisCommand<R>("EVAL", convertor, 4),
"local key = KEYS[1] " +
"local obj = ARGV[1] " +
@ -348,12 +335,12 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
@Override
public Single<Integer> indexOf(Object o) {
public Publisher<Integer> indexOf(Object o) {
return indexOf(o, new IntegerReplayConvertor());
}
@Override
public Single<Integer> lastIndexOf(Object o) {
public Publisher<Integer> lastIndexOf(Object o) {
return commandExecutor.evalReadObservable(getName(), codec, new RedisCommand<Integer>("EVAL", new IntegerReplayConvertor(), 4),
"local key = KEYS[1] " +
"local obj = ARGV[1] " +

@ -15,13 +15,18 @@
*/
package org.redisson;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.RObjectReactive;
import rx.Single;
import rx.SingleSubscriber;
import rx.subjects.PublishSubject;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.broadcast.Broadcaster;
/**
* Base Redisson object
@ -45,27 +50,21 @@ abstract class RedissonObjectReactive implements RObjectReactive {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
protected <V> SingleSubscriber<V> toSubscriber(final PublishSubject<V> promise) {
return new SingleSubscriber<V>() {
protected <V> Subscriber<V> toSubscriber(final Subscriber<V> promise) {
return new SubscriberBarrier<V, V>(promise) {
@Override
public void onSuccess(V value) {
promise.onNext(value);
promise.onCompleted();
}
@Override
public void onError(Throwable error) {
promise.onError(error);
protected void doSubscribe(Subscription subscription) {
subscription.request(1);
}
};
}
protected <V> PublishSubject<V> newObservable() {
return PublishSubject.create();
protected <V> Processor<V, V> newObservable() {
return Broadcaster.create();
}
protected <V> Single<V> newSucceededObservable(V result) {
return Single.just(result);
protected <V> Stream<V> newSucceededObservable(V result) {
return Streams.just(result);
}
@Override
@ -74,27 +73,27 @@ abstract class RedissonObjectReactive implements RObjectReactive {
}
@Override
public Single<Void> rename(String newName) {
public Publisher<Void> rename(String newName) {
return commandExecutor.writeObservable(getName(), RedisCommands.RENAME, getName(), newName);
}
@Override
public Single<Void> migrate(String host, int port, int database) {
public Publisher<Void> migrate(String host, int port, int database) {
return commandExecutor.writeObservable(getName(), RedisCommands.MIGRATE, host, port, getName(), database);
}
@Override
public Single<Boolean> move(int database) {
public Publisher<Boolean> move(int database) {
return commandExecutor.writeObservable(getName(), RedisCommands.MOVE, getName(), database);
}
@Override
public Single<Boolean> renamenx(String newName) {
public Publisher<Boolean> renamenx(String newName) {
return commandExecutor.writeObservable(getName(), RedisCommands.RENAMENX, getName(), newName);
}
@Override
public Single<Boolean> delete() {
public Publisher<Boolean> delete() {
return commandExecutor.writeObservable(getName(), RedisCommands.DEL_SINGLE, getName());
}

@ -17,7 +17,8 @@ package org.redisson.core;
import java.util.concurrent.TimeUnit;
import rx.Single;
import org.reactivestreams.Publisher;
/**
* object functions
@ -28,12 +29,12 @@ import rx.Single;
*/
public interface RBucketReactive<V> extends RExpirableReactive {
Single<V> get();
Publisher<V> get();
Single<Void> set(V value);
Publisher<Void> set(V value);
Single<Void> set(V value, long timeToLive, TimeUnit timeUnit);
Publisher<Void> set(V value, long timeToLive, TimeUnit timeUnit);
Single<Boolean> exists();
Publisher<Boolean> exists();
}

@ -17,27 +17,26 @@ package org.redisson.core;
import java.util.Collection;
import rx.Observable;
import rx.Single;
import org.reactivestreams.Publisher;
public interface RCollectionReactive<V> extends RExpirableReactive {
Observable<V> iterator();
Publisher<V> iterator();
Single<Boolean> retainAll(Collection<?> c);
Publisher<Boolean> retainAll(Collection<?> c);
Single<Boolean> removeAll(Collection<?> c);
Publisher<Boolean> removeAll(Collection<?> c);
Single<Boolean> contains(Object o);
Publisher<Boolean> contains(Object o);
Single<Boolean> containsAll(Collection<?> c);
Publisher<Boolean> containsAll(Collection<?> c);
Single<Boolean> remove(Object o);
Publisher<Boolean> remove(Object o);
Single<Long> size();
Publisher<Long> size();
Single<Long> add(V e);
Publisher<Long> add(V e);
Single<Long> addAll(Collection<? extends V> c);
Publisher<Long> addAll(Collection<? extends V> c);
}

@ -18,8 +18,7 @@ package org.redisson.core;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Single;
import org.reactivestreams.Publisher;
/**
* Base interface for all Redisson objects
@ -38,7 +37,7 @@ public interface RExpirableReactive extends RObjectReactive {
* @param timeUnit - timeout time unit
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Single<Boolean> expire(long timeToLive, TimeUnit timeUnit);
Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit);
/**
* Set an expire date for object in mode. When expire date comes
@ -47,7 +46,7 @@ public interface RExpirableReactive extends RObjectReactive {
* @param timestamp - expire date
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Single<Boolean> expireAt(Date timestamp);
Publisher<Boolean> expireAt(Date timestamp);
/**
* Set an expire date for object in mode. When expire date comes
@ -56,7 +55,7 @@ public interface RExpirableReactive extends RObjectReactive {
* @param timestamp - expire date in seconds (Unix timestamp)
* @return <code>true</code> if the timeout was set and <code>false</code> if not
*/
Single<Boolean> expireAt(long timestamp);
Publisher<Boolean> expireAt(long timestamp);
/**
* Clear an expire timeout or expire date for object in mode.
@ -64,13 +63,13 @@ public interface RExpirableReactive extends RObjectReactive {
*
* @return <code>true</code> if the timeout was cleared and <code>false</code> if not
*/
Single<Boolean> clearExpire();
Publisher<Boolean> clearExpire();
/**
* Get remaining time to live of object in seconds.
*
* @return <code>-1</code> if object does not exist or time in seconds
*/
Single<Long> remainTimeToLive();
Publisher<Long> remainTimeToLive();
}

@ -17,18 +17,18 @@ package org.redisson.core;
import java.util.Collection;
import rx.Single;
import org.reactivestreams.Publisher;
public interface RHyperLogLogReactive<V> extends RExpirableReactive {
Single<Boolean> add(V obj);
Publisher<Boolean> add(V obj);
Single<Boolean> addAll(Collection<V> objects);
Publisher<Boolean> addAll(Collection<V> objects);
Single<Long> count();
Publisher<Long> count();
Single<Long> countWith(String ... otherLogNames);
Publisher<Long> countWith(String ... otherLogNames);
Single<Void> mergeWith(String ... otherLogNames);
Publisher<Void> mergeWith(String ... otherLogNames);
}

@ -17,8 +17,7 @@ package org.redisson.core;
import java.util.Collection;
import rx.Observable;
import rx.Single;
import org.reactivestreams.Publisher;
/**
* list functions
@ -30,26 +29,26 @@ import rx.Single;
// TODO add sublist support
public interface RListReactive<V> extends RCollectionReactive<V> {
Observable<V> descendingIterator();
Publisher<V> descendingIterator();
Observable<V> descendingIterator(int startIndex);
Publisher<V> descendingIterator(int startIndex);
Observable<V> iterator(int startIndex);
Publisher<V> iterator(int startIndex);
Single<Integer> lastIndexOf(Object o);
Publisher<Integer> lastIndexOf(Object o);
Single<Integer> indexOf(Object o);
Publisher<Integer> indexOf(Object o);
Single<Long> add(long index, V element);
Publisher<Long> add(long index, V element);
Single<Long> addAll(long index, Collection<? extends V> coll);
Publisher<Long> addAll(long index, Collection<? extends V> coll);
Single<Void> fastSet(long index, V element);
Publisher<Void> fastSet(long index, V element);
Single<V> set(long index, V element);
Publisher<V> set(long index, V element);
Single<V> get(long index);
Publisher<V> get(long index);
Single<V> remove(int index);
Publisher<V> remove(int index);
}

@ -15,7 +15,7 @@
*/
package org.redisson.core;
import rx.Single;
import org.reactivestreams.Publisher;
/**
* Base interface for all Redisson objects
@ -36,7 +36,7 @@ public interface RObjectReactive {
* @param database - destination database
* @return
*/
Single<Void> migrate(String host, int port, int database);
Publisher<Void> migrate(String host, int port, int database);
/**
* Move object to another database in mode
@ -44,14 +44,14 @@ public interface RObjectReactive {
* @param database
* @return <code>true</code> if key was moved <code>false</code> if not
*/
Single<Boolean> move(int database);
Publisher<Boolean> move(int database);
/**
* Delete object in mode
*
* @return <code>true</code> if object was deleted <code>false</code> if not
*/
Single<Boolean> delete();
Publisher<Boolean> delete();
/**
* Rename current object key to <code>newName</code>
@ -60,7 +60,7 @@ public interface RObjectReactive {
* @param newName
* @return
*/
Single<Void> rename(String newName);
Publisher<Void> rename(String newName);
/**
* Rename current object key to <code>newName</code>
@ -69,6 +69,6 @@ public interface RObjectReactive {
* @param newName
* @return
*/
Single<Boolean> renamenx(String newName);
Publisher<Boolean> renamenx(String newName);
}

@ -1,11 +1,17 @@
package org.redisson;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.reactivestreams.Publisher;
import rx.Observable;
import rx.Single;
import reactor.fn.Consumer;
import reactor.rx.Streams;
public abstract class BaseReactiveTest {
@ -21,8 +27,16 @@ public abstract class BaseReactiveTest {
redisson.shutdown();
}
public <V> V sync(Single<V> ob) {
return ob.toBlocking().value();
public <V> Iterator<V> toIterator(Publisher<V> pub) {
return Streams.create(pub).toList().poll().iterator();
}
public <V> V sync(Publisher<V> ob) {
List<V> t = Streams.create(ob).toList().poll();
if (t == null) {
return null;
}
return t.iterator().next();
}
public static Config createConfig() {

@ -8,19 +8,24 @@ import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.redisson.client.RedisException;
import org.redisson.core.RListReactive;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import rx.Observable;
import rx.Single;
import rx.SingleSubscriber;
import reactor.core.queue.CompletableBlockingQueue;
import reactor.fn.Consumer;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
public class RedissonListReactiveTest extends BaseReactiveTest {
@ -30,19 +35,20 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(test2.add("foo"));
sync(test2.add(0, "bar"));
MatcherAssert.assertThat(test2.iterator().toBlocking().toIterable(), Matchers.contains("bar", "foo"));
MatcherAssert.assertThat(sync(test2), Matchers.contains("bar", "foo"));
}
@Test
public void testAddAllWithIndex() throws InterruptedException {
final RListReactive<Long> list = redisson.getList("list");
final CountDownLatch latch = new CountDownLatch(1);
list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new SingleSubscriber<Long>() {
list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new Promise<Long>() {
@Override
public void onSuccess(Long value) {
list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new SingleSubscriber<Long>() {
public void onNext(Long element) {
list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new Promise<Long>() {
@Override
public void onSuccess(Long value) {
public void onNext(Long value) {
latch.countDown();
}
@ -68,12 +74,12 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
public void testAdd() throws InterruptedException {
final RListReactive<Long> list = redisson.getList("list");
final CountDownLatch latch = new CountDownLatch(1);
list.add(1L).subscribe(new SingleSubscriber<Long>() {
list.add(1L).subscribe(new Promise<Long>() {
@Override
public void onSuccess(Long value) {
list.add(2L).subscribe(new SingleSubscriber<Long>() {
public void onNext(Long value) {
list.add(2L).subscribe(new Promise<Long>() {
@Override
public void onSuccess(Long value) {
public void onNext(Long value) {
latch.countDown();
}
@ -96,7 +102,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
}
private <V> Iterable<V> sync(RListReactive<V> list) {
return list.iterator().toBlocking().toIterable();
return Streams.create(list.iterator()).toList().poll();
}
@Test
@ -122,7 +128,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(0));
sync(list.add(10));
Iterator<Integer> iterator = list.iterator().toBlocking().getIterator();
Iterator<Integer> iterator = toIterator(list.iterator());
Assert.assertTrue(1 == iterator.next());
Assert.assertTrue(2 == iterator.next());
@ -151,7 +157,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(0));
sync(list.add(10));
Iterator<Integer> iterator = list.descendingIterator().toBlocking().getIterator();
Iterator<Integer> iterator = toIterator(list.descendingIterator());
Assert.assertTrue(10 == iterator.next());
Assert.assertTrue(0 == iterator.next());
@ -175,7 +181,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(4));
sync(list.add(5));
Assert.assertEquals(-1, list.lastIndexOf(10).toBlocking().value().intValue());
Assert.assertEquals(-1, sync(list.lastIndexOf(10)).intValue());
}
@Test
@ -192,7 +198,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(0));
sync(list.add(10));
int index = list.lastIndexOf(3).toBlocking().value();
int index = sync(list.lastIndexOf(3));
Assert.assertEquals(2, index);
}
@ -210,7 +216,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(0));
sync(list.add(10));
int index = list.lastIndexOf(3).toBlocking().value();
int index = sync(list.lastIndexOf(3));
Assert.assertEquals(5, index);
}
@ -228,7 +234,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(3));
sync(list.add(10));
int index = list.lastIndexOf(3).toBlocking().value();
int index = sync(list.lastIndexOf(3));
Assert.assertEquals(8, index);
}
@ -239,10 +245,10 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(i));
}
Assert.assertTrue(55 == list.indexOf(56).toBlocking().value());
Assert.assertTrue(99 == list.indexOf(100).toBlocking().value());
Assert.assertTrue(-1 == list.indexOf(200).toBlocking().value());
Assert.assertTrue(-1 == list.indexOf(0).toBlocking().value());
Assert.assertTrue(55 == sync(list.indexOf(56)));
Assert.assertTrue(99 == sync(list.indexOf(100)));
Assert.assertTrue(-1 == sync(list.indexOf(200)));
Assert.assertTrue(-1 == sync(list.indexOf(0)));
}
@Test
@ -254,7 +260,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
sync(list.add(4));
sync(list.add(5));
Integer val = list.remove(0).toBlocking().value();
Integer val = sync(list.remove(0));
Assert.assertTrue(1 == val);
Assert.assertThat(sync(list), Matchers.contains(2, 3, 4, 5));
@ -435,7 +441,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
public void testContainsAllEmpty() {
RListReactive<Integer> list = redisson.getList("list");
for (int i = 0; i < 200; i++) {
list.add(i);
sync(list.add(i));
}
Assert.assertTrue(sync(list.containsAll(Collections.emptyList())));
@ -458,7 +464,7 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
private void checkIterator(RListReactive<String> list) {
int iteration = 0;
for (Iterator<String> iterator = list.iterator().toBlocking().getIterator(); iterator.hasNext();) {
for (Iterator<String> iterator = toIterator(list.iterator()); iterator.hasNext();) {
String value = iterator.next();
String val = sync(list.get(iteration));
Assert.assertEquals(val, value);

Loading…
Cancel
Save