Feature - takeFirstElements and takeLastElements methods added to RBlockingDequeReactive

pull/1821/head
Nikita Koksharov 6 years ago
parent 6ea0cc4ed7
commit 59aeef0e8b

@ -69,6 +69,7 @@ import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.reactive.CommandReactiveService;
import org.redisson.reactive.ReactiveProxyBuilder;
import org.redisson.reactive.RedissonBatchReactive;
import org.redisson.reactive.RedissonBlockingDequeReactive;
import org.redisson.reactive.RedissonBlockingQueueReactive;
import org.redisson.reactive.RedissonKeysReactive;
import org.redisson.reactive.RedissonLexSortedSetReactive;
@ -495,13 +496,15 @@ public class RedissonReactive implements RedissonReactiveClient {
@Override
public <V> RBlockingDequeReactive<V> getBlockingDeque(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingDeque<V>(commandExecutor, name, null),
new RedissonListReactive<V>(commandExecutor, name), RBlockingDequeReactive.class);
RedissonBlockingDeque<V> deque = new RedissonBlockingDeque<V>(commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, deque,
new RedissonBlockingDequeReactive<V>(deque), RBlockingDequeReactive.class);
}
@Override
public <V> RBlockingDequeReactive<V> getBlockingDeque(String name, Codec codec) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonBlockingDeque<V>(codec, commandExecutor, name, null),
new RedissonListReactive<V>(codec, commandExecutor, name), RBlockingDequeReactive.class);
RedissonBlockingDeque<V> deque = new RedissonBlockingDeque<V>(codec, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, deque,
new RedissonBlockingDequeReactive<V>(deque), RBlockingDequeReactive.class);
}
}

@ -17,6 +17,7 @@ package org.redisson.api;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
@ -111,4 +112,18 @@ public interface RBlockingDequeReactive<V> extends RDequeReactive<V>, RBlockingQ
*/
Mono<V> takeFirst();
/**
* Retrieves and removes stream of elements from the head of this queue. Waits for an element become available.
*
* @return the head element of this queue
*/
Flux<V> takeFirstElements();
/**
* Retrieves and removes stream of elements from the tail of this queue. Waits for an element become available.
*
* @return the head element of this queue
*/
Flux<V> takeLastElements();
}

@ -0,0 +1,58 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.reactive;
import java.util.concurrent.Callable;
import org.redisson.RedissonBlockingDeque;
import org.redisson.api.RFuture;
import reactor.core.publisher.Flux;
/**
*
* @author Nikita Koksharov
*
* @param <V> - value type
*/
public class RedissonBlockingDequeReactive<V> extends RedissonBlockingQueueReactive<V> {
private final RedissonBlockingDeque<V> queue;
public RedissonBlockingDequeReactive(RedissonBlockingDeque<V> queue) {
super(queue);
this.queue = queue;
}
public Flux<V> takeFirstElements() {
return takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return queue.takeFirstAsync();
}
});
}
public Flux<V> takeLastElements() {
return takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return queue.takeLastAsync();
}
});
}
}

@ -15,11 +15,13 @@
*/
package org.redisson.reactive;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.RedissonBlockingQueue;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -34,28 +36,21 @@ import reactor.core.publisher.FluxSink;
*/
public class RedissonBlockingQueueReactive<V> extends RedissonListReactive<V> {
private final RedissonBlockingQueue<V> queue;
private final RBlockingQueue<V> queue;
public RedissonBlockingQueueReactive(RedissonBlockingQueue<V> queue) {
super(queue);
public RedissonBlockingQueueReactive(RBlockingQueue<V> queue) {
super((RListAsync<V>)queue);
this.queue = queue;
}
public Flux<V> takeElements() {
return Flux.<V>create(emitter -> {
emitter.onRequest(n -> {
AtomicLong counter = new AtomicLong(n);
AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(emitter, counter, futureRef);
emitter.onDispose(() -> {
futureRef.get().cancel(true);
});
});
});
}
private void take(final FluxSink<V> emitter, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) {
RFuture<V> future = queue.takeAsync();
private void take(final Callable<RFuture<V>> factory, final FluxSink<V> emitter, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) {
RFuture<V> future;
try {
future = factory.call();
} catch (Exception e) {
emitter.error(e);
return;
}
futureRef.set(future);
future.addListener(new FutureListener<V>() {
@Override
@ -70,8 +65,31 @@ public class RedissonBlockingQueueReactive<V> extends RedissonListReactive<V> {
emitter.complete();
}
take(emitter, counter, futureRef);
take(factory, emitter, counter, futureRef);
}
});
}
public Flux<V> takeElements() {
return takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return queue.takeAsync();
}
});
}
protected final Flux<V> takeElements(Callable<RFuture<V>> callable) {
return Flux.<V>create(emitter -> {
emitter.onRequest(n -> {
AtomicLong counter = new AtomicLong(n);
AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(callable, emitter, counter, futureRef);
emitter.onDispose(() -> {
futureRef.get().cancel(true);
});
});
});
}
}

