Feature - RBlockingQueueRx.takeElements method added

pull/1721/head
Nikita Koksharov 6 years ago
parent 0444fe2c50
commit 045c2c2e2a

@ -65,6 +65,7 @@ import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.rx.CommandRxExecutor;
import org.redisson.rx.CommandRxService;
import org.redisson.rx.RedissonBatchRx;
import org.redisson.rx.RedissonBlockingQueueRx;
import org.redisson.rx.RedissonKeysRx;
import org.redisson.rx.RedissonLexSortedSetRx;
import org.redisson.rx.RedissonListMultimapRx;
@ -320,14 +321,14 @@ public class RedissonRx implements RedissonRxClient {
public <V> RBlockingQueueRx<V> getBlockingQueue(String name) {
RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonListRx<V>(queue), RBlockingQueueRx.class);
new RedissonBlockingQueueRx<V>(queue), RBlockingQueueRx.class);
}
@Override
public <V> RBlockingQueueRx<V> getBlockingQueue(String name, Codec codec) {
RedissonBlockingQueue<V> queue = new RedissonBlockingQueue<V>(codec, commandExecutor, name, null);
return RxProxyBuilder.create(commandExecutor, queue,
new RedissonListRx<V>(queue), RBlockingQueueRx.class);
new RedissonBlockingQueueRx<V>(queue), RBlockingQueueRx.class);
}
@Override

@ -143,4 +143,11 @@ public interface RBlockingQueueRx<V> extends RQueueRx<V> {
*/
Flowable<Void> put(V e);
/**
* Retrieves and removes stream of elements from the head of this queue.
*
* @return stream of messages
*/
Flowable<V> takeElements();
}

@ -124,7 +124,7 @@ public class RxProxyBuilder {
}
if (implementation != null
&& instanceMethod.getDeclaringClass() == implementation.getClass()) {
&& instanceMethod.getDeclaringClass().isAssignableFrom(implementation.getClass())) {
return instanceMethod.invoke(implementation, args);
}

@ -5,6 +5,7 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -13,10 +14,44 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RBlockingQueueRx;
public class RedissonBlockingQueueRxTest extends BaseRxTest {
@Test
public void testTakeElements() {
RBlockingQueueRx<Integer> queue = redisson.getBlockingQueue("test");
List<Integer> elements = new ArrayList<>();
queue.takeElements().subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(4);
}
@Override
public void onNext(Integer t) {
elements.add(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
for (int i = 0; i < 10; i++) {
sync(queue.add(i));
}
assertThat(elements).containsExactly(0, 1, 2, 3);
}
@Test
public void testPollFromAny() throws InterruptedException {
final RBlockingQueueRx<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");

Loading…
Cancel
Save