RBlockingQueue.pollFromAny should include queue own name. #292

pull/297/head
Nikita 9 years ago
parent ae33c55a71
commit 98eb437750

@ -55,6 +55,10 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
return offerAsync(e);
}
/*
* (non-Javadoc)
* @see java.util.concurrent.BlockingQueue#put(java.lang.Object)
*/
@Override
public void put(V e) throws InterruptedException {
offer(e);
@ -70,6 +74,10 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
}
/*
* (non-Javadoc)
* @see java.util.concurrent.BlockingQueue#take()
*/
@Override
public V take() throws InterruptedException {
Future<V> res = takeAsync();
@ -81,19 +89,34 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
}
/*
* (non-Javadoc)
* @see java.util.concurrent.BlockingQueue#poll(long, java.util.concurrent.TimeUnit)
*/
@Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException {
Future<V> res = pollAsync(timeout, unit);
return res.await().getNow();
}
/*
* (non-Javadoc)
* @see org.redisson.core.RBlockingQueue#pollFromAny(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/
@Override
public V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException {
Future<V> res = pollFromAnyAsync(timeout, unit, queueNames);
return res.await().getNow();
}
/*
* (non-Javadoc)
* @see org.redisson.core.RBlockingQueueAsync#pollFromAnyAsync(long, java.util.concurrent.TimeUnit, java.lang.String[])
*/
@Override
public Future<V> pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames) {
List<Object> params = new ArrayList<Object>(queueNames.length);
List<Object> params = new ArrayList<Object>(queueNames.length + 1);
params.add(getName());
for (Object name : queueNames) {
params.add(name);
}

@ -26,6 +26,19 @@ import java.util.concurrent.TimeUnit;
*/
public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockingQueueAsync<V> {
/**
* Retrieves and removes the head of this queue in async mode, waiting up to the
* specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException;
V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException;

@ -28,6 +28,19 @@ import io.netty.util.concurrent.Future;
*/
public interface RBlockingQueueAsync<V> extends RQueueAsync<V>, RExpirableAsync {
/**
* Retrieves and removes the head of this queue in async mode, waiting up to the
* specified wait time if necessary for an element to become available
* in any of defined queues <b>including</b> queue own.
*
* @param timeout how long to wait before giving up, in units of
* {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the
* {@code timeout} parameter
* @return Future object with the head of this queue, or {@code null} if the
* specified waiting time elapses before an element is available
* @throws InterruptedException if interrupted while waiting
*/
Future<V> pollFromAnyAsync(long timeout, TimeUnit unit, String ... queueNames);
Future<Integer> drainToAsync(Collection<? super V> c, int maxElements);

@ -5,15 +5,22 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.hamcrest.*;
import org.junit.*;
import org.redisson.core.*;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.core.RBlockingQueue;
public class RedissonBlockingQueueTest extends BaseTest {
@ -26,12 +33,11 @@ public class RedissonBlockingQueueTest extends BaseTest {
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("queue:pollany1");
RBlockingQueue<Integer> queue3 = redisson.getBlockingQueue("queue:pollany2");
try {
queue1.put(1);
queue3.put(2);
queue1.put(1);
queue2.put(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
Assert.fail();
}
}
}, 3, TimeUnit.SECONDS);

Loading…
Cancel
Save