Merge branch 'master' into 3.0.0

# Conflicts:
#	redisson/src/main/java/org/redisson/api/RBlockingQueueReactive.java
pull/1821/head
Nikita Koksharov 6 years ago
commit 6ea0cc4ed7

@ -65,6 +65,7 @@ import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.rx.CommandRxExecutor;
import org.redisson.rx.CommandRxService;
import org.redisson.rx.RedissonBatchRx;
import org.redisson.rx.RedissonBlockingDequeRx;
import org.redisson.rx.RedissonBlockingQueueRx;
import org.redisson.rx.RedissonKeysRx;
import org.redisson.rx.RedissonLexSortedSetRx;
@ -479,14 +480,14 @@ public class RedissonRx implements RedissonRxClient {
public <V> RBlockingDequeRx<V> getBlockingDeque(String name) {
RedissonBlockingDeque<V> deque = new RedissonBlockingDeque<V>(commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, deque,
new RedissonListRx<V>(deque), RBlockingDequeRx.class);
new RedissonBlockingDequeRx<V>(deque), RBlockingDequeRx.class);
}
@Override
public <V> RBlockingDequeRx<V> getBlockingDeque(String name, Codec codec) {
RedissonBlockingDeque<V> deque = new RedissonBlockingDeque<V>(codec, commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, deque,
new RedissonListRx<V>(deque), RBlockingDequeRx.class);
new RedissonBlockingDequeRx<V>(deque), RBlockingDequeRx.class);
}
}

@ -111,4 +111,18 @@ public interface RBlockingDequeRx<V> extends RDequeRx<V>, RBlockingQueueRx<V> {
*/
Flowable<V> takeFirst();
/**
* Retrieves and removes stream of elements from the head of this queue. Waits for an element become available.
*
* @return the head element of this queue
*/
Flowable<V> takeFirstElements();
/**
* Retrieves and removes stream of elements from the tail of this queue. Waits for an element become available.
*
* @return the head element of this queue
*/
Flowable<V> takeLastElements();
}

@ -121,6 +121,17 @@ public interface RBlockingQueueReactive<V> extends RQueueReactive<V> {
* specified waiting time elapses before an element is available
*/
Mono<V> poll(long timeout, TimeUnit unit);
/**
* Retrieves and removes last available tail element of <b>any</b> queue and adds it at the head of <code>queueName</code>,
* waiting if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueName - names of destination queue
* @return the tail of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
*/
Mono<V> takeLastAndOfferFirstTo(String queueName);
/**
* Retrieves and removes the head of this queue in async mode, waiting if necessary

@ -128,6 +128,17 @@ public interface RBlockingQueueRx<V> extends RQueueRx<V> {
* @return the head of this queue
*/
Flowable<V> take();
/**
* Retrieves and removes last available tail element of <b>any</b> queue and adds it at the head of <code>queueName</code>,
* waiting if necessary for an element to become available
* in any of defined queues <b>including</b> queue itself.
*
* @param queueName - names of destination queue
* @return the tail of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
*/
Flowable<V> takeLastAndOfferFirstTo(String queueName);
/**
* Inserts the specified element into this queue in async mode, waiting if necessary
@ -145,6 +156,7 @@ public interface RBlockingQueueRx<V> extends RQueueRx<V> {
/**
* Retrieves and removes stream of elements from the head of this queue.
* Waits for an element become available.
*
* @return stream of messages
*/

@ -0,0 +1,58 @@
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.rx;
import java.util.concurrent.Callable;
import org.redisson.RedissonBlockingDeque;
import org.redisson.api.RFuture;
import io.reactivex.Flowable;
/**
*
* @author Nikita Koksharov
*
* @param <V> - value type
*/
public class RedissonBlockingDequeRx<V> extends RedissonBlockingQueueRx<V> {
private final RedissonBlockingDeque<V> queue;
public RedissonBlockingDequeRx(RedissonBlockingDeque<V> queue) {
super(queue);
this.queue = queue;
}
public Flowable<V> takeFirstElements() {
return takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return queue.takeFirstAsync();
}
});
}
public Flowable<V> takeLastElements() {
return takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return queue.takeLastAsync();
}
});
}
}

