diff --git a/src/main/java/org/redisson/Redisson.java b/src/main/java/org/redisson/Redisson.java index 9d80b6206..811fcd471 100755 --- a/src/main/java/org/redisson/Redisson.java +++ b/src/main/java/org/redisson/Redisson.java @@ -203,6 +203,17 @@ public class Redisson implements RedissonClient { return new RedissonQueue(connectionManager, name); } + /** + * Returns distributed blocking queue instance by name. + * + * @param name of the distributed blocking queue + * @return distributed queue + */ + @Override + public RBlockingQueue getBlockingQueue(String name) { + return new RedissonBlockingQueue(connectionManager, name); + } + /** * Returns distributed deque instance by name. * diff --git a/src/main/java/org/redisson/RedissonBlockingQueue.java b/src/main/java/org/redisson/RedissonBlockingQueue.java new file mode 100644 index 000000000..79faea57e --- /dev/null +++ b/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -0,0 +1,87 @@ +package org.redisson; + +import java.util.*; +import java.util.concurrent.*; + +import org.redisson.async.*; +import org.redisson.connection.*; +import org.redisson.core.*; + +import com.lambdaworks.redis.*; + +/** + * Offers blocking queue facilities through an intermediary + * {@link LinkedBlockingQueue} where items are added as soon as + * blpop returns. All {@link BlockingQueue} methods are actually + * delegated to this intermediary queue. + * + * @author pdeschen@gmail.com + */ +public class RedissonBlockingQueue extends RedissonQueue implements RBlockingQueue { + + private final static int BLPOP_TIMEOUT_IN_MS = 1000; + + private final LinkedBlockingQueue blockingQueue = new LinkedBlockingQueue(); + + public RedissonBlockingQueue(final ConnectionManager connection, final String name) { + super(connection, name); + final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + final java.util.concurrent.Future future = executor.submit(new Runnable() { + @Override + public void run() { + while (true) { + V item = connection.write(name, new SyncOperation() { + @Override + public V execute(RedisConnection conn) { + // Get this timeout from config? + return conn.blpop(BLPOP_TIMEOUT_IN_MS, name).value; + } + }); + blockingQueue.add(item); + } + } + }); + Runtime.getRuntime().addShutdownHook(new Thread("redis-blpop-shutdown-hook-thread") { + @Override + public void run() { + future.cancel(true); + executor.shutdown(); + } + }); + } + + @Override + public void put(V e) throws InterruptedException { + offer(e); + } + + @Override + public boolean offer(V e, long timeout, TimeUnit unit) throws InterruptedException { + return offer(e); + } + + @Override + public V take() throws InterruptedException { + return blockingQueue.take(); + } + + @Override + public V poll(final long timeout, final TimeUnit unit) throws InterruptedException { + return blockingQueue.poll(timeout, unit); + } + + @Override + public int remainingCapacity() { + return blockingQueue.remainingCapacity(); + } + + @Override + public int drainTo(Collection c) { + return blockingQueue.drainTo(c); + } + + @Override + public int drainTo(Collection c, int maxElements) { + return blockingQueue.drainTo(c, maxElements); + } +} \ No newline at end of file diff --git a/src/main/java/org/redisson/RedissonClient.java b/src/main/java/org/redisson/RedissonClient.java index e3dfc33ed..8e96f1e37 100755 --- a/src/main/java/org/redisson/RedissonClient.java +++ b/src/main/java/org/redisson/RedissonClient.java @@ -98,6 +98,14 @@ public interface RedissonClient { */ RQueue getQueue(String name); + /** + * Returns blocking queue instance by name. + * + * @param name of queue + * @return + */ + RBlockingQueue getBlockingQueue(String name); + /** * Returns deque instance by name. * diff --git a/src/main/java/org/redisson/core/RBlockingQueue.java b/src/main/java/org/redisson/core/RBlockingQueue.java new file mode 100644 index 000000000..b7ae5f32e --- /dev/null +++ b/src/main/java/org/redisson/core/RBlockingQueue.java @@ -0,0 +1,13 @@ +package org.redisson.core; + +import java.util.concurrent.*; + +/** + * {@link BlockingQueue} backed by Redis + * + * @author Nikita Koksharov + * @param the type of elements held in this collection + */ +public interface RBlockingQueue extends BlockingQueue, RExpirable { + +} diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java new file mode 100644 index 000000000..ea9ad77cd --- /dev/null +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -0,0 +1,124 @@ +package org.redisson; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import org.hamcrest.*; +import org.junit.*; +import org.redisson.core.*; + +public class RedissonBlockingQueueTest extends BaseTest { + + @Test + public void testAddOfferOrigin() { + Queue queue = new LinkedList(); + queue.add(1); + queue.offer(2); + queue.add(3); + queue.offer(4); + + MatcherAssert.assertThat(queue, Matchers.contains(1, 2, 3, 4)); + Assert.assertEquals((Integer) 1, queue.poll()); + MatcherAssert.assertThat(queue, Matchers.contains(2, 3, 4)); + Assert.assertEquals((Integer) 2, queue.element()); + } + + @Test + public void testAddOffer() { + RBlockingQueue queue = redisson.getBlockingQueue("blocking:queue"); + queue.add(1); + queue.offer(2); + queue.add(3); + queue.offer(4); + + //MatcherAssert.assertThat(queue, Matchers.contains(1, 2, 3, 4)); + Assert.assertEquals((Integer) 1, queue.poll()); + MatcherAssert.assertThat(queue, Matchers.contains(2, 3, 4)); + Assert.assertEquals((Integer) 2, queue.element()); + } + + @Test + public void testRemoveOrigin() { + Queue queue = new LinkedList(); + queue.add(1); + queue.add(2); + queue.add(3); + queue.add(4); + + queue.remove(); + queue.remove(); + + MatcherAssert.assertThat(queue, Matchers.contains(3, 4)); + queue.remove(); + queue.remove(); + + Assert.assertTrue(queue.isEmpty()); + } + + @Test + public void testRemove() { + RBlockingQueue queue = redisson.getBlockingQueue("blocking:queue"); + queue.add(1); + queue.add(2); + queue.add(3); + queue.add(4); + + queue.remove(); + queue.remove(); + + MatcherAssert.assertThat(queue, Matchers.contains(3, 4)); + queue.remove(); + queue.remove(); + + Assert.assertTrue(queue.isEmpty()); + } + + @Test(expected = NoSuchElementException.class) + public void testRemoveEmpty() { + RBlockingQueue queue = redisson.getBlockingQueue("blocking:queue"); + queue.remove(); + } + + @Test + public void testBlockingQueue() { + + RBlockingQueue queue = redisson.getBlockingQueue("test_:blocking:queue:"); + + ExecutorService executor = Executors.newFixedThreadPool(10); + + final AtomicInteger counter = new AtomicInteger(); + int total = 100; + for (int i = 0; i < total; i++) { + // runnable won't be executed in any particular order, and hence, int value as well. + executor.submit(new Runnable() { + @Override + public void run() { + redisson.getQueue("test_:blocking:queue:").add(counter.incrementAndGet()); + } + }); + } + int count = 0; + while (count < total) { + try { + // blocking + int item = queue.take(); + assertTrue(item > 0 && item <= total); + } catch (InterruptedException exception) { + fail(); + } + count++; + } + + assertThat(counter.get(), equalTo(total)); + queue.delete(); + + redisson.shutdown(); + + } +}