From a3fec2c062867c6a2a5a5ca778105a43b928b047 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 5 Nov 2018 17:11:19 +0300 Subject: [PATCH] Feature - takeFirstElements and takeLastElements methods added to RBlockingDequeRx --- .../main/java/org/redisson/RedissonRx.java | 5 +- .../org/redisson/api/RBlockingDequeRx.java | 14 ++ .../redisson/api/RBlockingQueueReactive.java | 12 +- .../org/redisson/api/RBlockingQueueRx.java | 12 + .../redisson/rx/RedissonBlockingDequeRx.java | 58 +++++ .../redisson/rx/RedissonBlockingQueueRx.java | 30 ++- .../java/org/redisson/rx/RedissonListRx.java | 6 +- .../rx/RedissonBlockingDequeRxTest.java | 213 ++++++++++++++++++ 8 files changed, 336 insertions(+), 14 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java create mode 100644 redisson/src/test/java/org/redisson/rx/RedissonBlockingDequeRxTest.java diff --git a/redisson/src/main/java/org/redisson/RedissonRx.java b/redisson/src/main/java/org/redisson/RedissonRx.java index d353fe964..53938b1fc 100644 --- a/redisson/src/main/java/org/redisson/RedissonRx.java +++ b/redisson/src/main/java/org/redisson/RedissonRx.java @@ -65,6 +65,7 @@ import org.redisson.pubsub.SemaphorePubSub; import org.redisson.rx.CommandRxExecutor; import org.redisson.rx.CommandRxService; import org.redisson.rx.RedissonBatchRx; +import org.redisson.rx.RedissonBlockingDequeRx; import org.redisson.rx.RedissonBlockingQueueRx; import org.redisson.rx.RedissonKeysRx; import org.redisson.rx.RedissonLexSortedSetRx; @@ -479,14 +480,14 @@ public class RedissonRx implements RedissonRxClient { public RBlockingDequeRx getBlockingDeque(String name) { RedissonBlockingDeque deque = new RedissonBlockingDeque(commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, deque, - new RedissonListRx(deque), RBlockingDequeRx.class); + new RedissonBlockingDequeRx(deque), RBlockingDequeRx.class); } @Override public RBlockingDequeRx getBlockingDeque(String name, Codec codec) { RedissonBlockingDeque deque = new RedissonBlockingDeque(codec, commandExecutor, name, null); return RxProxyBuilder.create(commandExecutor, deque, - new RedissonListRx(deque), RBlockingDequeRx.class); + new RedissonBlockingDequeRx(deque), RBlockingDequeRx.class); } } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingDequeRx.java b/redisson/src/main/java/org/redisson/api/RBlockingDequeRx.java index 76b75ec9a..73b9d58a0 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingDequeRx.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingDequeRx.java @@ -111,4 +111,18 @@ public interface RBlockingDequeRx extends RDequeRx, RBlockingQueueRx { */ Flowable takeFirst(); + /** + * Retrieves and removes stream of elements from the head of this queue. Waits for an element become available. + * + * @return the head element of this queue + */ + Flowable takeFirstElements(); + + /** + * Retrieves and removes stream of elements from the tail of this queue. Waits for an element become available. + * + * @return the head element of this queue + */ + Flowable takeLastElements(); + } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java index ccad9c2e7..b261765f5 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java @@ -16,7 +16,6 @@ package org.redisson.api; import java.util.Collection; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; @@ -121,6 +120,17 @@ public interface RBlockingQueueReactive extends RQueueReactive { * specified waiting time elapses before an element is available */ Publisher poll(long timeout, TimeUnit unit); + + /** + * Retrieves and removes last available tail element of any queue and adds it at the head of queueName, + * waiting if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueName - names of destination queue + * @return the tail of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + */ + Publisher takeLastAndOfferFirstTo(String queueName); /** * Retrieves and removes the head of this queue in async mode, waiting if necessary diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java index 6055779b0..f23fac4b2 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueRx.java @@ -128,6 +128,17 @@ public interface RBlockingQueueRx extends RQueueRx { * @return the head of this queue */ Flowable take(); + + /** + * Retrieves and removes last available tail element of any queue and adds it at the head of queueName, + * waiting if necessary for an element to become available + * in any of defined queues including queue itself. + * + * @param queueName - names of destination queue + * @return the tail of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + */ + Flowable takeLastAndOfferFirstTo(String queueName); /** * Inserts the specified element into this queue in async mode, waiting if necessary @@ -145,6 +156,7 @@ public interface RBlockingQueueRx extends RQueueRx { /** * Retrieves and removes stream of elements from the head of this queue. + * Waits for an element become available. * * @return stream of messages */ diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java new file mode 100644 index 000000000..d66285b17 --- /dev/null +++ b/redisson/src/main/java/org/redisson/rx/RedissonBlockingDequeRx.java @@ -0,0 +1,58 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.rx; + +import java.util.concurrent.Callable; + +import org.redisson.RedissonBlockingDeque; +import org.redisson.api.RFuture; + +import io.reactivex.Flowable; + +/** + * + * @author Nikita Koksharov + * + * @param - value type + */ +public class RedissonBlockingDequeRx extends RedissonBlockingQueueRx { + + private final RedissonBlockingDeque queue; + + public RedissonBlockingDequeRx(RedissonBlockingDeque queue) { + super(queue); + this.queue = queue; + } + + public Flowable takeFirstElements() { + return takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return queue.takeFirstAsync(); + } + }); + } + + public Flowable takeLastElements() { + return takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return queue.takeLastAsync(); + } + }); + } + +} diff --git a/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java b/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java index 0e3a8416a..82aaa11d8 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonBlockingQueueRx.java @@ -15,11 +15,13 @@ */ package org.redisson.rx; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.RedissonBlockingQueue; +import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RFuture; +import org.redisson.api.RListAsync; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -36,21 +38,32 @@ import io.reactivex.processors.ReplayProcessor; */ public class RedissonBlockingQueueRx extends RedissonListRx { - private final RedissonBlockingQueue queue; + private final RBlockingQueueAsync queue; - public RedissonBlockingQueueRx(RedissonBlockingQueue queue) { - super(queue); + public RedissonBlockingQueueRx(RBlockingQueueAsync queue) { + super((RListAsync) queue); this.queue = queue; } public Flowable takeElements() { + return takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return queue.takeAsync(); + } + }); + } + + protected final Flowable takeElements(final Callable> callable) { final ReplayProcessor p = ReplayProcessor.create(); return p.doOnRequest(new LongConsumer() { @Override public void accept(long n) throws Exception { final AtomicLong counter = new AtomicLong(n); final AtomicReference> futureRef = new AtomicReference>(); - take(p, counter, futureRef); + + take(callable, p, counter, futureRef); + p.doOnCancel(new Action() { @Override public void run() throws Exception { @@ -61,8 +74,8 @@ public class RedissonBlockingQueueRx extends RedissonListRx { }); } - private void take(final ReplayProcessor p, final AtomicLong counter, final AtomicReference> futureRef) { - RFuture future = queue.takeAsync(); + private void take(final Callable> factory, final ReplayProcessor p, final AtomicLong counter, final AtomicReference> futureRef) throws Exception { + RFuture future = factory.call(); futureRef.set(future); future.addListener(new FutureListener() { @Override @@ -77,8 +90,9 @@ public class RedissonBlockingQueueRx extends RedissonListRx { p.onComplete(); } - take(p, counter, futureRef); + take(factory, p, counter, futureRef); } }); } + } diff --git a/redisson/src/main/java/org/redisson/rx/RedissonListRx.java b/redisson/src/main/java/org/redisson/rx/RedissonListRx.java index 24b8d2004..de5fa5a35 100644 --- a/redisson/src/main/java/org/redisson/rx/RedissonListRx.java +++ b/redisson/src/main/java/org/redisson/rx/RedissonListRx.java @@ -16,8 +16,8 @@ package org.redisson.rx; import org.reactivestreams.Publisher; -import org.redisson.RedissonList; import org.redisson.api.RFuture; +import org.redisson.api.RListAsync; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -33,9 +33,9 @@ import io.reactivex.processors.ReplayProcessor; */ public class RedissonListRx { - private final RedissonList instance; + private final RListAsync instance; - public RedissonListRx(RedissonList instance) { + public RedissonListRx(RListAsync instance) { this.instance = instance; } diff --git a/redisson/src/test/java/org/redisson/rx/RedissonBlockingDequeRxTest.java b/redisson/src/test/java/org/redisson/rx/RedissonBlockingDequeRxTest.java new file mode 100644 index 000000000..62697f7a9 --- /dev/null +++ b/redisson/src/test/java/org/redisson/rx/RedissonBlockingDequeRxTest.java @@ -0,0 +1,213 @@ +package org.redisson.rx; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.api.RBlockingDequeRx; + +public class RedissonBlockingDequeRxTest extends BaseRxTest { + + @Test + public void testTakeFirstElements() { + RBlockingDequeRx queue = redisson.getBlockingDeque("test"); + List elements = new ArrayList<>(); + queue.takeFirstElements().subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(4); + } + + @Override + public void onNext(Integer t) { + elements.add(t); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + for (int i = 0; i < 10; i++) { + sync(queue.add(i)); + } + + assertThat(elements).containsExactly(0, 1, 2, 3); + } + + @Test + public void testPollLastAndOfferFirstTo() throws InterruptedException { + RBlockingDequeRx blockingDeque = redisson.getBlockingDeque("blocking_deque"); + long start = System.currentTimeMillis(); + String redisTask = sync(blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS)); + assertThat(System.currentTimeMillis() - start).isBetween(950L, 1100L); + assertThat(redisTask).isNull(); + } + + @Test(timeout = 3000) + public void testShortPoll() throws InterruptedException { + RBlockingDequeRx queue = redisson.getBlockingDeque("queue:pollany"); + sync(queue.pollLast(500, TimeUnit.MILLISECONDS)); + sync(queue.pollFirst(10, TimeUnit.MICROSECONDS)); + } + + @Test + public void testPollLastFromAny() throws InterruptedException { + final RBlockingDequeRx queue1 = redisson.getBlockingDeque("deque:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() { + @Override + public void run() { + RBlockingDequeRx queue2 = redisson.getBlockingDeque("deque:pollany1"); + RBlockingDequeRx queue3 = redisson.getBlockingDeque("deque:pollany2"); + sync(queue3.put(2)); + sync(queue1.put(1)); + sync(queue2.put(3)); + } + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = sync(queue1.pollLastFromAny(4, TimeUnit.SECONDS, "deque:pollany1", "deque:pollany2")); + + assertThat(l).isEqualTo(2); + assertThat(System.currentTimeMillis() - s).isGreaterThan(2000); + } + + @Test + public void testFirstLast() throws InterruptedException { + RBlockingDequeRx deque = redisson.getBlockingDeque("deque"); + sync(deque.putFirst(1)); + sync(deque.putFirst(2)); + sync(deque.putLast(3)); + sync(deque.putLast(4)); + + assertThat(sync(deque)).containsExactly(2, 1, 3, 4); + } + + @Test + public void testOfferFirstLast() throws InterruptedException { + RBlockingDequeRx deque = redisson.getBlockingDeque("deque"); + sync(deque.offerFirst(1)); + sync(deque.offerFirst(2)); + sync(deque.offerLast(3)); + sync(deque.offerLast(4)); + + assertThat(sync(deque)).containsExactly(2, 1, 3, 4); + } + + @Test + public void testTakeFirst() throws InterruptedException { + RBlockingDequeRx deque = redisson.getBlockingDeque("queue:take"); + + sync(deque.offerFirst(1)); + sync(deque.offerFirst(2)); + sync(deque.offerLast(3)); + sync(deque.offerLast(4)); + + assertThat(sync(deque.takeFirst())).isEqualTo(2); + assertThat(sync(deque.takeFirst())).isEqualTo(1); + assertThat(sync(deque.takeFirst())).isEqualTo(3); + assertThat(sync(deque.takeFirst())).isEqualTo(4); + assertThat(sync(deque.size())).isZero(); + } + + @Test + public void testTakeLast() throws InterruptedException { + RBlockingDequeRx deque = redisson.getBlockingDeque("queue:take"); + + sync(deque.offerFirst(1)); + sync(deque.offerFirst(2)); + sync(deque.offerLast(3)); + sync(deque.offerLast(4)); + + assertThat(sync(deque.takeLast())).isEqualTo(4); + assertThat(sync(deque.takeLast())).isEqualTo(3); + assertThat(sync(deque.takeLast())).isEqualTo(1); + assertThat(sync(deque.takeLast())).isEqualTo(2); + assertThat(sync(deque.size())).isZero(); + } + + + @Test + public void testTakeFirstAwait() throws InterruptedException { + RBlockingDequeRx deque = redisson.getBlockingDeque("queue:take"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RBlockingDequeRx deque1 = redisson.getBlockingDeque("queue:take"); + sync(deque1.putFirst(1)); + sync(deque1.putFirst(2)); + sync(deque1.putLast(3)); + sync(deque1.putLast(4)); + }, 10, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + assertThat(sync(deque.takeFirst())).isEqualTo(1); + assertThat(System.currentTimeMillis() - s).isGreaterThan(9000); + Thread.sleep(50); + assertThat(sync(deque.takeFirst())).isEqualTo(2); + assertThat(sync(deque.takeFirst())).isEqualTo(3); + assertThat(sync(deque.takeFirst())).isEqualTo(4); + } + + @Test + public void testTakeLastAwait() throws InterruptedException { + RBlockingDequeRx deque = redisson.getBlockingDeque("queue:take"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RBlockingDequeRx deque1 = redisson.getBlockingDeque("queue:take"); + sync(deque1.putFirst(1)); + sync(deque1.putFirst(2)); + sync(deque1.putLast(3)); + sync(deque1.putLast(4)); + }, 10, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + assertThat(sync(deque.takeLast())).isEqualTo(1); + assertThat(System.currentTimeMillis() - s).isGreaterThan(9000); + Thread.sleep(50); + assertThat(sync(deque.takeLast())).isEqualTo(4); + assertThat(sync(deque.takeLast())).isEqualTo(3); + assertThat(sync(deque.takeLast())).isEqualTo(2); + } + + @Test + public void testPollFirst() throws InterruptedException { + RBlockingDequeRx queue1 = redisson.getBlockingDeque("queue1"); + sync(queue1.put(1)); + sync(queue1.put(2)); + sync(queue1.put(3)); + + assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(1); + assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(2); + assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(3); + + long s = System.currentTimeMillis(); + assertThat(sync(queue1.pollFirst(5, TimeUnit.SECONDS))).isNull(); + assertThat(System.currentTimeMillis() - s).isGreaterThan(5000); + } + + @Test + public void testPollLast() throws InterruptedException { + RBlockingDequeRx queue1 = redisson.getBlockingDeque("queue1"); + sync(queue1.putLast(1)); + sync(queue1.putLast(2)); + sync(queue1.putLast(3)); + + assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(3); + assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(2); + assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(1); + + long s = System.currentTimeMillis(); + assertThat(sync(queue1.pollLast(5, TimeUnit.SECONDS))).isNull(); + assertThat(System.currentTimeMillis() - s).isGreaterThan(5000); + } + +}