@ -21,6 +21,7 @@ import java.util.function.LongConsumer;
import org.reactivestreams.Publisher;
import org.redisson.RedissonList;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import org.redisson.client.codec.Codec;
import io.netty.util.concurrent.Future;
@ -37,9 +38,9 @@ import reactor.core.publisher.FluxSink;
*/
public class RedissonListReactive<V> {
private final RedissonList<V> instance;
private final RListAsync<V> instance;
public RedissonListReactive(RedissonList<V> instance) {
public RedissonListReactive(RListAsync<V> instance) {
this.instance = instance;
}

@ -0,0 +1,214 @@
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RBlockingDequeReactive;
import org.redisson.api.RBlockingDequeRx;
public class RedissonBlockingDequeReactiveTest extends BaseReactiveTest {
@Test
public void testTakeFirstElements() {
RBlockingDequeReactive<Integer> queue = redisson.getBlockingDeque("test");
List<Integer> elements = new ArrayList<>();
queue.takeFirstElements().subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(4);
}
@Override
public void onNext(Integer t) {
elements.add(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
for (int i = 0; i < 10; i++) {
sync(queue.add(i));
}
assertThat(elements).containsExactly(0, 1, 2, 3);
}
@Test
public void testPollLastAndOfferFirstTo() throws InterruptedException {
RBlockingDequeReactive<String> blockingDeque = redisson.getBlockingDeque("blocking_deque");
long start = System.currentTimeMillis();
String redisTask = sync(blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS));
assertThat(System.currentTimeMillis() - start).isBetween(950L, 1100L);
assertThat(redisTask).isNull();
}
@Test(timeout = 3000)
public void testShortPoll() throws InterruptedException {
RBlockingDequeReactive<Integer> queue = redisson.getBlockingDeque("queue:pollany");
sync(queue.pollLast(500, TimeUnit.MILLISECONDS));
sync(queue.pollFirst(10, TimeUnit.MICROSECONDS));
}
@Test
public void testPollLastFromAny() throws InterruptedException {
final RBlockingDequeReactive<Integer> queue1 = redisson.getBlockingDeque("deque:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingDequeReactive<Integer> queue2 = redisson.getBlockingDeque("deque:pollany1");
RBlockingDequeReactive<Integer> queue3 = redisson.getBlockingDeque("deque:pollany2");
sync(queue3.put(2));
sync(queue1.put(1));
sync(queue2.put(3));
}
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = sync(queue1.pollLastFromAny(4, TimeUnit.SECONDS, "deque:pollany1", "deque:pollany2"));
assertThat(l).isEqualTo(2);
assertThat(System.currentTimeMillis() - s).isGreaterThan(2000);
}
@Test
public void testFirstLast() throws InterruptedException {
RBlockingDequeReactive<Integer> deque = redisson.getBlockingDeque("deque");
sync(deque.putFirst(1));
sync(deque.putFirst(2));
sync(deque.putLast(3));
sync(deque.putLast(4));
assertThat(sync(deque)).containsExactly(2, 1, 3, 4);
}
@Test
public void testOfferFirstLast() throws InterruptedException {
RBlockingDequeReactive<Integer> deque = redisson.getBlockingDeque("deque");
sync(deque.offerFirst(1));
sync(deque.offerFirst(2));
sync(deque.offerLast(3));
sync(deque.offerLast(4));
assertThat(sync(deque)).containsExactly(2, 1, 3, 4);
}
@Test
public void testTakeFirst() throws InterruptedException {
RBlockingDequeReactive<Integer> deque = redisson.getBlockingDeque("queue:take");
sync(deque.offerFirst(1));
sync(deque.offerFirst(2));
sync(deque.offerLast(3));
sync(deque.offerLast(4));
assertThat(sync(deque.takeFirst())).isEqualTo(2);
assertThat(sync(deque.takeFirst())).isEqualTo(1);
assertThat(sync(deque.takeFirst())).isEqualTo(3);
assertThat(sync(deque.takeFirst())).isEqualTo(4);
assertThat(sync(deque.size())).isZero();
}
@Test
public void testTakeLast() throws InterruptedException {
RBlockingDequeReactive<Integer> deque = redisson.getBlockingDeque("queue:take");
sync(deque.offerFirst(1));
sync(deque.offerFirst(2));
sync(deque.offerLast(3));
sync(deque.offerLast(4));
assertThat(sync(deque.takeLast())).isEqualTo(4);
assertThat(sync(deque.takeLast())).isEqualTo(3);
assertThat(sync(deque.takeLast())).isEqualTo(1);
assertThat(sync(deque.takeLast())).isEqualTo(2);
assertThat(sync(deque.size())).isZero();
}
@Test
public void testTakeFirstAwait() throws InterruptedException {
RBlockingDequeReactive<Integer> deque = redisson.getBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingDequeReactive<Integer> deque1 = redisson.getBlockingDeque("queue:take");
sync(deque1.putFirst(1));
sync(deque1.putFirst(2));
sync(deque1.putLast(3));
sync(deque1.putLast(4));
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
assertThat(sync(deque.takeFirst())).isEqualTo(1);
assertThat(System.currentTimeMillis() - s).isGreaterThan(9000);
Thread.sleep(50);
assertThat(sync(deque.takeFirst())).isEqualTo(2);
assertThat(sync(deque.takeFirst())).isEqualTo(3);
assertThat(sync(deque.takeFirst())).isEqualTo(4);
}
@Test
public void testTakeLastAwait() throws InterruptedException {
RBlockingDequeReactive<Integer> deque = redisson.getBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingDequeReactive<Integer> deque1 = redisson.getBlockingDeque("queue:take");
sync(deque1.putFirst(1));
sync(deque1.putFirst(2));
sync(deque1.putLast(3));
sync(deque1.putLast(4));
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
assertThat(sync(deque.takeLast())).isEqualTo(1);
assertThat(System.currentTimeMillis() - s).isGreaterThan(9000);
Thread.sleep(50);
assertThat(sync(deque.takeLast())).isEqualTo(4);
assertThat(sync(deque.takeLast())).isEqualTo(3);
assertThat(sync(deque.takeLast())).isEqualTo(2);
}
@Test
public void testPollFirst() throws InterruptedException {
RBlockingDequeReactive<Integer> queue1 = redisson.getBlockingDeque("queue1");
sync(queue1.put(1));
sync(queue1.put(2));
sync(queue1.put(3));
assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(1);
assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(2);
assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(3);
long s = System.currentTimeMillis();
assertThat(sync(queue1.pollFirst(5, TimeUnit.SECONDS))).isNull();
assertThat(System.currentTimeMillis() - s).isGreaterThan(5000);
}
@Test
public void testPollLast() throws InterruptedException {
RBlockingDequeReactive<Integer> queue1 = redisson.getBlockingDeque("queue1");
sync(queue1.putLast(1));
sync(queue1.putLast(2));
sync(queue1.putLast(3));
assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(3);
assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(2);
assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(1);
long s = System.currentTimeMillis();
assertThat(sync(queue1.pollLast(5, TimeUnit.SECONDS))).isNull();
assertThat(System.currentTimeMillis() - s).isGreaterThan(5000);
}
}
Loading…
Cancel
Save