diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index a3497ba75..6e8586411 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -69,6 +69,7 @@ import org.redisson.pubsub.SemaphorePubSub; import org.redisson.reactive.CommandReactiveService; import org.redisson.reactive.ReactiveProxyBuilder; import org.redisson.reactive.RedissonBatchReactive; +import org.redisson.reactive.RedissonBlockingDequeReactive; import org.redisson.reactive.RedissonBlockingQueueReactive; import org.redisson.reactive.RedissonKeysReactive; import org.redisson.reactive.RedissonLexSortedSetReactive; @@ -495,13 +496,15 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RBlockingDequeReactive getBlockingDeque(String name) { - return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingDeque(commandExecutor, name, null), - new RedissonListReactive(commandExecutor, name), RBlockingDequeReactive.class); + RedissonBlockingDeque deque = new RedissonBlockingDeque(commandExecutor, name, null); + return ReactiveProxyBuilder.create(commandExecutor, deque, + new RedissonBlockingDequeReactive(deque), RBlockingDequeReactive.class); } @Override public RBlockingDequeReactive getBlockingDeque(String name, Codec codec) { - return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingDeque(codec, commandExecutor, name, null), - new RedissonListReactive(codec, commandExecutor, name), RBlockingDequeReactive.class); + RedissonBlockingDeque deque = new RedissonBlockingDeque(codec, commandExecutor, name, null); + return ReactiveProxyBuilder.create(commandExecutor, deque, + new RedissonBlockingDequeReactive(deque), RBlockingDequeReactive.class); } } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingDequeReactive.java b/redisson/src/main/java/org/redisson/api/RBlockingDequeReactive.java index f44b150b5..deaaa48d8 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingDequeReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingDequeReactive.java @@ -17,6 +17,7 @@ package org.redisson.api; import java.util.concurrent.TimeUnit; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -111,4 +112,18 @@ public interface RBlockingDequeReactive extends RDequeReactive, RBlockingQ */ Mono takeFirst(); + /** + * Retrieves and removes stream of elements from the head of this queue. Waits for an element become available. + * + * @return the head element of this queue + */ + Flux takeFirstElements(); + + /** + * Retrieves and removes stream of elements from the tail of this queue. Waits for an element become available. + * + * @return the head element of this queue + */ + Flux takeLastElements(); + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBlockingDequeReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBlockingDequeReactive.java new file mode 100644 index 000000000..0f14e07ee --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBlockingDequeReactive.java @@ -0,0 +1,58 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.concurrent.Callable; + +import org.redisson.RedissonBlockingDeque; +import org.redisson.api.RFuture; + +import reactor.core.publisher.Flux; + +/** + * + * @author Nikita Koksharov + * + * @param - value type + */ +public class RedissonBlockingDequeReactive extends RedissonBlockingQueueReactive { + + private final RedissonBlockingDeque queue; + + public RedissonBlockingDequeReactive(RedissonBlockingDeque queue) { + super(queue); + this.queue = queue; + } + + public Flux takeFirstElements() { + return takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return queue.takeFirstAsync(); + } + }); + } + + public Flux takeLastElements() { + return takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return queue.takeLastAsync(); + } + }); + } + +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java index 716d1e22b..a40c26e52 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java @@ -15,11 +15,13 @@ */ package org.redisson.reactive; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.redisson.RedissonBlockingQueue; +import org.redisson.api.RBlockingQueue; import org.redisson.api.RFuture; +import org.redisson.api.RListAsync; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -34,28 +36,21 @@ import reactor.core.publisher.FluxSink; */ public class RedissonBlockingQueueReactive extends RedissonListReactive { - private final RedissonBlockingQueue queue; + private final RBlockingQueue queue; - public RedissonBlockingQueueReactive(RedissonBlockingQueue queue) { - super(queue); + public RedissonBlockingQueueReactive(RBlockingQueue queue) { + super((RListAsync)queue); this.queue = queue; } - public Flux takeElements() { - return Flux.create(emitter -> { - emitter.onRequest(n -> { - AtomicLong counter = new AtomicLong(n); - AtomicReference> futureRef = new AtomicReference>(); - take(emitter, counter, futureRef); - emitter.onDispose(() -> { - futureRef.get().cancel(true); - }); - }); - }); - } - - private void take(final FluxSink emitter, final AtomicLong counter, final AtomicReference> futureRef) { - RFuture future = queue.takeAsync(); + private void take(final Callable> factory, final FluxSink emitter, final AtomicLong counter, final AtomicReference> futureRef) { + RFuture future; + try { + future = factory.call(); + } catch (Exception e) { + emitter.error(e); + return; + } futureRef.set(future); future.addListener(new FutureListener() { @Override @@ -70,8 +65,31 @@ public class RedissonBlockingQueueReactive extends RedissonListReactive { emitter.complete(); } - take(emitter, counter, futureRef); + take(factory, emitter, counter, futureRef); + } + }); + } + + public Flux takeElements() { + return takeElements(new Callable>() { + @Override + public RFuture call() throws Exception { + return queue.takeAsync(); } }); } + + protected final Flux takeElements(Callable> callable) { + return Flux.create(emitter -> { + emitter.onRequest(n -> { + AtomicLong counter = new AtomicLong(n); + AtomicReference> futureRef = new AtomicReference>(); + take(callable, emitter, counter, futureRef); + emitter.onDispose(() -> { + futureRef.get().cancel(true); + }); + }); + }); + } + } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index f666e8194..1b5412181 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -21,6 +21,7 @@ import java.util.function.LongConsumer; import org.reactivestreams.Publisher; import org.redisson.RedissonList; import org.redisson.api.RFuture; +import org.redisson.api.RListAsync; import org.redisson.client.codec.Codec; import io.netty.util.concurrent.Future; @@ -37,9 +38,9 @@ import reactor.core.publisher.FluxSink; */ public class RedissonListReactive { - private final RedissonList instance; + private final RListAsync instance; - public RedissonListReactive(RedissonList instance) { + public RedissonListReactive(RListAsync instance) { this.instance = instance; } diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingDequeReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingDequeReactiveTest.java new file mode 100644 index 000000000..351ffd66d --- /dev/null +++ b/redisson/src/test/java/org/redisson/RedissonBlockingDequeReactiveTest.java @@ -0,0 +1,214 @@ +package org.redisson; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.redisson.api.RBlockingDequeReactive; +import org.redisson.api.RBlockingDequeRx; + +public class RedissonBlockingDequeReactiveTest extends BaseReactiveTest { + + @Test + public void testTakeFirstElements() { + RBlockingDequeReactive queue = redisson.getBlockingDeque("test"); + List elements = new ArrayList<>(); + queue.takeFirstElements().subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(4); + } + + @Override + public void onNext(Integer t) { + elements.add(t); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + for (int i = 0; i < 10; i++) { + sync(queue.add(i)); + } + + assertThat(elements).containsExactly(0, 1, 2, 3); + } + + @Test + public void testPollLastAndOfferFirstTo() throws InterruptedException { + RBlockingDequeReactive blockingDeque = redisson.getBlockingDeque("blocking_deque"); + long start = System.currentTimeMillis(); + String redisTask = sync(blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS)); + assertThat(System.currentTimeMillis() - start).isBetween(950L, 1100L); + assertThat(redisTask).isNull(); + } + + @Test(timeout = 3000) + public void testShortPoll() throws InterruptedException { + RBlockingDequeReactive queue = redisson.getBlockingDeque("queue:pollany"); + sync(queue.pollLast(500, TimeUnit.MILLISECONDS)); + sync(queue.pollFirst(10, TimeUnit.MICROSECONDS)); + } + + @Test + public void testPollLastFromAny() throws InterruptedException { + final RBlockingDequeReactive queue1 = redisson.getBlockingDeque("deque:pollany"); + Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() { + @Override + public void run() { + RBlockingDequeReactive queue2 = redisson.getBlockingDeque("deque:pollany1"); + RBlockingDequeReactive queue3 = redisson.getBlockingDeque("deque:pollany2"); + sync(queue3.put(2)); + sync(queue1.put(1)); + sync(queue2.put(3)); + } + }, 3, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + int l = sync(queue1.pollLastFromAny(4, TimeUnit.SECONDS, "deque:pollany1", "deque:pollany2")); + + assertThat(l).isEqualTo(2); + assertThat(System.currentTimeMillis() - s).isGreaterThan(2000); + } + + @Test + public void testFirstLast() throws InterruptedException { + RBlockingDequeReactive deque = redisson.getBlockingDeque("deque"); + sync(deque.putFirst(1)); + sync(deque.putFirst(2)); + sync(deque.putLast(3)); + sync(deque.putLast(4)); + + assertThat(sync(deque)).containsExactly(2, 1, 3, 4); + } + + @Test + public void testOfferFirstLast() throws InterruptedException { + RBlockingDequeReactive deque = redisson.getBlockingDeque("deque"); + sync(deque.offerFirst(1)); + sync(deque.offerFirst(2)); + sync(deque.offerLast(3)); + sync(deque.offerLast(4)); + + assertThat(sync(deque)).containsExactly(2, 1, 3, 4); + } + + @Test + public void testTakeFirst() throws InterruptedException { + RBlockingDequeReactive deque = redisson.getBlockingDeque("queue:take"); + + sync(deque.offerFirst(1)); + sync(deque.offerFirst(2)); + sync(deque.offerLast(3)); + sync(deque.offerLast(4)); + + assertThat(sync(deque.takeFirst())).isEqualTo(2); + assertThat(sync(deque.takeFirst())).isEqualTo(1); + assertThat(sync(deque.takeFirst())).isEqualTo(3); + assertThat(sync(deque.takeFirst())).isEqualTo(4); + assertThat(sync(deque.size())).isZero(); + } + + @Test + public void testTakeLast() throws InterruptedException { + RBlockingDequeReactive deque = redisson.getBlockingDeque("queue:take"); + + sync(deque.offerFirst(1)); + sync(deque.offerFirst(2)); + sync(deque.offerLast(3)); + sync(deque.offerLast(4)); + + assertThat(sync(deque.takeLast())).isEqualTo(4); + assertThat(sync(deque.takeLast())).isEqualTo(3); + assertThat(sync(deque.takeLast())).isEqualTo(1); + assertThat(sync(deque.takeLast())).isEqualTo(2); + assertThat(sync(deque.size())).isZero(); + } + + + @Test + public void testTakeFirstAwait() throws InterruptedException { + RBlockingDequeReactive deque = redisson.getBlockingDeque("queue:take"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RBlockingDequeReactive deque1 = redisson.getBlockingDeque("queue:take"); + sync(deque1.putFirst(1)); + sync(deque1.putFirst(2)); + sync(deque1.putLast(3)); + sync(deque1.putLast(4)); + }, 10, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + assertThat(sync(deque.takeFirst())).isEqualTo(1); + assertThat(System.currentTimeMillis() - s).isGreaterThan(9000); + Thread.sleep(50); + assertThat(sync(deque.takeFirst())).isEqualTo(2); + assertThat(sync(deque.takeFirst())).isEqualTo(3); + assertThat(sync(deque.takeFirst())).isEqualTo(4); + } + + @Test + public void testTakeLastAwait() throws InterruptedException { + RBlockingDequeReactive deque = redisson.getBlockingDeque("queue:take"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + RBlockingDequeReactive deque1 = redisson.getBlockingDeque("queue:take"); + sync(deque1.putFirst(1)); + sync(deque1.putFirst(2)); + sync(deque1.putLast(3)); + sync(deque1.putLast(4)); + }, 10, TimeUnit.SECONDS); + + long s = System.currentTimeMillis(); + assertThat(sync(deque.takeLast())).isEqualTo(1); + assertThat(System.currentTimeMillis() - s).isGreaterThan(9000); + Thread.sleep(50); + assertThat(sync(deque.takeLast())).isEqualTo(4); + assertThat(sync(deque.takeLast())).isEqualTo(3); + assertThat(sync(deque.takeLast())).isEqualTo(2); + } + + @Test + public void testPollFirst() throws InterruptedException { + RBlockingDequeReactive queue1 = redisson.getBlockingDeque("queue1"); + sync(queue1.put(1)); + sync(queue1.put(2)); + sync(queue1.put(3)); + + assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(1); + assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(2); + assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(3); + + long s = System.currentTimeMillis(); + assertThat(sync(queue1.pollFirst(5, TimeUnit.SECONDS))).isNull(); + assertThat(System.currentTimeMillis() - s).isGreaterThan(5000); + } + + @Test + public void testPollLast() throws InterruptedException { + RBlockingDequeReactive queue1 = redisson.getBlockingDeque("queue1"); + sync(queue1.putLast(1)); + sync(queue1.putLast(2)); + sync(queue1.putLast(3)); + + assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(3); + assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(2); + assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(1); + + long s = System.currentTimeMillis(); + assertThat(sync(queue1.pollLast(5, TimeUnit.SECONDS))).isNull(); + assertThat(System.currentTimeMillis() - s).isGreaterThan(5000); + } + +}