RBlockingQueueReactive added. #210

pull/337/head
Nikita 9 years ago
parent 8eed10c39d
commit f376c74b2b

@ -0,0 +1,111 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandReactiveExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.core.RBlockingQueueReactive;
/**
* Offers blocking queue facilities through an intermediary
* {@link LinkedBlockingQueue} where items are added as soon as
* <code>blpop</code> returns. All {@link BlockingQueue} methods are actually
* delegated to this intermediary queue.
*
* @author Nikita Koksharov
*/
public class RedissonBlockingQueueReactive<V> extends RedissonQueueReactive<V> implements RBlockingQueueReactive<V> {
protected RedissonBlockingQueueReactive(CommandReactiveExecutor commandExecutor, String name) {
super(commandExecutor, name);
}
protected RedissonBlockingQueueReactive(Codec codec, CommandReactiveExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}
@Override
public Publisher<Long> put(V e) {
return offer(e);
}
@Override
public Publisher<V> take() {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
}
@Override
public Publisher<V> poll(long timeout, TimeUnit unit) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
}
/*
* (non-Javadoc)
* @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/
@Override
public Publisher<V> pollFromAny(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}
params.add(unit.toSeconds(timeout));
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BLPOP_VALUE, params.toArray());
}
@Override
public Publisher<V> pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));
}
@Override
public Publisher<Integer> drainTo(Collection<? super V> c) {
if (c == null) {
throw new NullPointerException();
}
return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
"return vals", Collections.<Object>singletonList(getName()));
}
@Override
public Publisher<Integer> drainTo(Collection<? super V> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +
"return vals",
Collections.<Object>singletonList(getName()), maxElements);
}
}

@ -29,11 +29,11 @@ import org.redisson.connection.ElasticacheConnectionManager;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RLexSortedSetReactive;
import org.redisson.core.RListReactive;
import org.redisson.core.RMap;
import org.redisson.core.RMapReactive;
import org.redisson.core.RPatternTopicReactive;
import org.redisson.core.RQueueReactive;
@ -193,6 +193,16 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonQueueReactive<V>(codec, commandExecutor, name);
}
@Override
public <V> RBlockingQueueReactive<V> getBlockingQueue(String name) {
return new RedissonBlockingQueueReactive<V>(commandExecutor, name);
}
@Override
public <V> RBlockingQueueReactive<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueueReactive<V>(codec, commandExecutor, name);
}
public Config getConfig() {
return config;
}

@ -18,6 +18,7 @@ package org.redisson;
import java.util.List;
import org.redisson.client.codec.Codec;
import org.redisson.core.RBlockingQueueReactive;
import org.redisson.core.RBucketReactive;
import org.redisson.core.RHyperLogLogReactive;
import org.redisson.core.RLexSortedSet;
@ -143,16 +144,16 @@ public interface RedissonReactiveClient {
<V> RQueueReactive<V> getQueue(String name, Codec codec);
// /**
// * Returns blocking queue instance by name.
// *
// * @param name of queue
// * @return
// */
// <V> RBlockingQueue<V> getBlockingQueue(String name);
//
// <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec);
//
/**
* Returns blocking queue instance by name.
*
* @param name of queue
* @return
*/
<V> RBlockingQueueReactive<V> getBlockingQueue(String name);
<V> RBlockingQueueReactive<V> getBlockingQueue(String name, Codec codec);
// /**
// * Returns deque instance by name.
// *