@ -15,11 +15,13 @@
*/
package org.redisson.rx;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.RedissonBlockingQueue;
import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -36,21 +38,32 @@ import io.reactivex.processors.ReplayProcessor;
*/
public class RedissonBlockingQueueRx<V> extends RedissonListRx<V> {
private final RedissonBlockingQueue<V> queue;
private final RBlockingQueueAsync<V> queue;
public RedissonBlockingQueueRx(RedissonBlockingQueue<V> queue) {
super(queue);
public RedissonBlockingQueueRx(RBlockingQueueAsync<V> queue) {
super((RListAsync<V>) queue);
this.queue = queue;
}
public Flowable<V> takeElements() {
return takeElements(new Callable<RFuture<V>>() {
@Override
public RFuture<V> call() throws Exception {
return queue.takeAsync();
}
});
}
protected final Flowable<V> takeElements(final Callable<RFuture<V>> callable) {
final ReplayProcessor<V> p = ReplayProcessor.create();
return p.doOnRequest(new LongConsumer() {
@Override
public void accept(long n) throws Exception {
final AtomicLong counter = new AtomicLong(n);
final AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(p, counter, futureRef);
take(callable, p, counter, futureRef);
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
@ -61,8 +74,8 @@ public class RedissonBlockingQueueRx<V> extends RedissonListRx<V> {
});
}
private void take(final ReplayProcessor<V> p, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) {
RFuture<V> future = queue.takeAsync();
private void take(final Callable<RFuture<V>> factory, final ReplayProcessor<V> p, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) throws Exception {
RFuture<V> future = factory.call();
futureRef.set(future);
future.addListener(new FutureListener<V>() {
@Override
@ -77,8 +90,9 @@ public class RedissonBlockingQueueRx<V> extends RedissonListRx<V> {
p.onComplete();
}
take(p, counter, futureRef);
take(factory, p, counter, futureRef);
}
});
}
}

@ -16,8 +16,8 @@
package org.redisson.rx;
import org.reactivestreams.Publisher;
import org.redisson.RedissonList;
import org.redisson.api.RFuture;
import org.redisson.api.RListAsync;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -33,9 +33,9 @@ import io.reactivex.processors.ReplayProcessor;
*/
public class RedissonListRx<V> {
private final RedissonList<V> instance;
private final RListAsync<V> instance;
public RedissonListRx(RedissonList<V> instance) {
public RedissonListRx(RListAsync<V> instance) {
this.instance = instance;
}

@ -0,0 +1,213 @@
package org.redisson.rx;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RBlockingDequeRx;
public class RedissonBlockingDequeRxTest extends BaseRxTest {
@Test
public void testTakeFirstElements() {
RBlockingDequeRx<Integer> queue = redisson.getBlockingDeque("test");
List<Integer> elements = new ArrayList<>();
queue.takeFirstElements().subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(4);
}
@Override
public void onNext(Integer t) {
elements.add(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
for (int i = 0; i < 10; i++) {
sync(queue.add(i));
}
assertThat(elements).containsExactly(0, 1, 2, 3);
}
@Test
public void testPollLastAndOfferFirstTo() throws InterruptedException {
RBlockingDequeRx<String> blockingDeque = redisson.getBlockingDeque("blocking_deque");
long start = System.currentTimeMillis();
String redisTask = sync(blockingDeque.pollLastAndOfferFirstTo("deque", 1, TimeUnit.SECONDS));
assertThat(System.currentTimeMillis() - start).isBetween(950L, 1100L);
assertThat(redisTask).isNull();
}
@Test(timeout = 3000)
public void testShortPoll() throws InterruptedException {
RBlockingDequeRx<Integer> queue = redisson.getBlockingDeque("queue:pollany");
sync(queue.pollLast(500, TimeUnit.MILLISECONDS));
sync(queue.pollFirst(10, TimeUnit.MICROSECONDS));
}
@Test
public void testPollLastFromAny() throws InterruptedException {
final RBlockingDequeRx<Integer> queue1 = redisson.getBlockingDeque("deque:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingDequeRx<Integer> queue2 = redisson.getBlockingDeque("deque:pollany1");
RBlockingDequeRx<Integer> queue3 = redisson.getBlockingDeque("deque:pollany2");
sync(queue3.put(2));
sync(queue1.put(1));
sync(queue2.put(3));
}
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = sync(queue1.pollLastFromAny(4, TimeUnit.SECONDS, "deque:pollany1", "deque:pollany2"));
assertThat(l).isEqualTo(2);
assertThat(System.currentTimeMillis() - s).isGreaterThan(2000);
}
@Test
public void testFirstLast() throws InterruptedException {
RBlockingDequeRx<Integer> deque = redisson.getBlockingDeque("deque");
sync(deque.putFirst(1));
sync(deque.putFirst(2));
sync(deque.putLast(3));
sync(deque.putLast(4));
assertThat(sync(deque)).containsExactly(2, 1, 3, 4);
}
@Test
public void testOfferFirstLast() throws InterruptedException {
RBlockingDequeRx<Integer> deque = redisson.getBlockingDeque("deque");
sync(deque.offerFirst(1));
sync(deque.offerFirst(2));
sync(deque.offerLast(3));
sync(deque.offerLast(4));
assertThat(sync(deque)).containsExactly(2, 1, 3, 4);
}
@Test
public void testTakeFirst() throws InterruptedException {
RBlockingDequeRx<Integer> deque = redisson.getBlockingDeque("queue:take");
sync(deque.offerFirst(1));
sync(deque.offerFirst(2));
sync(deque.offerLast(3));
sync(deque.offerLast(4));
assertThat(sync(deque.takeFirst())).isEqualTo(2);
assertThat(sync(deque.takeFirst())).isEqualTo(1);
assertThat(sync(deque.takeFirst())).isEqualTo(3);
assertThat(sync(deque.takeFirst())).isEqualTo(4);
assertThat(sync(deque.size())).isZero();
}
@Test
public void testTakeLast() throws InterruptedException {
RBlockingDequeRx<Integer> deque = redisson.getBlockingDeque("queue:take");
sync(deque.offerFirst(1));
sync(deque.offerFirst(2));
sync(deque.offerLast(3));
sync(deque.offerLast(4));
assertThat(sync(deque.takeLast())).isEqualTo(4);
assertThat(sync(deque.takeLast())).isEqualTo(3);
assertThat(sync(deque.takeLast())).isEqualTo(1);
assertThat(sync(deque.takeLast())).isEqualTo(2);
assertThat(sync(deque.size())).isZero();
}
@Test
public void testTakeFirstAwait() throws InterruptedException {
RBlockingDequeRx<Integer> deque = redisson.getBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingDequeRx<Integer> deque1 = redisson.getBlockingDeque("queue:take");
sync(deque1.putFirst(1));
sync(deque1.putFirst(2));
sync(deque1.putLast(3));
sync(deque1.putLast(4));
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
assertThat(sync(deque.takeFirst())).isEqualTo(1);
assertThat(System.currentTimeMillis() - s).isGreaterThan(9000);
Thread.sleep(50);
assertThat(sync(deque.takeFirst())).isEqualTo(2);
assertThat(sync(deque.takeFirst())).isEqualTo(3);
assertThat(sync(deque.takeFirst())).isEqualTo(4);
}
@Test
public void testTakeLastAwait() throws InterruptedException {
RBlockingDequeRx<Integer> deque = redisson.getBlockingDeque("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
RBlockingDequeRx<Integer> deque1 = redisson.getBlockingDeque("queue:take");
sync(deque1.putFirst(1));
sync(deque1.putFirst(2));
sync(deque1.putLast(3));
sync(deque1.putLast(4));
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
assertThat(sync(deque.takeLast())).isEqualTo(1);
assertThat(System.currentTimeMillis() - s).isGreaterThan(9000);
Thread.sleep(50);
assertThat(sync(deque.takeLast())).isEqualTo(4);
assertThat(sync(deque.takeLast())).isEqualTo(3);
assertThat(sync(deque.takeLast())).isEqualTo(2);
}
@Test
public void testPollFirst() throws InterruptedException {
RBlockingDequeRx<Integer> queue1 = redisson.getBlockingDeque("queue1");
sync(queue1.put(1));
sync(queue1.put(2));
sync(queue1.put(3));
assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(1);
assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(2);
assertThat(sync(queue1.pollFirst(2, TimeUnit.SECONDS))).isEqualTo(3);
long s = System.currentTimeMillis();
assertThat(sync(queue1.pollFirst(5, TimeUnit.SECONDS))).isNull();
assertThat(System.currentTimeMillis() - s).isGreaterThan(5000);
}
@Test
public void testPollLast() throws InterruptedException {
RBlockingDequeRx<Integer> queue1 = redisson.getBlockingDeque("queue1");
sync(queue1.putLast(1));
sync(queue1.putLast(2));
sync(queue1.putLast(3));
assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(3);
assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(2);
assertThat(sync(queue1.pollLast(2, TimeUnit.SECONDS))).isEqualTo(1);
long s = System.currentTimeMillis();
assertThat(sync(queue1.pollLast(5, TimeUnit.SECONDS))).isNull();
assertThat(System.currentTimeMillis() - s).isGreaterThan(5000);
}
}
Loading…
Cancel
Save