RCollectionReactive.addAll from Publisher source added. #210

pull/337/head
Nikita 9 years ago
parent f4424a39e0
commit e4a325824b

@ -37,6 +37,8 @@ public interface RCollectionReactive<V> extends RExpirableReactive {
Publisher<Long> add(V e);
Publisher<Long> addAll(Publisher<? extends V> c);
Publisher<Long> addAll(Collection<? extends V> c);
}

@ -0,0 +1,94 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.BiFunction;
import reactor.fn.Function;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.action.support.DefaultSubscriber;
abstract class RedissonCollectionReactive<V> extends RedissonExpirableReactive {
RedissonCollectionReactive(CommandReactiveExecutor connectionManager, String name) {
super(connectionManager, name);
}
RedissonCollectionReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) {
super(codec, connectionManager, name);
}
protected Publisher<Long> addAll(Publisher<? extends V> c, final Function<V, Publisher<Long>> function, final BiFunction<Long, Long, Long> sizeFunc) {
final Promise<Long> promise = Promises.prepare();
c.subscribe(new DefaultSubscriber<V>() {
Subscription s;
Long lastSize = 0L;
V lastValue;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(1);
}
@Override
public void onNext(V o) {
lastValue = o;
function.apply(o).subscribe(new DefaultSubscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onError(Throwable t) {
promise.onError(t);
}
@Override
public void onNext(Long o) {
lastSize = sizeFunc.apply(lastSize, o);
}
@Override
public void onComplete() {
lastValue = null;
s.request(1);
}
});
}
@Override
public void onComplete() {
if (lastValue == null) {
promise.onNext(lastSize);
}
}
});
return promise;
}
}