@ -0,0 +1,59 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.core;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
/**
* {@link BlockingQueue} backed by Redis
*
* @author Nikita Koksharov
* @param <V> the type of elements held in this collection
*/
public interface RBlockingQueueReactive<V> extends RQueueReactive<V> {
/**
* Retrieves and removes first available head of <b>any</b> queue in mode, waiting up to the
* specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return Publisher object with the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Publisher<V> pollFromAny(long timeout, TimeUnit unit, String ... queueNames);
Publisher<Integer> drainTo(Collection<? super V> c, int maxElements);
Publisher<Integer> drainTo(Collection<? super V> c);
Publisher<V> pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit);
Publisher<V> poll(long timeout, TimeUnit unit);
Publisher<V> take();
Publisher<Long> put(V e);
}

@ -0,0 +1,211 @@
package org.redisson;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RBlockingQueueReactive;
public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest {
@Test
public void testPollFromAny() throws InterruptedException {
final RBlockingQueueReactive<Integer> queue1 = redisson.getBlockingQueue("queue:pollany");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingQueueReactive<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueueReactive<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
sync(queue3.put(2));
sync(queue1.put(1));
sync(queue2.put(3));
}
}, 3, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = sync(queue1.pollFromAny(4, TimeUnit.SECONDS, "queue:pollany1", "queue:pollany2"));
Assert.assertEquals(2, l);
Assert.assertTrue(System.currentTimeMillis() - s > 2000);
}
@Test
public void testTake() throws InterruptedException {
RBlockingQueueReactive<Integer> queue1 = redisson.getBlockingQueue("queue:take");
Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
@Override
public void run() {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("queue:take");
sync(queue.put(3));
}
}, 10, TimeUnit.SECONDS);
long s = System.currentTimeMillis();
int l = sync(queue1.take());
Assert.assertEquals(3, l);
Assert.assertTrue(System.currentTimeMillis() - s > 9000);
}
@Test
public void testPoll() throws InterruptedException {
RBlockingQueueReactive<Integer> queue1 = redisson.getBlockingQueue("queue1");
sync(queue1.put(1));
Assert.assertEquals((Integer)1, sync(queue1.poll(2, TimeUnit.SECONDS)));
long s = System.currentTimeMillis();
Assert.assertNull(sync(queue1.poll(5, TimeUnit.SECONDS)));
Assert.assertTrue(System.currentTimeMillis() - s > 5000);
}
@Test
public void testAwait() throws InterruptedException {
RBlockingQueueReactive<Integer> queue1 = redisson.getBlockingQueue("queue1");
sync(queue1.put(1));
Assert.assertEquals((Integer)1, sync(queue1.poll(10, TimeUnit.SECONDS)));
}
@Test
public void testPollLastAndOfferFirstTo() throws InterruptedException {
RBlockingQueueReactive<Integer> queue1 = redisson.getBlockingQueue("queue1");
sync(queue1.put(1));
sync(queue1.put(2));
sync(queue1.put(3));
RBlockingQueueReactive<Integer> queue2 = redisson.getBlockingQueue("queue2");
sync(queue2.put(4));
sync(queue2.put(5));
sync(queue2.put(6));
sync(queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS));
MatcherAssert.assertThat(sync(queue2), Matchers.contains(3, 4, 5, 6));
}
@Test
public void testAddOffer() {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("blocking:queue");
sync(queue.add(1));
sync(queue.offer(2));
sync(queue.add(3));
sync(queue.offer(4));
//MatcherAssert.assertThat(queue, Matchers.contains(1, 2, 3, 4));
Assert.assertEquals((Integer) 1, sync(queue.poll()));
MatcherAssert.assertThat(sync(queue), Matchers.contains(2, 3, 4));
Assert.assertEquals((Integer) 2, sync(queue.peek()));
}
@Test
public void testRemove() {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("blocking:queue");
sync(queue.add(1));
sync(queue.add(2));
sync(queue.add(3));
sync(queue.add(4));
sync(queue.poll());
sync(queue.poll());
MatcherAssert.assertThat(sync(queue), Matchers.contains(3, 4));
sync(queue.poll());
sync(queue.poll());
Assert.assertEquals(0, sync(queue.size()).intValue());
}
@Test
public void testRemoveEmpty() {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("blocking:queue");
Assert.assertNull(sync(queue.poll()));
}
@Test
public void testDrainTo() {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("queue");
for (int i = 0 ; i < 100; i++) {
sync(queue.offer(i));
}
Assert.assertEquals(100, sync(queue.size()).intValue());
Set<Integer> batch = new HashSet<Integer>();
int count = sync(queue.drainTo(batch, 10));
Assert.assertEquals(10, count);
Assert.assertEquals(10, batch.size());
Assert.assertEquals(90, sync(queue.size()).intValue());
sync(queue.drainTo(batch, 10));
sync(queue.drainTo(batch, 20));
sync(queue.drainTo(batch, 60));
Assert.assertEquals(0, sync(queue.size()).intValue());
}
@Test
public void testBlockingQueue() {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("test_:blocking:queue:");
ExecutorService executor = Executors.newFixedThreadPool(10);
final AtomicInteger counter = new AtomicInteger();
int total = 100;
for (int i = 0; i < total; i++) {
// runnable won't be executed in any particular order, and hence, int value as well.
executor.submit(new Runnable() {
@Override
public void run() {
redisson.getQueue("test_:blocking:queue:").add(counter.incrementAndGet());
}
});
}
int count = 0;
while (count < total) {
int item = sync(queue.take());
assertTrue(item > 0 && item <= total);
count++;
}
assertThat(counter.get(), equalTo(total));
}
@Test
public void testDrainToCollection() throws Exception {
RBlockingQueueReactive<Object> queue1 = redisson.getBlockingQueue("queue1");
sync(queue1.put(1));
sync(queue1.put(2L));
sync(queue1.put("e"));
ArrayList<Object> dst = new ArrayList<Object>();
sync(queue1.drainTo(dst));
MatcherAssert.assertThat(dst, Matchers.<Object>contains(1, 2L, "e"));
Assert.assertEquals(0, sync(queue1.size()).intValue());
}
@Test
public void testDrainToCollectionLimited() throws Exception {
RBlockingQueueReactive<Object> queue1 = redisson.getBlockingQueue("queue1");
sync(queue1.put(1));
sync(queue1.put(2L));
sync(queue1.put("e"));
ArrayList<Object> dst = new ArrayList<Object>();
sync(queue1.drainTo(dst, 2));
MatcherAssert.assertThat(dst, Matchers.<Object>contains(1, 2L));
Assert.assertEquals(1, sync(queue1.size()).intValue());
dst.clear();
sync(queue1.drainTo(dst, 2));
MatcherAssert.assertThat(dst, Matchers.<Object>contains("e"));
}
}
Loading…
Cancel
Save