Migration to Project Reactor 3.1.0. #958

pull/1303/head
Nikita 8 years ago
parent d705fbe050
commit 1cc59c6337

@ -70,8 +70,8 @@
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-stream</artifactId>
<version>2.0.8.RELEASE</version>
<artifactId>reactor-core</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>
<dependency>

@ -291,7 +291,7 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public RBatchReactive createBatch() {
RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager);
RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager, commandExecutor);
if (config.isRedissonReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}

@ -18,8 +18,10 @@ package org.redisson.command;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonReactiveClient;
@ -28,10 +30,9 @@ import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.redisson.misc.RPromise;
import org.redisson.reactive.NettyFuturePublisher;
import reactor.fn.Supplier;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.MonoProcessor;
/**
*
@ -50,7 +51,7 @@ public class CommandReactiveBatchService extends CommandReactiveService {
@Override
public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
NettyFuturePublisher<R> publisher = new NettyFuturePublisher<R>(supplier);
Publisher<R> publisher = super.reactive(supplier);
publishers.add(publisher);
return publisher;
}
@ -75,12 +76,26 @@ public class CommandReactiveBatchService extends CommandReactiveService {
private RFuture<Void> executeAsyncVoid(boolean noResult, long responseTimeout, int retryAttempts, long retryInterval) {
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
});
Flux.from(publisher).subscribe();
// publisher.subscribe(new Subscriber<Object>() {
//
// @Override
// public void onSubscribe(Subscription s) {
// s.request(1);
// }
//
// @Override
// public void onError(Throwable t) {
// }
//
// @Override
// public void onComplete() {
// }
//
// @Override
// public void onNext(Object t) {
// }
// });
}
return batchService.executeAsyncVoid(noResult, responseTimeout, retryAttempts, retryInterval);
}
@ -99,12 +114,26 @@ public class CommandReactiveBatchService extends CommandReactiveService {
public RFuture<List<?>> executeAsync(long responseTimeout, int retryAttempts, long retryInterval) {
for (Publisher<?> publisher : publishers) {
publisher.subscribe(new DefaultSubscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
});
Flux.from(publisher).subscribe();
// publisher.subscribe(new Subscriber<Object>() {
//
// @Override
// public void onSubscribe(Subscription s) {
// s.request(1);
// }
//
// @Override
// public void onError(Throwable t) {
// }
//
// @Override
// public void onComplete() {
// }
//
// @Override
// public void onNext(Object t) {
// }
// });
}
return batchService.executeAsync(responseTimeout, retryAttempts, retryInterval);

@ -18,17 +18,15 @@ package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.SlotCallback;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov

@ -18,6 +18,7 @@ package org.redisson.command;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.SlotCallback;
@ -26,9 +27,8 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.reactive.NettyFuturePublisher;
import reactor.fn.Supplier;
import reactor.core.publisher.Flux;
/**
*
@ -51,8 +51,22 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
});
}
@Override
public <R> Publisher<R> reactive(Supplier<RFuture<R>> supplier) {
return new NettyFuturePublisher<R>(supplier);
return Flux.create(emitter -> {
emitter.onRequest(n -> {
supplier.get().whenComplete((v, e) -> {
if (e != null) {
emitter.error(e);
return;
}
if (v != null) {
emitter.next(v);
}
emitter.complete();
});
});
});
}
@Override

@ -1,72 +0,0 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import org.reactivestreams.Subscriber;
import org.redisson.api.RFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.core.support.Exceptions;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
/**
*
* @author Nikita Koksharov
*
* @param <T> return type
*/
public class NettyFuturePublisher<T> extends Stream<T> {
private final Supplier<RFuture<T>> supplier;
public NettyFuturePublisher(Supplier<RFuture<T>> supplier) {
this.supplier = supplier;
}
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
try {
subscriber.onSubscribe(new ReactiveSubscription<T>(this, subscriber) {
@Override
protected void onRequest(long n) {
supplier.get().addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
onError(future.cause());
return;
}
if (future.getNow() != null) {
onNext(future.getNow());
}
onComplete();
}
});
}
});
} catch (Throwable throwable) {
Exceptions.throwIfFatal(throwable);
subscriber.onError(throwable);
}
}
}

