From 91f38a30d59d59431afea0b9ad6963e216ed09c1 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 5 Nov 2018 16:31:42 +0300 Subject: [PATCH] Feature - RBlockingQueueReactive.takeElements streaming method added --- .../java/org/redisson/RedissonReactive.java | 11 ++- .../redisson/api/RBlockingQueueReactive.java | 8 ++ .../reactive/ReactiveProxyBuilder.java | 2 +- .../RedissonBlockingQueueReactive.java | 77 +++++++++++++++++++ .../reactive/RedissonListReactive.java | 4 + .../RedissonBlockingQueueReactiveTest.java | 35 +++++++++ 6 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index d3b2d4997..a3497ba75 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -69,6 +69,7 @@ import org.redisson.pubsub.SemaphorePubSub; import org.redisson.reactive.CommandReactiveService; import org.redisson.reactive.ReactiveProxyBuilder; import org.redisson.reactive.RedissonBatchReactive; +import org.redisson.reactive.RedissonBlockingQueueReactive; import org.redisson.reactive.RedissonKeysReactive; import org.redisson.reactive.RedissonLexSortedSetReactive; import org.redisson.reactive.RedissonListMultimapReactive; @@ -332,14 +333,16 @@ public class RedissonReactive implements RedissonReactiveClient { @Override public RBlockingQueueReactive getBlockingQueue(String name) { - return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingQueue(commandExecutor, name, null), - new RedissonListReactive(commandExecutor, name), RBlockingQueueReactive.class); + RedissonBlockingQueue queue = new RedissonBlockingQueue(commandExecutor, name, null); + return ReactiveProxyBuilder.create(commandExecutor, queue, + new RedissonBlockingQueueReactive(queue), RBlockingQueueReactive.class); } @Override public RBlockingQueueReactive getBlockingQueue(String name, Codec codec) { - return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingQueue(codec, commandExecutor, name, null), - new RedissonListReactive(codec, commandExecutor, name), RBlockingQueueReactive.class); + RedissonBlockingQueue queue = new RedissonBlockingQueue(codec, commandExecutor, name, null); + return ReactiveProxyBuilder.create(commandExecutor, queue, + new RedissonBlockingQueueReactive(queue), RBlockingQueueReactive.class); } @Override diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java index fb09ba840..c02860425 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java @@ -18,6 +18,7 @@ package org.redisson.api; import java.util.Collection; import java.util.concurrent.TimeUnit; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -143,4 +144,11 @@ public interface RBlockingQueueReactive extends RQueueReactive { */ Mono put(V e); + /** + * Retrieves and removes stream of elements from the head of this queue. + * + * @return stream of messages + */ + Flux takeElements(); + } diff --git a/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java b/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java index 347c1f7c0..52f883ecc 100644 --- a/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java +++ b/redisson/src/main/java/org/redisson/reactive/ReactiveProxyBuilder.java @@ -125,7 +125,7 @@ public class ReactiveProxyBuilder { } if (implementation != null - && instanceMethod.getDeclaringClass() == implementation.getClass()) { + && instanceMethod.getDeclaringClass().isAssignableFrom(implementation.getClass())) { return instanceMethod.invoke(implementation, args); } diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java new file mode 100644 index 000000000..716d1e22b --- /dev/null +++ b/redisson/src/main/java/org/redisson/reactive/RedissonBlockingQueueReactive.java @@ -0,0 +1,77 @@ +/** + * Copyright 2018 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.reactive; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.redisson.RedissonBlockingQueue; +import org.redisson.api.RFuture; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +/** + * + * @author Nikita Koksharov + * + * @param - value type + */ +public class RedissonBlockingQueueReactive extends RedissonListReactive { + + private final RedissonBlockingQueue queue; + + public RedissonBlockingQueueReactive(RedissonBlockingQueue queue) { + super(queue); + this.queue = queue; + } + + public Flux takeElements() { + return Flux.create(emitter -> { + emitter.onRequest(n -> { + AtomicLong counter = new AtomicLong(n); + AtomicReference> futureRef = new AtomicReference>(); + take(emitter, counter, futureRef); + emitter.onDispose(() -> { + futureRef.get().cancel(true); + }); + }); + }); + } + + private void take(final FluxSink emitter, final AtomicLong counter, final AtomicReference> futureRef) { + RFuture future = queue.takeAsync(); + futureRef.set(future); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + emitter.error(future.cause()); + return; + } + + emitter.next(future.getNow()); + if (counter.decrementAndGet() == 0) { + emitter.complete(); + } + + take(emitter, counter, futureRef); + } + }); + } +} diff --git a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java index 816e44b04..f666e8194 100644 --- a/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java +++ b/redisson/src/main/java/org/redisson/reactive/RedissonListReactive.java @@ -38,6 +38,10 @@ import reactor.core.publisher.FluxSink; public class RedissonListReactive { private final RedissonList instance; + + public RedissonListReactive(RedissonList instance) { + this.instance = instance; + } public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) { this.instance = new RedissonList(commandExecutor, name, null); diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java index 4b5c3a996..ea9333da4 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueReactiveTest.java @@ -5,6 +5,7 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -13,10 +14,44 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.redisson.api.RBlockingQueueReactive; public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest { + @Test + public void testTakeElements() { + RBlockingQueueReactive queue = redisson.getBlockingQueue("test"); + List elements = new ArrayList<>(); + queue.takeElements().subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(4); + } + + @Override + public void onNext(Integer t) { + elements.add(t); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + for (int i = 0; i < 10; i++) { + sync(queue.add(i)); + } + + assertThat(elements).containsExactly(0, 1, 2, 3); + } + @Test public void testPollFromAny() throws InterruptedException { final RBlockingQueueReactive queue1 = redisson.getBlockingQueue("queue:pollany");