From 05920f51c649d65b223c867a7fa414edecb53395 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 19 Sep 2019 11:14:22 +0300 Subject: [PATCH] Fixed - queue blocking methods don't re-throw InterruptedException #2327 --- .../org/redisson/RedissonBlockingDeque.java | 14 +++--- .../org/redisson/RedissonBlockingQueue.java | 10 ++-- .../RedissonBoundedBlockingQueue.java | 10 ++-- .../RedissonPriorityBlockingDeque.java | 14 +++--- .../RedissonPriorityBlockingQueue.java | 8 ++-- .../command/CommandAsyncExecutor.java | 2 + .../redisson/command/CommandAsyncService.java | 35 ++++++-------- .../redisson/RedissonBlockingQueueTest.java | 48 ++++++++++++++++++- 8 files changed, 91 insertions(+), 50 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java index 34a0f2b6e..6534070f0 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingDeque.java @@ -124,7 +124,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { - return get(takeLastAndOfferFirstToAsync(queueName)); + return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); } @Override @@ -191,7 +191,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V takeFirst() throws InterruptedException { - return get(takeFirstAsync()); + return commandExecutor.getInterrupted(takeFirstAsync()); } @Override @@ -206,7 +206,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V takeLast() throws InterruptedException { - return get(takeLastAsync()); + return commandExecutor.getInterrupted(takeLastAsync()); } @Override @@ -216,7 +216,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { - return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); + return commandExecutor.getInterrupted(pollFirstFromAnyAsync(timeout, unit, queueNames)); } @Override @@ -226,7 +226,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { - return get(pollLastFromAnyAsync(timeout, unit, queueNames)); + return commandExecutor.getInterrupted(pollLastFromAnyAsync(timeout, unit, queueNames)); } @Override @@ -236,7 +236,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException { - return get(pollFirstAsync(timeout, unit)); + return commandExecutor.getInterrupted(pollFirstAsync(timeout, unit)); } @Override @@ -246,7 +246,7 @@ public class RedissonBlockingDeque extends RedissonDeque implements RBlock @Override public V pollLast(long timeout, TimeUnit unit) throws InterruptedException { - return get(pollLastAsync(timeout, unit)); + return commandExecutor.getInterrupted(pollLastAsync(timeout, unit)); } } \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java index 4f25ba2f6..fb541c7f1 100644 --- a/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBlockingQueue.java @@ -77,7 +77,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock */ @Override public V take() throws InterruptedException { - return get(takeAsync()); + return commandExecutor.getInterrupted(takeAsync()); } @Override @@ -91,7 +91,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock */ @Override public V poll(long timeout, TimeUnit unit) throws InterruptedException { - return get(pollAsync(timeout, unit)); + return commandExecutor.getInterrupted(pollAsync(timeout, unit)); } /* @@ -100,7 +100,7 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock */ @Override public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { - return get(pollFromAnyAsync(timeout, unit, queueNames)); + return commandExecutor.getInterrupted(pollFromAnyAsync(timeout, unit, queueNames)); } /* @@ -119,12 +119,12 @@ public class RedissonBlockingQueue extends RedissonQueue implements RBlock @Override public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { - return get(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); + return commandExecutor.getInterrupted(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); } @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { - return get(takeLastAndOfferFirstToAsync(queueName)); + return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java index 9f744e5da..72a0c7ba8 100644 --- a/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonBoundedBlockingQueue.java @@ -192,7 +192,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements */ @Override public V take() throws InterruptedException { - return get(takeAsync()); + return commandExecutor.getInterrupted(takeAsync()); } @Override @@ -207,7 +207,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements */ @Override public V poll(long timeout, TimeUnit unit) throws InterruptedException { - return get(pollAsync(timeout, unit)); + return commandExecutor.getInterrupted(pollAsync(timeout, unit)); } /* @@ -216,7 +216,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements */ @Override public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { - return get(pollFromAnyAsync(timeout, unit, queueNames)); + return commandExecutor.getInterrupted(pollFromAnyAsync(timeout, unit, queueNames)); } /* @@ -231,7 +231,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { - return get(takeLastAndOfferFirstToAsync(queueName)); + return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); } @Override @@ -247,7 +247,7 @@ public class RedissonBoundedBlockingQueue extends RedissonQueue implements @Override public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { - return get(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); + return commandExecutor.getInterrupted(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); } @Override diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java index f2b065af4..00a505cbe 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingDeque.java @@ -96,7 +96,7 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { - return get(takeLastAndOfferFirstToAsync(queueName)); + return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); } public RFuture takeLastAndOfferFirstToAsync(String queueName) { @@ -175,7 +175,7 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i @Override public V takeFirst() throws InterruptedException { - return get(takeFirstAsync()); + return commandExecutor.getInterrupted(takeFirstAsync()); } @Override @@ -192,7 +192,7 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i @Override public V takeLast() throws InterruptedException { - return get(takeLastAsync()); + return commandExecutor.getInterrupted(takeLastAsync()); } @Override @@ -202,7 +202,7 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i @Override public V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { - return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); + return commandExecutor.getInterrupted(pollFirstFromAnyAsync(timeout, unit, queueNames)); } @Override @@ -212,7 +212,7 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i @Override public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { - return get(pollLastFromAnyAsync(timeout, unit, queueNames)); + return commandExecutor.getInterrupted(pollLastFromAnyAsync(timeout, unit, queueNames)); } @Override @@ -222,7 +222,7 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i @Override public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException { - return get(pollFirstAsync(timeout, unit)); + return commandExecutor.getInterrupted(pollFirstAsync(timeout, unit)); } @Override @@ -234,7 +234,7 @@ public class RedissonPriorityBlockingDeque extends RedissonPriorityDeque i @Override public V pollLast(long timeout, TimeUnit unit) throws InterruptedException { - return get(pollLastAsync(timeout, unit)); + return commandExecutor.getInterrupted(pollLastAsync(timeout, unit)); } } \ No newline at end of file diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index dc66bd051..75093784b 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -107,7 +107,7 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i @Override public V take() throws InterruptedException { - return get(takeAsync()); + return commandExecutor.getInterrupted(takeAsync()); } public RFuture pollAsync(long timeout, TimeUnit unit) { @@ -118,7 +118,7 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i @Override public V poll(long timeout, TimeUnit unit) throws InterruptedException { - return get(pollAsync(timeout, unit)); + return commandExecutor.getInterrupted(pollAsync(timeout, unit)); } @Override @@ -135,12 +135,12 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i @Override public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { - return get(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); + return commandExecutor.getInterrupted(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); } @Override public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { - return get(takeLastAndOfferFirstToAsync(queueName)); + return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName)); } public RFuture takeLastAndOfferFirstToAsync(String queueName) { diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java index d8ff5bb05..9beb58b8e 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncExecutor.java @@ -58,6 +58,8 @@ public interface CommandAsyncExecutor { void syncSubscription(RFuture future); V get(RFuture future); + + V getInterrupted(RFuture future) throws InterruptedException; RFuture writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand command, Object... params); diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 65dc9f0fb..daeb319ef 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -130,29 +130,22 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public V get(RFuture future) { - if (!future.isDone()) { - CountDownLatch l = new CountDownLatch(1); - future.onComplete((res, e) -> { - l.countDown(); - }); - - boolean interrupted = false; - while (!future.isDone()) { - try { - l.await(); - } catch (InterruptedException e) { - interrupted = true; - break; - } - } - - if (interrupted) { - Thread.currentThread().interrupt(); - } + try { + future.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (future.isSuccess()) { + return future.getNow(); } - // commented out due to blocking issues up to 200 ms per minute for each thread - // future.awaitUninterruptibly(); + throw convertException(future); + } + + @Override + public V getInterrupted(RFuture future) throws InterruptedException { + future.await(); + if (future.isSuccess()) { return future.getNow(); } diff --git a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java index b3dd4c34f..d6dd87032 100644 --- a/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java +++ b/redisson/src/test/java/org/redisson/RedissonBlockingQueueTest.java @@ -15,6 +15,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Awaitility; +import org.awaitility.Duration; import org.junit.Assert; import org.junit.Test; import org.redisson.ClusterRunner.ClusterProcesses; @@ -121,7 +123,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { t.join(); - await().atMost(7, TimeUnit.SECONDS).until(() -> executed.get()); + await().atMost(7, TimeUnit.SECONDS).untilTrue(executed); redisson.shutdown(); runner.stop(); @@ -279,6 +281,50 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest { redisson.shutdown(); } + @Test + public void testTakeInterrupted() throws InterruptedException { + final AtomicBoolean interrupted = new AtomicBoolean(); + + Thread t = new Thread() { + public void run() { + try { + RBlockingQueue queue1 = getQueue(redisson); + queue1.take(); + } catch (InterruptedException e) { + interrupted.set(true); + } + }; + }; + + t.start(); + t.join(1000); + + t.interrupt(); + Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(interrupted); + } + + @Test + public void testPollInterrupted() throws InterruptedException { + final AtomicBoolean interrupted = new AtomicBoolean(); + + Thread t = new Thread() { + public void run() { + try { + RBlockingQueue queue1 = getQueue(redisson); + queue1.poll(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + interrupted.set(true); + } + }; + }; + + t.start(); + t.join(1000); + + t.interrupt(); + Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(interrupted); + } + @Test public void testTakeAsyncCancel() { Config config = createConfig();