Feature - RBlockingQueueReactive.takeElements streaming method added

pull/1821/head
Nikita Koksharov 6 years ago
parent 93d99c10cc
commit 91f38a30d5

@ -69,6 +69,7 @@ import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.reactive.CommandReactiveService;
import org.redisson.reactive.ReactiveProxyBuilder;
import org.redisson.reactive.RedissonBatchReactive;
import org.redisson.reactive.RedissonBlockingQueueReactive;
import org.redisson.reactive.RedissonKeysReactive;
import org.redisson.reactive.RedissonLexSortedSetReactive;
import org.redisson.reactive.RedissonListMultimapReactive;
@ -332,14 +333,16 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RBlockingQueueReactive<V> getBlockingQueue(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingQueue<V>(commandExecutor, name, null),
new RedissonListReactive<V>(commandExecutor, name), RBlockingQueueReactive.class);
RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, queue,
new RedissonBlockingQueueReactive<V>(queue), RBlockingQueueReactive.class);
}
@Override
public <V> RBlockingQueueReactive<V> getBlockingQueue(String name, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingQueue<V>(codec, commandExecutor, name, null),
new RedissonListReactive<V>(codec, commandExecutor, name), RBlockingQueueReactive.class);
RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(codec, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, queue,
new RedissonBlockingQueueReactive<V>(queue), RBlockingQueueReactive.class);
}
@Override

@ -18,6 +18,7 @@ package org.redisson.api;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@ -143,4 +144,11 @@ public interface RBlockingQueueReactive<V> extends RQueueReactive<V> {
*/
Mono<Void> put(V e);
/**
* Retrieves and removes stream of elements from the head of this queue.
*
* @return stream of messages
*/
Flux<V> takeElements();
}

@ -125,7 +125,7 @@ public class ReactiveProxyBuilder {
}
if (implementation != null
&& instanceMethod.getDeclaringClass() == implementation.getClass()) {
&& instanceMethod.getDeclaringClass().isAssignableFrom(implementation.getClass())) {
return instanceMethod.invoke(implementation, args);
}

@ -0,0 +1,77 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.RedissonBlockingQueue;
import org.redisson.api.RFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
*
* @author Nikita Koksharov
*
* @param <V> - value type
*/
public class RedissonBlockingQueueReactive<V> extends RedissonListReactive<V> {
private final RedissonBlockingQueue<V> queue;
public RedissonBlockingQueueReactive(RedissonBlockingQueue<V> queue) {
super(queue);
this.queue = queue;
}
public Flux<V> takeElements() {
return Flux.<V>create(emitter -> {
emitter.onRequest(n -> {
AtomicLong counter = new AtomicLong(n);
AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(emitter, counter, futureRef);
emitter.onDispose(() -> {
futureRef.get().cancel(true);
});
});
});
}
private void take(final FluxSink<V> emitter, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) {
RFuture<V> future = queue.takeAsync();
futureRef.set(future);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
emitter.error(future.cause());
return;
}
emitter.next(future.getNow());
if (counter.decrementAndGet() == 0) {
emitter.complete();
}
take(emitter, counter, futureRef);
}
});
}
}

@ -38,6 +38,10 @@ import reactor.core.publisher.FluxSink;
public class RedissonListReactive<V> {
private final RedissonList<V> instance;
public RedissonListReactive(RedissonList<V> instance) {
this.instance = instance;
}
public RedissonListReactive(CommandReactiveExecutor commandExecutor, String name) {
this.instance = new RedissonList<V>(commandExecutor, name, null);

@ -5,6 +5,7 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -13,10 +14,44 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RBlockingQueueReactive;
public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest {
@Test
public void testTakeElements() {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("test");
List<Integer> elements = new ArrayList<>();
queue.takeElements().subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(4);
}
@Override
public void onNext(Integer t) {
elements.add(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
for (int i = 0; i < 10; i++) {
sync(queue.add(i));
}
assertThat(elements).containsExactly(0, 1, 2, 3);
}
@Test
public void testPollFromAny() throws InterruptedException {
final RBlockingQueueReactive<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");

Loading…
Cancel
Save