From e4a325824bc3ddde8687a8e867b0447384ac48f3 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 2 Dec 2015 17:21:42 +0300 Subject: [PATCH] RCollectionReactive.addAll from Publisher source added. #210 --- .../org/redisson/api/RCollectionReactive.java | 2 + .../reactive/RedissonCollectionReactive.java | 94 +++++++++++++++++++ .../RedissonLexSortedSetReactive.java | 18 ++++ .../reactive/RedissonListReactive.java | 19 +++- .../reactive/RedissonObjectReactive.java | 2 - .../RedissonScoredSortedSetReactive.java | 50 +++++++--- .../reactive/RedissonSetReactive.java | 67 ++++++++++--- .../java/org/redisson/BaseReactiveTest.java | 16 ++-- .../RedissonLexSortedSetReactiveTest.java | 14 +++ .../redisson/RedissonListReactiveTest.java | 14 +++ .../org/redisson/RedissonSetReactiveTest.java | 14 +++ 11 files changed, 278 insertions(+), 32 deletions(-) create mode 100644 src/main/java/org/redisson/reactive/RedissonCollectionReactive.java diff --git a/src/main/java/org/redisson/api/RCollectionReactive.java b/src/main/java/org/redisson/api/RCollectionReactive.java index a17a41d5e..bf1f49c57 100644 --- a/src/main/java/org/redisson/api/RCollectionReactive.java +++ b/src/main/java/org/redisson/api/RCollectionReactive.java @@ -37,6 +37,8 @@ public interface RCollectionReactive extends RExpirableReactive { Publisher add(V e); + Publisher addAll(Publisher c); + Publisher addAll(Collection c); } diff --git a/src/main/java/org/redisson/reactive/RedissonCollectionReactive.java b/src/main/java/org/redisson/reactive/RedissonCollectionReactive.java new file mode 100644 index 000000000..38deade2b --- /dev/null +++ b/src/main/java/org/redisson/reactive/RedissonCollectionReactive.java @@ -0,0 +1,94 @@ +/** + * Copyright 2014 Nikita Koksharov, Nickolay Borbit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.redisson.client.codec.Codec; +import org.redisson.command.CommandReactiveExecutor; + +import reactor.fn.BiFunction; +import reactor.fn.Function; +import reactor.rx.Promise; +import reactor.rx.Promises; +import reactor.rx.action.support.DefaultSubscriber; + + +abstract class RedissonCollectionReactive extends RedissonExpirableReactive { + + RedissonCollectionReactive(CommandReactiveExecutor connectionManager, String name) { + super(connectionManager, name); + } + + RedissonCollectionReactive(Codec codec, CommandReactiveExecutor connectionManager, String name) { + super(codec, connectionManager, name); + } + + protected Publisher addAll(Publisher c, final Function> function, final BiFunction sizeFunc) { + final Promise promise = Promises.prepare(); + + c.subscribe(new DefaultSubscriber() { + + Subscription s; + Long lastSize = 0L; + V lastValue; + + @Override + public void onSubscribe(Subscription s) { + this.s = s; + s.request(1); + } + + @Override + public void onNext(V o) { + lastValue = o; + function.apply(o).subscribe(new DefaultSubscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onError(Throwable t) { + promise.onError(t); + } + + @Override + public void onNext(Long o) { + lastSize = sizeFunc.apply(lastSize, o); + } + + @Override + public void onComplete() { + lastValue = null; + s.request(1); + } + }); + } + + @Override + public void onComplete() { + if (lastValue == null) { + promise.onNext(lastSize); + } + } + }); + + return promise; + } + +} diff --git a/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java b/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java index fd1119ba1..d7abda02c 100644 --- a/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonLexSortedSetReactive.java @@ -25,12 +25,30 @@ import org.redisson.client.codec.StringCodec; import org.redisson.client.protocol.RedisCommands; import org.redisson.command.CommandReactiveExecutor; +import reactor.fn.BiFunction; +import reactor.fn.Function; + public class RedissonLexSortedSetReactive extends RedissonScoredSortedSetReactive implements RLexSortedSetReactive { public RedissonLexSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) { super(StringCodec.INSTANCE, commandExecutor, name); } + @Override + public Publisher addAll(Publisher c) { + return addAll(c, new Function>() { + @Override + public Publisher apply(String o) { + return add(o); + } + }, new BiFunction() { + @Override + public Long apply(Long left, Long right) { + return left + right; + } + }); + } + @Override public Publisher removeRangeHeadByLex(String toElement, boolean toInclusive) { String toValue = value(toElement, toInclusive); diff --git a/src/main/java/org/redisson/reactive/RedissonListReactive.java b/src/main/java/org/redisson/reactive/RedissonListReactive.java index afe5d5fa7..7bc64c8b6 100644 --- a/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -41,6 +41,8 @@ import org.redisson.client.protocol.convertor.IntegerReplayConvertor; import org.redisson.client.protocol.convertor.LongReplayConvertor; import org.redisson.command.CommandReactiveExecutor; +import reactor.fn.BiFunction; +import reactor.fn.Function; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; @@ -51,7 +53,7 @@ import reactor.rx.subscription.ReactiveSubscription; * * @param the type of elements held in this collection */ -public class RedissonListReactive extends RedissonExpirableReactive implements RListReactive { +public class RedissonListReactive extends RedissonCollectionReactive implements RListReactive { public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); @@ -170,6 +172,21 @@ public class RedissonListReactive extends RedissonExpirableReactive implement Collections.singletonList(getName()), c.toArray()); } + @Override + public Publisher addAll(Publisher c) { + return addAll(c, new Function>() { + @Override + public Publisher apply(V o) { + return add(o); + } + }, new BiFunction() { + @Override + public Long apply(Long left, Long right) { + return right; + } + }); + } + @Override public Publisher addAll(Collection c) { if (c.isEmpty()) { diff --git a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java index 9d0b4f345..92e77eaf3 100644 --- a/src/main/java/org/redisson/reactive/RedissonObjectReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonObjectReactive.java @@ -15,7 +15,6 @@ */ package org.redisson.reactive; -import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -27,7 +26,6 @@ import org.redisson.command.CommandReactiveExecutor; import reactor.core.reactivestreams.SubscriberBarrier; import reactor.rx.Stream; import reactor.rx.Streams; -import reactor.rx.broadcast.Broadcaster; /** * Base Redisson object diff --git a/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java b/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java index 2efa598e7..adbefc043 100644 --- a/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonScoredSortedSetReactive.java @@ -17,6 +17,7 @@ package org.redisson.reactive; import java.math.BigDecimal; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -34,10 +35,11 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandReactiveExecutor; +import reactor.core.reactivestreams.SubscriberBarrier; import reactor.rx.Stream; import reactor.rx.subscription.ReactiveSubscription; -public class RedissonScoredSortedSetReactive extends RedissonExpirableReactive implements RScoredSortedSetReactive { +public class RedissonScoredSortedSetReactive extends RedissonCollectionReactive implements RScoredSortedSetReactive { public RedissonScoredSortedSetReactive(CommandReactiveExecutor commandExecutor, String name) { super(commandExecutor, name); @@ -112,22 +114,49 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv @Override public void subscribe(final Subscriber t) { - t.onSubscribe(new ReactiveSubscription(this, t) { + t.onSubscribe(new SubscriberBarrier(t) { private List firstValues; private long nextIterPos; private InetSocketAddress client; private long currentIndex; + private List prevValues = new ArrayList(); @Override - protected void onRequest(final long n) { + protected void doRequest(long n) { currentIndex = n; + + if (!prevValues.isEmpty()) { + List vals = new ArrayList(prevValues); + prevValues.clear(); + + handle(vals); + + if (currentIndex == 0) { + return; + } + } + nextValues(); } + private void handle(List vals) { + for (V val : vals) { + if (currentIndex > 0) { + onNext(val); + } else { + prevValues.add(val); + } + currentIndex--; + if (currentIndex == 0) { + onComplete(); + } + } + } + protected void nextValues() { - final ReactiveSubscription m = this; + final SubscriberBarrier m = this; scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { @Override @@ -152,14 +181,13 @@ public class RedissonScoredSortedSetReactive extends RedissonExpirableReactiv if (prevIterPos == nextIterPos) { nextIterPos = -1; } - for (V val : res.getValues()) { - m.onNext(val); - currentIndex--; - if (currentIndex == 0) { - m.onComplete(); - return; - } + + handle(res.getValues()); + + if (currentIndex == 0) { + return; } + if (nextIterPos == -1) { m.onComplete(); currentIndex = 0; diff --git a/src/main/java/org/redisson/reactive/RedissonSetReactive.java b/src/main/java/org/redisson/reactive/RedissonSetReactive.java index 15ccd4fd4..8ab2cfa39 100644 --- a/src/main/java/org/redisson/reactive/RedissonSetReactive.java +++ b/src/main/java/org/redisson/reactive/RedissonSetReactive.java @@ -32,8 +32,10 @@ import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.command.CommandReactiveExecutor; +import reactor.core.reactivestreams.SubscriberBarrier; +import reactor.fn.BiFunction; +import reactor.fn.Function; import reactor.rx.Stream; -import reactor.rx.subscription.ReactiveSubscription; /** * Distributed and concurrent implementation of {@link java.util.Set} @@ -42,7 +44,7 @@ import reactor.rx.subscription.ReactiveSubscription; * * @param value */ -public class RedissonSetReactive extends RedissonExpirableReactive implements RSetReactive { +public class RedissonSetReactive extends RedissonCollectionReactive implements RSetReactive { private static final RedisCommand EVAL_OBJECTS = new RedisCommand("EVAL", new BooleanReplayConvertor(), 4); @@ -54,6 +56,21 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements super(codec, commandExecutor, name); } + @Override + public Publisher addAll(Publisher c) { + return addAll(c, new Function>() { + @Override + public Publisher apply(V o) { + return add(o); + } + }, new BiFunction() { + @Override + public Long apply(Long left, Long right) { + return left + right; + } + }); + } + @Override public Publisher size() { return commandExecutor.readReactive(getName(), codec, RedisCommands.SCARD, getName()); @@ -148,22 +165,49 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements @Override public void subscribe(final Subscriber t) { - t.onSubscribe(new ReactiveSubscription(this, t) { + t.onSubscribe(new SubscriberBarrier(t) { private List firstValues; private long nextIterPos; private InetSocketAddress client; private long currentIndex; + private List prevValues = new ArrayList(); @Override - protected void onRequest(final long n) { + protected void doRequest(long n) { currentIndex = n; + + if (!prevValues.isEmpty()) { + List vals = new ArrayList(prevValues); + prevValues.clear(); + + handle(vals); + + if (currentIndex == 0) { + return; + } + } + nextValues(); } + private void handle(List vals) { + for (V val : vals) { + if (currentIndex > 0) { + onNext(val); + } else { + prevValues.add(val); + } + currentIndex--; + if (currentIndex == 0) { + onComplete(); + } + } + } + protected void nextValues() { - final ReactiveSubscription m = this; + final SubscriberBarrier m = this; scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { @Override @@ -188,14 +232,13 @@ public class RedissonSetReactive extends RedissonExpirableReactive implements if (prevIterPos == nextIterPos) { nextIterPos = -1; } - for (V val : res.getValues()) { - m.onNext(val); - currentIndex--; - if (currentIndex == 0) { - m.onComplete(); - return; - } + + handle(res.getValues()); + + if (currentIndex == 0) { + return; } + if (nextIterPos == -1) { m.onComplete(); currentIndex = 0; diff --git a/src/test/java/org/redisson/BaseReactiveTest.java b/src/test/java/org/redisson/BaseReactiveTest.java index f991a5f07..670112119 100644 --- a/src/test/java/org/redisson/BaseReactiveTest.java +++ b/src/test/java/org/redisson/BaseReactiveTest.java @@ -12,6 +12,7 @@ import org.redisson.api.RScoredSortedSetReactive; import org.redisson.api.RedissonReactiveClient; import reactor.rx.Promise; +import reactor.rx.Stream; import reactor.rx.Streams; public abstract class BaseReactiveTest { @@ -41,15 +42,18 @@ public abstract class BaseReactiveTest { } public V sync(Publisher ob) { - Promise> promise = Streams.create(ob).toList(); - List t = promise.poll(); + Promise promise; + if (Promise.class.isAssignableFrom(ob.getClass())) { + promise = (Promise) ob; + } else { + promise = Streams.wrap(ob).next(); + } + + V val = promise.poll(); if (promise.isError()) { throw new RuntimeException(promise.reason()); } - if (t == null) { - return null; - } - return t.iterator().next(); + return val; } public static Config createConfig() { diff --git a/src/test/java/org/redisson/RedissonLexSortedSetReactiveTest.java b/src/test/java/org/redisson/RedissonLexSortedSetReactiveTest.java index b0de34bf2..4355745b7 100644 --- a/src/test/java/org/redisson/RedissonLexSortedSetReactiveTest.java +++ b/src/test/java/org/redisson/RedissonLexSortedSetReactiveTest.java @@ -8,6 +8,20 @@ import org.redisson.api.RLexSortedSetReactive; public class RedissonLexSortedSetReactiveTest extends BaseReactiveTest { + @Test + public void testAddAllReactive() { + RLexSortedSetReactive list = redisson.getLexSortedSet("set"); + Assert.assertTrue(sync(list.add("1")) == 1); + Assert.assertTrue(sync(list.add("2")) == 1); + Assert.assertTrue(sync(list.add("3")) == 1); + Assert.assertTrue(sync(list.add("4")) == 1); + Assert.assertTrue(sync(list.add("5")) == 1); + + RLexSortedSetReactive list2 = redisson.getLexSortedSet("set2"); + Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue()); + Assert.assertEquals(5, sync(list2.size()).intValue()); + } + @Test public void testRemoveLexRangeTail() { RLexSortedSetReactive set = redisson.getLexSortedSet("simple"); diff --git a/src/test/java/org/redisson/RedissonListReactiveTest.java b/src/test/java/org/redisson/RedissonListReactiveTest.java index 49fd1ddab..76ed7743a 100644 --- a/src/test/java/org/redisson/RedissonListReactiveTest.java +++ b/src/test/java/org/redisson/RedissonListReactiveTest.java @@ -25,6 +25,20 @@ public class RedissonListReactiveTest extends BaseReactiveTest { MatcherAssert.assertThat(sync(test2), Matchers.contains("bar", "foo")); } + @Test + public void testAddAllReactive() { + RListReactive list = redisson.getList("list"); + sync(list.add(1)); + sync(list.add(2)); + sync(list.add(3)); + sync(list.add(4)); + sync(list.add(5)); + + RListReactive list2 = redisson.getList("list2"); + Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue()); + Assert.assertEquals(5, sync(list2.size()).intValue()); + } + @Test public void testAddAllWithIndex() throws InterruptedException { final RListReactive list = redisson.getList("list"); diff --git a/src/test/java/org/redisson/RedissonSetReactiveTest.java b/src/test/java/org/redisson/RedissonSetReactiveTest.java index 8ab461f37..578fe517e 100644 --- a/src/test/java/org/redisson/RedissonSetReactiveTest.java +++ b/src/test/java/org/redisson/RedissonSetReactiveTest.java @@ -30,6 +30,20 @@ public class RedissonSetReactiveTest extends BaseReactiveTest { } + @Test + public void testAddAllReactive() { + RSetReactive list = redisson.getSet("set"); + sync(list.add(1)); + sync(list.add(2)); + sync(list.add(3)); + sync(list.add(4)); + sync(list.add(5)); + + RSetReactive list2 = redisson.getSet("set2"); + Assert.assertEquals(5, sync(list2.addAll(list.iterator())).intValue()); + Assert.assertEquals(5, sync(list2.size()).intValue()); + } + @Test public void testRemoveRandom() { RSetReactive set = redisson.getSet("simple");