@ -25,12 +25,30 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.BiFunction;
import reactor.fn.Function;
public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactive<String> implements RLexSortedSetReactive {
public RedissonLexSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(StringCodec.INSTANCE, commandExecutor, name);
}
@Override
public Publisher<Long> addAll(Publisher<? extends String> c) {
return addAll(c, new Function<String, Publisher<Long>>() {
@Override
public Publisher<Long> apply(String o) {
return add(o);
}
}, new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long left, Long right) {
return left + right;
}
});
}
@Override
public Publisher<Integer> removeRangeHeadByLex(String toElement, boolean toInclusive) {
String toValue = value(toElement, toInclusive);

@ -41,6 +41,8 @@ import org.redisson.client.protocol.convertor.IntegerReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.command.CommandReactiveExecutor;
import reactor.fn.BiFunction;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
@ -51,7 +53,7 @@ import reactor.rx.subscription.ReactiveSubscription;
*
* @param <V> the type of elements held in this collection
*/
public class RedissonListReactive<V> extends RedissonExpirableReactive implements RListReactive<V> {
public class RedissonListReactive<V> extends RedissonCollectionReactive<V> implements RListReactive<V> {
public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
@ -170,6 +172,21 @@ public class RedissonListReactive<V> extends RedissonExpirableReactive implement
Collections.<Object>singletonList(getName()), c.toArray());
}
@Override
public Publisher<Long> addAll(Publisher<? extends V> c) {
return addAll(c, new Function<V, Publisher<Long>>() {
@Override
public Publisher<Long> apply(V o) {
return add(o);
}
}, new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long left, Long right) {
return right;
}
});
}
@Override
public Publisher<Long> addAll(Collection<? extends V> c) {
if (c.isEmpty()) {

@ -15,7 +15,6 @@
*/
package org.redisson.reactive;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@ -27,7 +26,6 @@ import org.redisson.command.CommandReactiveExecutor;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.broadcast.Broadcaster;
/**
* Base Redisson object

@ -17,6 +17,7 @@ package org.redisson.reactive;
import java.math.BigDecimal;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -34,10 +35,11 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactive implements RScoredSortedSetReactive<V> {
public class RedissonScoredSortedSetReactive<V> extends RedissonCollectionReactive<V> implements RScoredSortedSetReactive<V> {
public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
@ -112,22 +114,49 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
t.onSubscribe(new SubscriberBarrier<V, V>(t) {
private List<V> firstValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
private List<V> prevValues = new ArrayList<V>();
@Override
protected void onRequest(final long n) {
protected void doRequest(long n) {
currentIndex = n;
if (!prevValues.isEmpty()) {
List<V> vals = new ArrayList<V>(prevValues);
prevValues.clear();
handle(vals);
if (currentIndex == 0) {
return;
}
}
nextValues();
}
private void handle(List<V> vals) {
for (V val : vals) {
if (currentIndex > 0) {
onNext(val);
} else {
prevValues.add(val);
}
currentIndex--;
if (currentIndex == 0) {
onComplete();
}
}
}
protected void nextValues() {
final ReactiveSubscription<V> m = this;
final SubscriberBarrier<V, V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
@Override
@ -152,14 +181,13 @@ public class RedissonScoredSortedSetReactive<V> extends RedissonExpirableReactiv
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
for (V val : res.getValues()) {
m.onNext(val);
currentIndex--;
if (currentIndex == 0) {
m.onComplete();
return;
}
handle(res.getValues());
if (currentIndex == 0) {
return;
}
if (nextIterPos == -1) {
m.onComplete();
currentIndex = 0;

@ -32,8 +32,10 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandReactiveExecutor;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.fn.BiFunction;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;
/**
* Distributed and concurrent implementation of {@link java.util.Set}
@ -42,7 +44,7 @@ import reactor.rx.subscription.ReactiveSubscription;
*
* @param <V> value
*/
public class RedissonSetReactive<V> extends RedissonExpirableReactive implements RSetReactive<V> {
public class RedissonSetReactive<V> extends RedissonCollectionReactive<V> implements RSetReactive<V> {
private static final RedisCommand<Boolean> EVAL_OBJECTS = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 4);
@ -54,6 +56,21 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
super(codec, commandExecutor, name);
}
@Override
public Publisher<Long> addAll(Publisher<? extends V> c) {
return addAll(c, new Function<V, Publisher<Long>>() {
@Override
public Publisher<Long> apply(V o) {
return add(o);
}
}, new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long left, Long right) {
return left + right;
}
});
}
@Override
public Publisher<Long> size() {
return commandExecutor.readReactive(getName(), codec, RedisCommands.SCARD, getName());
@ -148,22 +165,49 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {
t.onSubscribe(new SubscriberBarrier<V, V>(t) {
private List<V> firstValues;
private long nextIterPos;
private InetSocketAddress client;
private long currentIndex;
private List<V> prevValues = new ArrayList<V>();
@Override
protected void onRequest(final long n) {
protected void doRequest(long n) {
currentIndex = n;
if (!prevValues.isEmpty()) {
List<V> vals = new ArrayList<V>(prevValues);
prevValues.clear();
handle(vals);
if (currentIndex == 0) {
return;
}
}
nextValues();
}
private void handle(List<V> vals) {
for (V val : vals) {
if (currentIndex > 0) {
onNext(val);
} else {
prevValues.add(val);
}
currentIndex--;
if (currentIndex == 0) {
onComplete();
}
}
}
protected void nextValues() {
final ReactiveSubscription<V> m = this;
final SubscriberBarrier<V, V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {
@Override
@ -188,14 +232,13 @@ public class RedissonSetReactive<V> extends RedissonExpirableReactive implements
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
for (V val : res.getValues()) {
m.onNext(val);
currentIndex--;
if (currentIndex == 0) {
m.onComplete();
return;
}
handle(res.getValues());
if (currentIndex == 0) {
return;
}
if (nextIterPos == -1) {
m.onComplete();
currentIndex = 0;

@ -12,6 +12,7 @@ import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RedissonReactiveClient;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
public abstract class BaseReactiveTest {
@ -41,15 +42,18 @@ public abstract class BaseReactiveTest {
}
public <V> V sync(Publisher<V> ob) {
Promise<List<V>> promise = Streams.create(ob).toList();
List<V> t = promise.poll();
Promise<V> promise;
if (Promise.class.isAssignableFrom(ob.getClass())) {
promise = (Promise<V>) ob;
} else {
promise = Streams.wrap(ob).next();
}
V val = promise.poll();
if (promise.isError()) {
throw new RuntimeException(promise.reason());
}
if (t == null) {
return null;
}
return t.iterator().next();
return val;
}
public static Config createConfig() {

@ -8,6 +8,20 @@ import org.redisson.api.RLexSortedSetReactive;
public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest {
@Test
public void testAddAllReactive() {
RLexSortedSetReactive list = redisson.getLexSortedSet("set");
Assert.assertTrue(sync(list.add("1")) == 1);
Assert.assertTrue(sync(list.add("2")) == 1);
Assert.assertTrue(sync(list.add("3")) == 1);
Assert.assertTrue(sync(list.add("4")) == 1);
Assert.assertTrue(sync(list.add("5")) == 1);
RLexSortedSetReactive list2 = redisson.getLexSortedSet("set2");
Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue());
Assert.assertEquals(5, sync(list2.size()).intValue());
}
@Test
public void testRemoveLexRangeTail() {
RLexSortedSetReactive set = redisson.getLexSortedSet("simple");

@ -25,6 +25,20 @@ public class RedissonListReactiveTest extends BaseReactiveTest {
MatcherAssert.assertThat(sync(test2), Matchers.contains("bar", "foo"));
}
@Test
public void testAddAllReactive() {
RListReactive<Integer> list = redisson.getList("list");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
RListReactive<Integer> list2 = redisson.getList("list2");
Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue());
Assert.assertEquals(5, sync(list2.size()).intValue());
}
@Test
public void testAddAllWithIndex() throws InterruptedException {
final RListReactive<Long> list = redisson.getList("list");

@ -30,6 +30,20 @@ public class RedissonSetReactiveTest extends BaseReactiveTest {
}
@Test
public void testAddAllReactive() {
RSetReactive<Integer> list = redisson.getSet("set");
sync(list.add(1));
sync(list.add(2));
sync(list.add(3));
sync(list.add(4));
sync(list.add(5));
RSetReactive<Integer> list2 = redisson.getSet("set2");
Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue());
Assert.assertEquals(5, sync(list2.size()).intValue());
}
@Test
public void testRemoveRandom() {
RSetReactive<Integer> set = redisson.getSet("simple");

Loading…
Cancel
Save