RBlockingQueue.takeLastAndOfferFirstTo method added. #881

pull/903/head
Nikita 8 years ago
parent 9a756fb96a
commit 946f712931

@ -123,6 +123,17 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
return blockingQueue.pollLastAndOfferFirstTo(queueName, timeout, unit);
}
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
RFuture<V> res = takeLastAndOfferFirstToAsync(queueName);
return res.await().getNow();
}
@Override
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
}
@Override
public int remainingCapacity() {

@ -133,6 +133,17 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
RFuture<V> res = pollLastAndOfferFirstToAsync(queueName, timeout, unit);
return res.await().getNow();
}
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
RFuture<V> res = takeLastAndOfferFirstToAsync(queueName);
return res.await().getNow();
}
@Override
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
}
@Override
public int remainingCapacity() {

@ -33,6 +33,7 @@ import org.redisson.command.CommandExecutor;
import org.redisson.connection.decoder.ListDrainToDecoder;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.pubsub.SemaphorePubSub;
import io.netty.util.concurrent.Future;
@ -136,7 +137,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
}
private RPromise<V> wrapTakeFuture(final RFuture<V> takeFuture) {
final RPromise<V> result = new PromiseDelegator<V>(commandExecutor.getConnectionManager().<V>newPromise()) {
final RPromise<V> result = new RedissonPromise<V>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
@ -253,6 +254,17 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
return wrapTakeFuture(takeFuture);
}
@Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
RFuture<V> res = takeLastAndOfferFirstToAsync(queueName);
return res.await().getNow();
}
@Override
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS);
}
@Override
public RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) {
RFuture<V> takeFuture = commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));

@ -43,5 +43,7 @@ public interface RBlockingQueue<V> extends BlockingQueue<V>, RQueue<V>, RBlockin
V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException;
V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException;
V takeLastAndOfferFirstTo(String queueName) throws InterruptedException;
}

@ -93,6 +93,8 @@ public interface RBlockingQueueAsync<V> extends RQueueAsync<V> {
RFuture<Integer> drainToAsync(Collection<? super V> c);
RFuture<V> pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit);
RFuture<V> takeLastAndOfferFirstToAsync(String queueName);
/**
* Retrieves and removes the head of this queue in async mode, waiting up to the

@ -296,16 +296,42 @@ public class RedissonBlockingQueueTest extends BaseTest {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 10, TimeUnit.SECONDS);
}, 5, TimeUnit.SECONDS);
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("{queue}2");
queue2.put(4);
queue2.put(5);
queue2.put(6);
queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS);
Integer value = queue1.pollLastAndOfferFirstTo(queue2.getName(), 5, TimeUnit.SECONDS);
assertThat(value).isEqualTo(3);
assertThat(queue2).containsExactly(3, 4, 5, 6);
}
@Test
public void testTakeLastAndOfferFirstTo() throws InterruptedException {
final RBlockingQueue<Integer> queue1 = redisson.getBlockingQueue("{queue}1");
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
try {
queue1.put(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 3, TimeUnit.SECONDS);
RBlockingQueue<Integer> queue2 = redisson.getBlockingQueue("{queue}2");
queue2.put(4);
queue2.put(5);
queue2.put(6);
long startTime = System.currentTimeMillis();
Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName());
assertThat(System.currentTimeMillis() - startTime).isBetween(2900L, 3200L);
assertThat(value).isEqualTo(3);
assertThat(queue2).containsExactly(3, 4, 5, 6);
}
@Test
public void testAddOfferOrigin() {

@ -509,10 +509,38 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest {
queue2.put(5);
queue2.put(6);
queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS);
Integer value = queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS);
assertThat(value).isEqualTo(3);
assertThat(queue2).containsExactly(3, 4, 5, 6);
}
@Test
public void testTakeLastAndOfferFirstTo() throws InterruptedException {
final RBoundedBlockingQueue<Integer> queue1 = redisson.getBoundedBlockingQueue("{queue}1");
queue1.trySetCapacity(10);
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
try {
queue1.put(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 3, TimeUnit.SECONDS);
RBoundedBlockingQueue<Integer> queue2 = redisson.getBoundedBlockingQueue("{queue}2");
queue2.trySetCapacity(10);
queue2.put(4);
queue2.put(5);
queue2.put(6);
long startTime = System.currentTimeMillis();
Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName());
assertThat(System.currentTimeMillis() - startTime).isBetween(3000L, 3200L);
assertThat(value).isEqualTo(3);
assertThat(queue2).containsExactly(3, 4, 5, 6);
}
@Test
public void testOffer() {
RBoundedBlockingQueue<Integer> queue = redisson.getBoundedBlockingQueue("blocking:queue");

Loading…
Cancel
Save