diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java index 5699208b0..b82bb0761 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java @@ -123,6 +123,17 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { return blockingQueue.pollLastAndOfferFirstTo(queueName, timeout, unit); } + + @Override + public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { + RFuture res = takeLastAndOfferFirstToAsync(queueName); + return res.await().getNow(); + } + + @Override + public RFuture takeLastAndOfferFirstToAsync(String queueName) { + return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); + } @Override public int remainingCapacity() { diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index 46d8b6e0f..4f518e681 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -133,6 +133,17 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock RFuture res = pollLastAndOfferFirstToAsync(queueName, timeout, unit); return res.await().getNow(); } + + @Override + public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { + RFuture res = takeLastAndOfferFirstToAsync(queueName); + return res.await().getNow(); + } + + @Override + public RFuture takeLastAndOfferFirstToAsync(String queueName) { + return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); + } @Override public int remainingCapacity() { diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index f349774aa..ac309c5a9 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -33,6 +33,7 @@ import org.redisson.command.CommandExecutor; import org.redisson.connection.decoder.ListDrainToDecoder; import org.redisson.misc.PromiseDelegator; import org.redisson.misc.RPromise; +import org.redisson.misc.RedissonPromise; import org.redisson.pubsub.SemaphorePubSub; import io.netty.util.concurrent.Future; @@ -136,7 +137,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements } private RPromise wrapTakeFuture(final RFuture takeFuture) { - final RPromise result = new PromiseDelegator(commandExecutor.getConnectionManager().newPromise()) { + final RPromise result = new RedissonPromise() { @Override public boolean cancel(boolean mayInterruptIfRunning) { super.cancel(mayInterruptIfRunning); @@ -253,6 +254,17 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements return wrapTakeFuture(takeFuture); } + @Override + public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { + RFuture res = takeLastAndOfferFirstToAsync(queueName); + return res.await().getNow(); + } + + @Override + public RFuture takeLastAndOfferFirstToAsync(String queueName) { + return pollLastAndOfferFirstToAsync(queueName, 0, TimeUnit.SECONDS); + } + @Override public RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit) { RFuture takeFuture = commandExecutor.writeAsync(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout)); diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java index b39c79c5d..d259c316a 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueue.java @@ -43,5 +43,7 @@ public interface RBlockingQueue extends BlockingQueue, RQueue, RBlockin V pollFromAny(long timeout, TimeUnit unit, String ... queueNames) throws InterruptedException; V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException; + + V takeLastAndOfferFirstTo(String queueName) throws InterruptedException; } diff --git a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java index d52047bce..3e09054e6 100644 --- a/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java +++ b/redisson/src/main/java/org/redisson/api/RBlockingQueueAsync.java @@ -93,6 +93,8 @@ public interface RBlockingQueueAsync extends RQueueAsync { RFuture drainToAsync(Collection c); RFuture pollLastAndOfferFirstToAsync(String queueName, long timeout, TimeUnit unit); + + RFuture takeLastAndOfferFirstToAsync(String queueName); /** * Retrieves and removes the head of this queue in async mode, waiting up to the diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index 54e4192ed..4dab4f004 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -296,16 +296,42 @@ public class RedissonBlockingQueueTest extends BaseTest { // TODO Auto-generated catch block e.printStackTrace(); } - }, 10, TimeUnit.SECONDS); + }, 5, TimeUnit.SECONDS); RBlockingQueue queue2 = redisson.getBlockingQueue("{queue}2"); queue2.put(4); queue2.put(5); queue2.put(6); - queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS); + Integer value = queue1.pollLastAndOfferFirstTo(queue2.getName(), 5, TimeUnit.SECONDS); + assertThat(value).isEqualTo(3); assertThat(queue2).containsExactly(3, 4, 5, 6); } + + @Test + public void testTakeLastAndOfferFirstTo() throws InterruptedException { + final RBlockingQueue queue1 = redisson.getBlockingQueue("{queue}1"); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + try { + queue1.put(3); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }, 3, TimeUnit.SECONDS); + + RBlockingQueue queue2 = redisson.getBlockingQueue("{queue}2"); + queue2.put(4); + queue2.put(5); + queue2.put(6); + + long startTime = System.currentTimeMillis(); + Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName()); + assertThat(System.currentTimeMillis() - startTime).isBetween(2900L, 3200L); + assertThat(value).isEqualTo(3); + assertThat(queue2).containsExactly(3, 4, 5, 6); + } + @Test public void testAddOfferOrigin() { diff --git a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java index 2e9cab970..3d533f5d4 100644 --- a/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBoundedBlockingQueueTest.java @@ -509,10 +509,38 @@ public class RedissonBoundedBlockingQueueTest extends BaseTest { queue2.put(5); queue2.put(6); - queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS); + Integer value = queue1.pollLastAndOfferFirstTo(queue2.getName(), 10, TimeUnit.SECONDS); + assertThat(value).isEqualTo(3); assertThat(queue2).containsExactly(3, 4, 5, 6); } + @Test + public void testTakeLastAndOfferFirstTo() throws InterruptedException { + final RBoundedBlockingQueue queue1 = redisson.getBoundedBlockingQueue("{queue}1"); + queue1.trySetCapacity(10); + Executors.newSingleThreadScheduledExecutor().schedule(() -> { + try { + queue1.put(3); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }, 3, TimeUnit.SECONDS); + + RBoundedBlockingQueue queue2 = redisson.getBoundedBlockingQueue("{queue}2"); + queue2.trySetCapacity(10); + queue2.put(4); + queue2.put(5); + queue2.put(6); + + long startTime = System.currentTimeMillis(); + Integer value = queue1.takeLastAndOfferFirstTo(queue2.getName()); + assertThat(System.currentTimeMillis() - startTime).isBetween(3000L, 3200L); + assertThat(value).isEqualTo(3); + assertThat(queue2).containsExactly(3, 4, 5, 6); + } + + @Test public void testOffer() { RBoundedBlockingQueue queue = redisson.getBoundedBlockingQueue("blocking:queue");