@ -15,15 +15,15 @@
*/
package org.redisson.reactive;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.api.RCollectionReactive;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
/**
*
@ -44,9 +44,8 @@ public class PublisherAdder<V> {
}
public Publisher<Integer> addAll(Publisher<? extends V> c) {
final Promise<Integer> promise = Promises.prepare();
c.subscribe(new DefaultSubscriber<V>() {
CompletableFuture<Integer> promise = new CompletableFuture<>();
c.subscribe(new BaseSubscriber<V>() {
volatile boolean completed;
AtomicLong values = new AtomicLong();
@ -54,47 +53,47 @@ public class PublisherAdder<V> {
Integer lastSize = 0;
@Override
public void onSubscribe(Subscription s) {
protected void hookOnSubscribe(Subscription s) {
this.s = s;
s.request(1);
}
@Override
public void onNext(V o) {
protected void hookOnNext(V o) {
values.getAndIncrement();
destination.add(o).subscribe(new DefaultSubscriber<Integer>() {
destination.add(o).subscribe(new BaseSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
protected void hookOnSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onError(Throwable t) {
promise.onError(t);
protected void hookOnError(Throwable t) {
promise.completeExceptionally(t);
}
@Override
public void onNext(Integer o) {
protected void hookOnNext(Integer o) {
lastSize = sum(lastSize, o);
s.request(1);
if (values.decrementAndGet() == 0 && completed) {
promise.onNext(lastSize);
promise.complete(lastSize);
}
}
});
}
@Override
public void onComplete() {
protected void hookOnComplete() {
completed = true;
if (values.get() == 0) {
promise.onNext(lastSize);
promise.complete(lastSize);
}
}
});
return promise;
return Mono.fromCompletionStage(promise);
}
}

@ -15,6 +15,8 @@
*/
package org.redisson.reactive;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonAtomicLong;
import org.redisson.api.RAtomicLongAsync;
@ -22,8 +24,7 @@ import org.redisson.api.RAtomicLongReactive;
import org.redisson.api.RFuture;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
import reactor.rx.Streams;
import reactor.core.publisher.Mono;
/**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
@ -127,7 +128,7 @@ public class RedissonAtomicLongReactive extends RedissonExpirableReactive implem
}
public String toString() {
return Long.toString(Streams.create(get()).next().poll());
return Long.toString(Mono.from(get()).block());
}
}

@ -16,6 +16,7 @@
package org.redisson.reactive;
import java.util.List;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.api.RAtomicLongReactive;
@ -40,11 +41,10 @@ import org.redisson.api.RTopicReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveBatchService;
import org.redisson.command.CommandReactiveService;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov
@ -54,10 +54,12 @@ public class RedissonBatchReactive implements RBatchReactive {
private final EvictionScheduler evictionScheduler;
private final CommandReactiveBatchService executorService;
private final CommandReactiveService commandExecutor;
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager) {
public RedissonBatchReactive(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, CommandReactiveService commandExecutor) {
this.evictionScheduler = evictionScheduler;
this.executorService = new CommandReactiveBatchService(connectionManager);
this.commandExecutor = commandExecutor;
}
@Override
@ -207,7 +209,7 @@ public class RedissonBatchReactive implements RBatchReactive {
@Override
public Publisher<List<?>> execute() {
return new NettyFuturePublisher<List<?>>(new Supplier<RFuture<List<?>>>() {
return commandExecutor.reactive(new Supplier<RFuture<List<?>>>() {
@Override
public RFuture<List<?>> get() {
return executorService.executeAsync();

@ -16,6 +16,7 @@
package org.redisson.reactive;
import java.util.BitSet;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonBitSet;
@ -25,8 +26,7 @@ import org.redisson.client.codec.BitSetCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
import reactor.rx.Streams;
import reactor.core.publisher.Mono;
/**
*
@ -215,7 +215,7 @@ public class RedissonBitSetReactive extends RedissonExpirableReactive implements
@Override
public String toString() {
return Streams.create(asBitSet()).next().poll().toString();
return Mono.from(asBitSet()).block().toString();
}
}

@ -16,6 +16,7 @@
package org.redisson.reactive;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonBucket;
@ -25,8 +26,6 @@ import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov

@ -18,6 +18,9 @@ package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@ -31,10 +34,8 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveService;
import org.redisson.connection.MasterSlaveEntry;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.subscription.ReactiveSubscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
*
@ -69,7 +70,7 @@ public class RedissonKeysReactive implements RKeysReactive {
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
publishers.add(createKeysIterator(entry, pattern));
}
return Streams.merge(publishers);
return Flux.merge(publishers);
}
@Override
@ -85,25 +86,23 @@ public class RedissonKeysReactive implements RKeysReactive {
}
private Publisher<String> createKeysIterator(final MasterSlaveEntry entry, final String pattern) {
return new Stream<String>() {
return Flux.create(new Consumer<FluxSink<String>>() {
@Override
public void subscribe(final Subscriber<? super String> t) {
t.onSubscribe(new ReactiveSubscription<String>(this, t) {
public void accept(FluxSink<String> emitter) {
emitter.onRequest(new LongConsumer() {
private List<String> firstValues;
private long nextIterPos;
private long currentIndex;
@Override
protected void onRequest(final long n) {
currentIndex = n;
nextValues();
public void accept(long value) {
currentIndex = value;
nextValues(emitter);
}
protected void nextValues() {
final ReactiveSubscription<String> m = this;
protected void nextValues(FluxSink<String> emitter) {
scanIterator(entry, nextIterPos, pattern).subscribe(new Subscriber<ListScanResult<String>>() {
@Override
@ -117,7 +116,7 @@ public class RedissonKeysReactive implements RKeysReactive {
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
m.onComplete();
emitter.complete();
currentIndex = 0;
return;
}
@ -127,22 +126,22 @@ public class RedissonKeysReactive implements RKeysReactive {
nextIterPos = -1;
}
for (String val : res.getValues()) {
m.onNext(val);
emitter.next(val);
currentIndex--;
if (currentIndex == 0) {
m.onComplete();
emitter.complete();
return;
}
}
if (nextIterPos == -1) {
m.onComplete();
emitter.complete();
currentIndex = 0;
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
emitter.error(error);
}
@Override
@ -150,14 +149,16 @@ public class RedissonKeysReactive implements RKeysReactive {
if (currentIndex == 0) {
return;
}
nextValues();
nextValues(emitter);
}
});
}
});
}
};
});
}
@Override

@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonLexSortedSet;
@ -28,7 +29,6 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
/**
*

@ -23,6 +23,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@ -35,12 +40,9 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.BiFunction;
import reactor.fn.Function;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.subscription.ReactiveSubscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
/**
* Distributed and concurrent implementation of {@link java.util.List}
@ -89,17 +91,20 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
}
private Publisher<V> iterator(final int startIndex, final boolean forward) {
return new Stream<V>() {
return Flux.create(new Consumer<FluxSink<V>>() {
@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
private int currentIndex = startIndex;
public void accept(FluxSink<V> emitter) {
emitter.onRequest(new LongConsumer() {
int currentIndex = startIndex;
@Override
protected void onRequest(final long n) {
final ReactiveSubscription<V> m = this;
public void accept(long value) {
onRequest(forward, emitter, value);
}
protected void onRequest(final boolean forward, FluxSink<V> emitter, long n) {
get(currentIndex).subscribe(new Subscriber<V>() {
V currValue;
@ -111,7 +116,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public void onNext(V value) {
currValue = value;
m.onNext(value);
emitter.next(value);
if (forward) {
currentIndex++;
} else {
@ -121,26 +126,27 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
@Override
public void onError(Throwable error) {
m.onError(error);
emitter.error(error);
}
@Override
public void onComplete() {
if (currValue == null) {
m.onComplete();
emitter.complete();
return;
}
if (n-1 == 0) {
return;
}
onRequest(n-1);
onRequest(forward, emitter, n-1);
}
});
}
});
}
};
});
}
@Override
@ -321,23 +327,23 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
if (!(o instanceof RedissonListReactive))
return false;
Stream<Object> e1 = Streams.wrap((Publisher<Object>)iterator());
Stream<Object> e2 = Streams.wrap(((RedissonListReactive<Object>) o).iterator());
Long count = Streams.merge(e1, e2).groupBy(new Function<Object, Object>() {
Flux<Object> e1 = Flux.from((Publisher<Object>)iterator());
Flux<Object> e2 = Flux.from(((RedissonListReactive<Object>) o).iterator());
Long count = Flux.merge(e1, e2).groupBy(new Function<Object, Object>() {
@Override
public Object apply(Object t) {
return t;
}
}).count().next().poll();
}).count().block();
boolean res = count.intValue() == Streams.wrap(size()).next().poll();
res &= count.intValue() == Streams.wrap(((RedissonListReactive<Object>) o).size()).next().poll();
boolean res = count.intValue() == Mono.from(size()).block();
res &= count.intValue() == Mono.from(((RedissonListReactive<Object>) o).size()).block();
return res;
}
@Override
public int hashCode() {
Integer hash = Streams.wrap(iterator()).map(new Function<V, Integer>() {
Integer hash = Flux.from(iterator()).map(new Function<V, Integer>() {
@Override
public Integer apply(V t) {
return t.hashCode();
@ -348,7 +354,7 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
public Integer apply(Integer t, Integer u) {
return 31*t + u;
}
}).next().poll();
}).block();
if (hash == null) {
return 1;

@ -17,6 +17,7 @@ package org.redisson.reactive;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonLock;
@ -26,8 +27,6 @@ import org.redisson.api.RLockReactive;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov

@ -21,6 +21,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMapCache;
@ -35,10 +38,8 @@ import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.eviction.EvictionScheduler;
import reactor.fn.BiFunction;
import reactor.fn.Function;
import reactor.fn.Supplier;
import reactor.rx.Streams;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* <p>Map-based cache with ability to set TTL for each entry via
@ -285,27 +286,27 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
@Override
public Publisher<Map.Entry<K, V>> entryIterator() {
return new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this).stream();
return Flux.create(new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this));
}
@Override
public Publisher<V> valueIterator() {
return new RedissonMapReactiveIterator<K, V, V>(this) {
return Flux.create(new RedissonMapReactiveIterator<K, V, V>(this) {
@Override
V getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj();
}
}.stream();
});
}
@Override
public Publisher<K> keyIterator() {
return new RedissonMapReactiveIterator<K, V, K>(this) {
return Flux.create(new RedissonMapReactiveIterator<K, V, K>(this) {
@Override
K getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
}
}.stream();
});
}
@Override
@ -325,18 +326,18 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
if (o instanceof Map) {
final Map<?,?> m = (Map<?,?>) o;
if (m.size() != Streams.create(size()).next().poll()) {
if (m.size() != Mono.from(size()).block()) {
return false;
}
return Streams.create(entryIterator()).map(mapFunction(m)).reduce(true, booleanAnd()).next().poll();
return Flux.from(entryIterator()).map(mapFunction(m)).reduce(true, booleanAnd()).block();
} else if (o instanceof RMapReactive) {
final RMapReactive<Object, Object> m = (RMapReactive<Object, Object>) o;
if (Streams.create(m.size()).next().poll() != Streams.create(size()).next().poll()) {
if (Mono.from(m.size()).block() != Mono.from(size()).block()) {
return false;
}
return Streams.create(entryIterator()).map(mapFunction(m)).reduce(true, booleanAnd()).next().poll();
return Flux.from(entryIterator()).map(mapFunction(m)).reduce(true, booleanAnd()).block();
}
return true;
@ -377,10 +378,10 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
Object key = e.getKey();
Object value = e.getValue();
if (value == null) {
if (!(Streams.create(m.get(key)).next().poll() ==null && Streams.create(m.containsKey(key)).next().poll()))
if (!(Mono.from(m.get(key)).block() == null && Mono.from(m.containsKey(key)).block()))
return false;
} else {
if (!value.equals(Streams.create(m.get(key)).next().poll()))
if (!value.equals(Mono.from(m.get(key)).block()))
return false;
}
return true;
@ -390,7 +391,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
@Override
public int hashCode() {
return Streams.create(entryIterator()).map(new Function<Map.Entry<K, V>, Integer>() {
return Flux.from(entryIterator()).map(new Function<Map.Entry<K, V>, Integer>() {
@Override
public Integer apply(Entry<K, V> t) {
return t.hashCode();
@ -401,7 +402,7 @@ public class RedissonMapCacheReactive<K, V> extends RedissonExpirableReactive im
public Integer apply(Integer t, Integer u) {
return t + u;
}
}).next().poll();
}).block();
}
@Override

@ -20,6 +20,9 @@ import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonMap;
@ -34,11 +37,8 @@ import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.BiFunction;
import reactor.fn.Function;
import reactor.fn.Supplier;
import reactor.rx.Streams;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap}
@ -290,27 +290,27 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
@Override
public Publisher<Map.Entry<K, V>> entryIterator() {
return new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this).stream();
return Flux.create(new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this));
}
@Override
public Publisher<V> valueIterator() {
return new RedissonMapReactiveIterator<K, V, V>(this) {
return Flux.create(new RedissonMapReactiveIterator<K, V, V>(this) {
@Override
V getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (V) entry.getValue().getObj();
}
}.stream();
});
}
@Override
public Publisher<K> keyIterator() {
return new RedissonMapReactiveIterator<K, V, K>(this) {
return Flux.create(new RedissonMapReactiveIterator<K, V, K>(this) {
@Override
K getValue(Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (K) entry.getKey().getObj();
}
}.stream();
});
}
@Override
@ -330,18 +330,18 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
if (o instanceof Map) {
final Map<?,?> m = (Map<?,?>) o;
if (m.size() != Streams.create(size()).next().poll()) {
if (m.size() != Mono.from(size()).block()) {
return false;
}
return Streams.create(entryIterator()).map(mapFunction(m)).reduce(true, booleanAnd()).next().poll();
return Flux.from(entryIterator()).map(mapFunction(m)).reduce(true, booleanAnd()).block();
} else if (o instanceof RMapReactive) {
final RMapReactive<Object, Object> m = (RMapReactive<Object, Object>) o;
if (Streams.create(m.size()).next().poll() != Streams.create(size()).next().poll()) {
if (Mono.from(m.size()).block() != Mono.from(size()).block()) {
return false;
}
return Streams.create(entryIterator()).map(mapFunction(m)).reduce(true, booleanAnd()).next().poll();
return Flux.from(entryIterator()).map(mapFunction(m)).reduce(true, booleanAnd()).block();
}
return true;
@ -382,10 +382,10 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
Object key = e.getKey();
Object value = e.getValue();
if (value == null) {
if (!(Streams.create(m.get(key)).next().poll() ==null && Streams.create(m.containsKey(key)).next().poll()))
if (!(Mono.from(m.get(key)).block() ==null && Mono.from(m.containsKey(key)).block()))
return false;
} else {
if (!value.equals(Streams.create(m.get(key)).next().poll()))
if (!value.equals(Mono.from(m.get(key)).block()))
return false;
}
return true;
@ -395,7 +395,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
@Override
public int hashCode() {
return Streams.create(entryIterator()).map(new Function<Map.Entry<K, V>, Integer>() {
return Flux.from(entryIterator()).map(new Function<Map.Entry<K, V>, Integer>() {
@Override
public Integer apply(Entry<K, V> t) {
return t.hashCode();
@ -406,7 +406,7 @@ public class RedissonMapReactive<K, V> extends RedissonExpirableReactive impleme
public Integer apply(Integer t, Integer u) {
return t + u;
}
}).next().poll();
}).block();
}
}

@ -20,6 +20,9 @@ import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@ -28,8 +31,8 @@ import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
/**
*
@ -39,104 +42,156 @@ import reactor.rx.subscription.ReactiveSubscription;
* @param <V> value type
* @param <M> entry type
*/
public class RedissonMapReactiveIterator<K, V, M> {
public class RedissonMapReactiveIterator<K, V, M> implements Consumer<FluxSink<M>> {
private final MapReactive<K, V> map;
public RedissonMapReactiveIterator(MapReactive<K, V> map) {
this.map = map;
}
public Publisher<M> stream() {
return new Stream<M>() {
@Override
public void accept(FluxSink<M> emitter) {
emitter.onRequest(new LongConsumer() {
private Map<ByteBuf, ByteBuf> firstValues;
private Map<ByteBuf, ByteBuf> lastValues;
private long nextIterPos;
private InetSocketAddress client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;
private volatile boolean completed;
private AtomicLong readAmount = new AtomicLong();
@Override
public void subscribe(final Subscriber<? super M> t) {
t.onSubscribe(new ReactiveSubscription<M>(this, t) {
private Map<ByteBuf, ByteBuf> firstValues;
private long iterPos = 0;
private InetSocketAddress client;
private long currentIndex;
public void accept(long value) {
readAmount.addAndGet(value);
if (completed || elementsRead.get() == 0) {
nextValues(emitter);
completed = false;
}
};
protected void nextValues(FluxSink<M> emitter) {
map.scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<MapScanResult<ScanObjectEntry, ScanObjectEntry>>() {
@Override
protected void onRequest(final long n) {
currentIndex = n;
nextValues();
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
private Map<ByteBuf, ByteBuf> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<ByteBuf, ByteBuf> result = new HashMap<ByteBuf, ByteBuf>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getBuf(), entry.getValue().getBuf());
private void free(Map<ByteBuf, ByteBuf> map) {
if (map == null) {
return;
}
for (Entry<ByteBuf, ByteBuf> entry : map.entrySet()) {
entry.getKey().release();
entry.getValue().release();
}
return result;
}
protected void nextValues() {
final ReactiveSubscription<M> m = this;
map.scanIteratorReactive(client, iterPos).subscribe(new Subscriber<MapScanResult<ScanObjectEntry, ScanObjectEntry>>() {
@Override
public void onNext(MapScanResult<ScanObjectEntry, ScanObjectEntry> res) {
if (finished) {
free(firstValues);
free(lastValues);
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
return;
}
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
private void free(Map<ByteBuf, ByteBuf> map) {
if (map == null) {
return;
}
for (Entry<ByteBuf, ByteBuf> entry : map.entrySet()) {
entry.getKey().release();
entry.getValue().release();
}
long prevIterPos = nextIterPos;
if (lastValues != null) {
free(lastValues);
}
lastValues = convert(res.getMap());
client = res.getRedisClient();
if (nextIterPos == 0 && firstValues == null) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
}
@Override
public void onNext(MapScanResult<ScanObjectEntry, ScanObjectEntry> res) {
client = res.getRedisClient();
if (iterPos == 0 && firstValues == null) {
firstValues = convert(res.getMap());
} else if (convert(res.getMap()).equals(firstValues)) {
free(firstValues);
m.onComplete();
currentIndex = 0;
return;
}
iterPos = res.getPos();
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : res.getMap().entrySet()) {
M val = getValue(entry);
m.onNext(val);
currentIndex--;
if (currentIndex == 0) {
m.onComplete();
} else {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty()) {
if (res.getPos() == 0) {
finished = true;
emitter.complete();
return;
}
}
} else if (lastValues.keySet().removeAll(firstValues.keySet())) {
free(firstValues);
free(lastValues);
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
prevIterPos = -1;
finished = true;
emitter.complete();
return;
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
}
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : res.getMap().entrySet()) {
M val = getValue(entry);
emitter.next(val);
elementsRead.incrementAndGet();
}
@Override
public void onComplete() {
if (currentIndex == 0) {
return;
}
nextValues();
}
});
nextIterPos = res.getPos();
if (elementsRead.get() >= readAmount.get()) {
emitter.complete();
elementsRead.set(0);
completed = true;
return;
}
if (prevIterPos == nextIterPos) {
finished = true;
emitter.complete();
}
}
@Override
public void onError(Throwable error) {
emitter.error(error);
}
@Override
public void onComplete() {
if (finished || completed) {
return;
}
nextValues(emitter);
}
});
}
};
});
}
private Map<ByteBuf, ByteBuf> convert(Map<ScanObjectEntry, ScanObjectEntry> map) {
Map<ByteBuf, ByteBuf> result = new HashMap<ByteBuf, ByteBuf>(map.size());
for (Entry<ScanObjectEntry, ScanObjectEntry> entry : map.entrySet()) {
result.put(entry.getKey().getBuf(), entry.getValue().getBuf());
}
return result;
}
M getValue(final Entry<ScanObjectEntry, ScanObjectEntry> entry) {
return (M)new AbstractMap.SimpleEntry<K, V>((K)entry.getKey().getObj(), (V)entry.getValue().getObj()) {
@ -144,7 +199,7 @@ public class RedissonMapReactiveIterator<K, V, M> {
@Override
public V setValue(V value) {
Publisher<V> publisher = map.put((K) entry.getKey().getObj(), value);
return ((Stream<V>)publisher).next().poll();
return Mono.from(publisher).block();
}
};

@ -17,6 +17,7 @@ package org.redisson.reactive;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonReference;
@ -28,9 +29,7 @@ import org.redisson.command.CommandReactiveExecutor;
import org.redisson.misc.RedissonObjectFactory;
import io.netty.buffer.ByteBuf;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.core.publisher.Mono;
/**
* Base Redisson object
@ -58,8 +57,8 @@ abstract class RedissonObjectReactive implements RObjectReactive {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
protected <V> Stream<V> newSucceeded(V result) {
return Streams.just(result);
protected <V> Mono<V> newSucceeded(V result) {
return Mono.just(result);
}
@Override

@ -17,6 +17,7 @@ package org.redisson.reactive;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.PubSubPatternMessageListener;
@ -34,7 +35,6 @@ import org.redisson.pubsub.AsyncSemaphore;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.fn.Supplier;
/**
* Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
@ -61,7 +61,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
@Override
public Publisher<Integer> addListener(final PatternStatusListener listener) {
return new NettyFuturePublisher<Integer>(new Supplier<RFuture<Integer>>() {
return commandExecutor.reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
RPromise<Integer> promise = commandExecutor.getConnectionManager().newPromise();
@ -73,7 +73,7 @@ public class RedissonPatternTopicReactive<M> implements RPatternTopicReactive<M>
@Override
public Publisher<Integer> addListener(final PatternMessageListener<M> listener) {
return new NettyFuturePublisher<Integer>(new Supplier<RFuture<Integer>>() {
return commandExecutor.reactive(new Supplier<RFuture<Integer>>() {
@Override
public RFuture<Integer> get() {
RPromise<Integer> promise = commandExecutor.getConnectionManager().newPromise();

@ -18,6 +18,7 @@ package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonScoredSortedSet;
@ -33,7 +34,7 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
import reactor.core.publisher.Flux;
/**
*
@ -181,12 +182,12 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
@Override
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonScoredSortedSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};
});
}
@Override

@ -17,6 +17,7 @@ package org.redisson.reactive;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonLock;
@ -29,8 +30,6 @@ import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.pubsub.SemaphorePubSub;
import reactor.fn.Supplier;
/**
*
* @author Nikita Koksharov

@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSetCache;
@ -34,7 +35,7 @@ import org.redisson.command.CommandReactiveExecutor;
import org.redisson.eviction.EvictionScheduler;
import io.netty.buffer.ByteBuf;
import reactor.fn.Supplier;
import reactor.core.publisher.Flux;
/**
* <p>Set-based cache with ability to set TTL for each entry via
@ -96,12 +97,12 @@ public class RedissonSetCacheReactive<V> extends RedissonExpirableReactive imple
@Override
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonSetCacheReactive.this.scanIterator(client, nextIterPos);
}
};
});
}
@Override

@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.RedissonSet;
@ -33,7 +34,7 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
import reactor.core.publisher.Flux;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
@ -209,12 +210,12 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
@Override
public Publisher<V> iterator() {
return new SetReactiveIterator<V>() {
return Flux.create(new SetReactiveIterator<V>() {
@Override
protected Publisher<ListScanResult<ScanObjectEntry>> scanIteratorReactive(InetSocketAddress client, long nextIterPos) {
return RedissonSetReactive.this.scanIteratorReactive(client, nextIterPos);
}
};
});
}
}

@ -17,6 +17,7 @@ package org.redisson.reactive;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.redisson.PubSubMessageListener;
@ -31,8 +32,6 @@ import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.Supplier;
/**
* Distributed topic implementation. Messages are delivered to all message listeners across Redis cluster.
*

@ -18,6 +18,9 @@ package org.redisson.reactive;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@ -26,8 +29,7 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import io.netty.buffer.ByteBuf;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
import reactor.core.publisher.FluxSink;
/**
*
@ -35,32 +37,32 @@ import reactor.rx.subscription.ReactiveSubscription;
*
* @param <V> value type
*/
public abstract class SetReactiveIterator<V> extends Stream<V> {
public abstract class SetReactiveIterator<V> implements Consumer<FluxSink<V>> {
@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
public void accept(FluxSink<V> emitter) {
emitter.onRequest(new LongConsumer() {
private List<ByteBuf> firstValues;
private List<ByteBuf> lastValues;
private long nextIterPos;
private InetSocketAddress client;
private AtomicLong elementsRead = new AtomicLong();
private boolean finished;
private volatile boolean completed;
private AtomicLong readAmount = new AtomicLong();
@Override
protected void onRequest(long n) {
nextValues();
}
private void handle(List<ScanObjectEntry> vals) {
for (ScanObjectEntry val : vals) {
onNext((V)val.getObj());
public void accept(long value) {
readAmount.addAndGet(value);
if (completed || elementsRead.get() == 0) {
nextValues(emitter);
completed = false;
}
}
protected void nextValues() {
final ReactiveSubscription<V> m = this;
protected void nextValues(FluxSink<V> emitter) {
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<ScanObjectEntry>>() {
@Override
@ -105,7 +107,7 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
if (firstValues.isEmpty()) {
if (res.getPos() == 0) {
finished = true;
m.onComplete();
emitter.complete();
return;
}
}
@ -119,37 +121,50 @@ public abstract class SetReactiveIterator<V> extends Stream<V> {
nextIterPos = 0;
prevIterPos = -1;
finished = true;
m.onComplete();
emitter.complete();
return;
}
}
handle(res.getValues());
for (ScanObjectEntry val : res.getValues()) {
emitter.next((V)val.getObj());
elementsRead.incrementAndGet();
}
nextIterPos = res.getPos();
if (elementsRead.get() >= readAmount.get()) {
emitter.complete();
elementsRead.set(0);
completed = true;
return;
}
if (prevIterPos == nextIterPos) {
finished = true;
m.onComplete();
emitter.complete();
}
}
@Override
public void onError(Throwable error) {
m.onError(error);
emitter.error(error);
}
@Override
public void onComplete() {
if (finished) {
if (finished || completed) {
return;
}
nextValues();
nextValues(emitter);
}
});
}
});
}
protected boolean tryAgain() {
return false;
}
private void free(List<ByteBuf> list) {
if (list == null) {

@ -21,8 +21,8 @@ import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.Assert;
import org.w3c.dom.Element;
import reactor.core.support.Assert;
/**
*

@ -22,9 +22,9 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.w3c.dom.Element;
import reactor.core.support.Assert;
/**
*

@ -25,8 +25,8 @@ import org.springframework.beans.factory.parsing.BeanComponentDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.Assert;
import org.w3c.dom.Element;
import reactor.core.support.Assert;
/**
*

@ -13,8 +13,8 @@ import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.config.Config;
import reactor.rx.Promise;
import reactor.rx.Streams;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public abstract class BaseReactiveTest {
@ -59,34 +59,23 @@ public abstract class BaseReactiveTest {
}
public static <V> Iterable<V> sync(RScoredSortedSetReactive<V> list) {
return Streams.create(list.iterator()).toList().poll();
return toIterable(list.iterator());
}
public static <V> Iterable<V> sync(RCollectionReactive<V> list) {
return Streams.create(list.iterator()).toList().poll();
return toIterable(list.iterator());
}
public static <V> Iterator<V> toIterator(Publisher<V> pub) {
return Streams.create(pub).toList().poll().iterator();
return Flux.from(pub).toIterable().iterator();
}
public static <V> Iterable<V> toIterable(Publisher<V> pub) {
return Streams.create(pub).toList().poll();
return Flux.from(pub).toIterable();
}
public static <V> V sync(Publisher<V> ob) {
Promise<V> promise;
if (Promise.class.isAssignableFrom(ob.getClass())) {
promise = (Promise<V>) ob;
} else {
promise = Streams.wrap(ob).next();
}
V val = promise.poll();
if (promise.isError()) {
throw new RuntimeException(promise.reason());
}
return val;
return Mono.from(ob).block();
}
public static RedissonReactiveClient createInstance() {

@ -4,17 +4,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RCollectionReactive;
import org.redisson.api.RLexSortedSetReactive;
import reactor.rx.Streams;
public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest {
public static <V> Iterable<V> sync(RLexSortedSetReactive list) {
return (Iterable<V>) Streams.create(list.iterator()).toList().poll();
}
@Test
public void testAddAllReactive() {
RLexSortedSetReactive list = redisson.getLexSortedSet("set");
@ -44,9 +38,9 @@ public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest {
Assert.assertEquals(0, sync(set.removeRangeTail("z", false)).intValue());
Assert.assertEquals(4, sync(set.removeRangeTail("c", false)).intValue());
assertThat(sync(set)).containsExactly("a", "b", "c");
assertThat(sync((RCollectionReactive)set)).containsExactly("a", "b", "c");
Assert.assertEquals(1, sync(set.removeRangeTail("c", true)).intValue());
assertThat(sync(set)).containsExactly("a", "b");
assertThat(sync((RCollectionReactive)set)).containsExactly("a", "b");
}
@ -62,9 +56,9 @@ public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest {
sync(set.add("g"));
Assert.assertEquals(2, sync(set.removeRangeHead("c", false)).intValue());
assertThat(sync(set)).containsExactly("c", "d", "e", "f", "g");
assertThat(sync((RCollectionReactive)set)).containsExactly("c", "d", "e", "f", "g");
Assert.assertEquals(1, (int)sync(set.removeRangeHead("c", true)));
assertThat(sync(set)).containsExactly("d", "e", "f", "g");
assertThat(sync((RCollectionReactive)set)).containsExactly("d", "e", "f", "g");
}
@Test
@ -79,7 +73,7 @@ public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest {
sync(set.add("g"));
Assert.assertEquals(5, sync(set.removeRange("aaa", true, "g", false)).intValue());
assertThat(sync(set)).containsExactly("a", "g");
assertThat(sync((RCollectionReactive)set)).containsExactly("a", "g");
}

@ -1,17 +1,18 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import static org.assertj.core.api.Assertions.*;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.api.RListReactive;
import org.redisson.client.RedisException;
import reactor.rx.Promise;
import reactor.core.publisher.BaseSubscriber;
public class RedissonListReactiveTest extends BaseReactiveTest {
@ -73,25 +74,25 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
public void testAddAllWithIndex() throws InterruptedException {
final RListReactive<Long> list = redisson.getList("list");
final CountDownLatch latch = new CountDownLatch(1);
list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new Promise<Integer>() {
list.addAll(Arrays.asList(1L, 2L, 3L)).subscribe(new BaseSubscriber<Integer>() {
@Override
public void onNext(Integer element) {
list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new Promise<Integer>() {
public void hookOnNext(Integer element) {
list.addAll(Arrays.asList(1L, 24L, 3L)).subscribe(new BaseSubscriber<Integer>() {
@Override
public void onNext(Integer value) {
public void hookOnNext(Integer value) {
latch.countDown();
}
@Override
public void onError(Throwable error) {
public void hookOnError(Throwable error) {
Assert.fail(error.getMessage());
}
});
}
@Override
public void onError(Throwable error) {
public void hookOnError(Throwable error) {
Assert.fail(error.getMessage());
}
});
@ -105,24 +106,24 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
public void testAdd() throws InterruptedException {
final RListReactive<Long> list = redisson.getList("list");
final CountDownLatch latch = new CountDownLatch(1);
list.add(1L).subscribe(new Promise<Integer>() {
list.add(1L).subscribe(new BaseSubscriber<Integer>() {
@Override
public void onNext(Integer value) {
list.add(2L).subscribe(new Promise<Integer>() {
public void hookOnNext(Integer value) {
list.add(2L).subscribe(new BaseSubscriber<Integer>() {
@Override
public void onNext(Integer value) {
public void hookOnNext(Integer value) {
latch.countDown();
}
@Override
public void onError(Throwable error) {
public void hookOnError(Throwable error) {
Assert.fail(error.getMessage());
}
});
}
@Override
public void onError(Throwable error) {
public void hookOnError(Throwable error) {
Assert.fail(error.getMessage());
}
});

@ -10,6 +10,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
@ -120,6 +121,32 @@ public class RedissonMapReactiveTest extends BaseReactiveTest {
}
@Test
public void testIteratorSequence() throws InterruptedException {
RMapReactive<Long, Long> map = redisson.getMap("map");
for (int i = 0; i < 1000; i++) {
sync(map.put(Long.valueOf(i), Long.valueOf(i)));
}
Map<Long, Long> setCopy = new HashMap<>();
for (int i = 0; i < 1000; i++) {
setCopy.put(Long.valueOf(i), Long.valueOf(i));
}
checkIterator(map, setCopy);
}
private <K, V> void checkIterator(RMapReactive<K, V> set, Map<K, V> setCopy) {
for (Iterator<Entry<K, V>> iterator = toIterator(set.entryIterator()); iterator.hasNext();) {
Entry<K, V> entry = iterator.next();
if (!setCopy.remove(entry.getKey(), entry.getValue())) {
Assert.fail();
}
}
Assert.assertEquals(0, setCopy.size());
}
@Test
public void testAddAndGet() throws InterruptedException {
RMapReactive<Integer, Integer> map = redisson.getMap("getAll");

Loading…
Cancel
Save