diff --git a/src/main/java/org/redisson/RedissonBlockingQueue.java b/src/main/java/org/redisson/RedissonBlockingQueue.java index 9e161313b..a524a8827 100644 --- a/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -55,6 +55,10 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock return offerAsync(e); } + /* + * (non-Javadoc) + * @see java.util.concurrent.BlockingQueue#put(java.lang.Object) + */ @Override public void put(V e) throws InterruptedException { offer(e); @@ -70,6 +74,10 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0); } + /* + * (non-Javadoc) + * @see java.util.concurrent.BlockingQueue#take() + */ @Override public V take() throws InterruptedException { Future res = takeAsync(); @@ -81,19 +89,34 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout)); } + /* + * (non-Javadoc) + * @see java.util.concurrent.BlockingQueue#poll(long, java.util.concurrent.TimeUnit) + */ @Override public V poll(long timeout, TimeUnit unit) throws InterruptedException { Future res = pollAsync(timeout, unit); return res.await().getNow(); } + /* + * (non-Javadoc) + * @see org.redisson.core.RBlockingQueue#pollFromAny(long, java.util.concurrent.TimeUnit, java.lang.String[]) + */ + @Override public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException { Future res = pollFromAnyAsync(timeout, unit, queueNames); return res.await().getNow(); } + /* + * (non-Javadoc) + * @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[]) + */ + @Override public Future pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) { - List params = new ArrayList(queueNames.length); + List params = new ArrayList(queueNames.length + 1); + params.add(getName()); for (Object name : queueNames) { params.add(name); } diff --git a/src/main/java/org/redisson/core/RBlockingQueue.java b/src/main/java/org/redisson/core/RBlockingQueue.java index f42269a3c..f7cf67862 100644 --- a/src/main/java/org/redisson/core/RBlockingQueue.java +++ b/src/main/java/org/redisson/core/RBlockingQueue.java @@ -26,6 +26,19 @@ import java.util.concurrent.TimeUnit; */ public interface RBlockingQueue extends BlockingQueue, RQueue, RBlockingQueueAsync { + /** + * Retrieves and removes the head of this queue in async mode, waiting up to the + * specified wait time if necessary for an element to become available + * in any of defined queues including queue own. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return the head of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException; V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException; diff --git a/src/main/java/org/redisson/core/RBlockingQueueAsync.java b/src/main/java/org/redisson/core/RBlockingQueueAsync.java index 3ba491274..279604a38 100644 --- a/src/main/java/org/redisson/core/RBlockingQueueAsync.java +++ b/src/main/java/org/redisson/core/RBlockingQueueAsync.java @@ -28,6 +28,19 @@ import io.netty.util.concurrent.Future; */ public interface RBlockingQueueAsync extends RQueueAsync, RExpirableAsync { + /** + * Retrieves and removes the head of this queue in async mode, waiting up to the + * specified wait time if necessary for an element to become available + * in any of defined queues including queue own. + * + * @param timeout how long to wait before giving up, in units of + * {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return Future object with the head of this queue, or {@code null} if the + * specified waiting time elapses before an element is available + * @throws InterruptedException if interrupted while waiting + */ Future pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames); Future drainToAsync(Collection c, int maxElements); diff --git a/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 598159d49..0b956d36f 100644 --- a/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -5,15 +5,22 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import org.hamcrest.*; -import org.junit.*; -import org.redisson.core.*; - -import io.netty.util.concurrent.Future; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; +import org.redisson.core.RBlockingQueue; public class RedissonBlockingQueueTest extends BaseTest { @@ -26,12 +33,11 @@ public class RedissonBlockingQueueTest extends BaseTest { RBlockingQueue queue2 = redisson.getBlockingQueue("queue:pollany1"); RBlockingQueue queue3 = redisson.getBlockingQueue("queue:pollany2"); try { - queue1.put(1); queue3.put(2); + queue1.put(1); queue2.put(3); } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + Assert.fail(); } } }, 3, TimeUnit.SECONDS);