Merge pull request #133 from pdeschen/feature-blocking-queue

Initial attempt at blocking queue (WIP)
pull/139/head
Nikita Koksharov 10 years ago
commit ee4044afaf

@ -203,6 +203,17 @@ public class Redisson implements RedissonClient {
return new RedissonQueue<V>(connectionManager, name); return new RedissonQueue<V>(connectionManager, name);
} }
/**
* Returns distributed blocking queue instance by name.
*
* @param name of the distributed blocking queue
* @return distributed queue
*/
@Override
public <V> RBlockingQueue<V> getBlockingQueue(String name) {
return new RedissonBlockingQueue(connectionManager, name);
}
/** /**
* Returns distributed deque instance by name. * Returns distributed deque instance by name.
* *

@ -0,0 +1,87 @@
package org.redisson;
import java.util.*;
import java.util.concurrent.*;
import org.redisson.async.*;
import org.redisson.connection.*;
import org.redisson.core.*;
import com.lambdaworks.redis.*;
/**
* Offers blocking queue facilities through an intermediary
* {@link LinkedBlockingQueue} where items are added as soon as
* <code>blpop</code> returns. All {@link BlockingQueue} methods are actually
* delegated to this intermediary queue.
*
* @author pdeschen@gmail.com
*/
public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> {
private final static int BLPOP_TIMEOUT_IN_MS = 1000;
private final LinkedBlockingQueue<V> blockingQueue = new LinkedBlockingQueue();
public RedissonBlockingQueue(final ConnectionManager connection, final String name) {
super(connection, name);
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
final java.util.concurrent.Future<?> future = executor.submit(new Runnable() {
@Override
public void run() {
while (true) {
V item = connection.write(name, new SyncOperation<V, V>() {
@Override
public V execute(RedisConnection<Object, V> conn) {
// Get this timeout from config?
return conn.blpop(BLPOP_TIMEOUT_IN_MS, name).value;
}
});
blockingQueue.add(item);
}
}
});
Runtime.getRuntime().addShutdownHook(new Thread("redis-blpop-shutdown-hook-thread") {
@Override
public void run() {
future.cancel(true);
executor.shutdown();
}
});
}
@Override
public void put(V e) throws InterruptedException {
offer(e);
}
@Override
public boolean offer(V e, long timeout, TimeUnit unit) throws InterruptedException {
return offer(e);
}
@Override
public V take() throws InterruptedException {
return blockingQueue.take();
}
@Override
public V poll(final long timeout, final TimeUnit unit) throws InterruptedException {
return blockingQueue.poll(timeout, unit);
}
@Override
public int remainingCapacity() {
return blockingQueue.remainingCapacity();
}
@Override
public int drainTo(Collection<? super V> c) {
return blockingQueue.drainTo(c);
}
@Override
public int drainTo(Collection<? super V> c, int maxElements) {
return blockingQueue.drainTo(c, maxElements);
}
}

@ -98,6 +98,14 @@ public interface RedissonClient {
*/ */
<V> RQueue<V> getQueue(String name); <V> RQueue<V> getQueue(String name);
/**
* Returns blocking queue instance by name.
*
* @param name of queue
* @return
*/
<V> RBlockingQueue<V> getBlockingQueue(String name);
/** /**
* Returns deque instance by name. * Returns deque instance by name.
* *

@ -0,0 +1,13 @@
package org.redisson.core;
import java.util.concurrent.*;
/**
* {@link BlockingQueue} backed by Redis
*
* @author Nikita Koksharov
* @param <V> the type of elements held in this collection
*/
public interface RBlockingQueue<V> extends BlockingQueue<V>, RExpirable {
}

@ -0,0 +1,124 @@
package org.redisson;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.hamcrest.*;
import org.junit.*;
import org.redisson.core.*;
public class RedissonBlockingQueueTest extends BaseTest {
@Test
public void testAddOfferOrigin() {
Queue<Integer> queue = new LinkedList<Integer>();
queue.add(1);
queue.offer(2);
queue.add(3);
queue.offer(4);
MatcherAssert.assertThat(queue, Matchers.contains(1, 2, 3, 4));
Assert.assertEquals((Integer) 1, queue.poll());
MatcherAssert.assertThat(queue, Matchers.contains(2, 3, 4));
Assert.assertEquals((Integer) 2, queue.element());
}
@Test
public void testAddOffer() {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("blocking:queue");
queue.add(1);
queue.offer(2);
queue.add(3);
queue.offer(4);
//MatcherAssert.assertThat(queue, Matchers.contains(1, 2, 3, 4));
Assert.assertEquals((Integer) 1, queue.poll());
MatcherAssert.assertThat(queue, Matchers.contains(2, 3, 4));
Assert.assertEquals((Integer) 2, queue.element());
}
@Test
public void testRemoveOrigin() {
Queue<Integer> queue = new LinkedList<Integer>();
queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
queue.remove();
queue.remove();
MatcherAssert.assertThat(queue, Matchers.contains(3, 4));
queue.remove();
queue.remove();
Assert.assertTrue(queue.isEmpty());
}
@Test
public void testRemove() {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("blocking:queue");
queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
queue.remove();
queue.remove();
MatcherAssert.assertThat(queue, Matchers.contains(3, 4));
queue.remove();
queue.remove();
Assert.assertTrue(queue.isEmpty());
}
@Test(expected = NoSuchElementException.class)
public void testRemoveEmpty() {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("blocking:queue");
queue.remove();
}
@Test
public void testBlockingQueue() {
RBlockingQueue<Integer> queue = redisson.getBlockingQueue("test_:blocking:queue:");
ExecutorService executor = Executors.newFixedThreadPool(10);
final AtomicInteger counter = new AtomicInteger();
int total = 100;
for (int i = 0; i < total; i++) {
// runnable won't be executed in any particular order, and hence, int value as well.
executor.submit(new Runnable() {
@Override
public void run() {
redisson.getQueue("test_:blocking:queue:").add(counter.incrementAndGet());
}
});
}
int count = 0;
while (count < total) {
try {
// blocking
int item = queue.take();
assertTrue(item > 0 && item <= total);
} catch (InterruptedException exception) {
fail();
}
count++;
}
assertThat(counter.get(), equalTo(total));
queue.delete();
redisson.shutdown();
}
}
Loading…
Cancel
Save