Fixed - queue blocking methods don't re-throw InterruptedException #2327

pull/2329/head
Nikita Koksharov 6 years ago
parent 941ce121dc
commit 05920f51c6

@ -124,7 +124,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override @Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return get(takeLastAndOfferFirstToAsync(queueName)); return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
} }
@Override @Override
@ -191,7 +191,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override @Override
public V takeFirst() throws InterruptedException { public V takeFirst() throws InterruptedException {
return get(takeFirstAsync()); return commandExecutor.getInterrupted(takeFirstAsync());
} }
@Override @Override
@ -206,7 +206,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override @Override
public V takeLast() throws InterruptedException { public V takeLast() throws InterruptedException {
return get(takeLastAsync()); return commandExecutor.getInterrupted(takeLastAsync());
} }
@Override @Override
@ -216,7 +216,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override @Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { public V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); return commandExecutor.getInterrupted(pollFirstFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
@ -226,7 +226,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override @Override
public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollLastFromAnyAsync(timeout, unit, queueNames)); return commandExecutor.getInterrupted(pollLastFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
@ -236,7 +236,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override @Override
public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException { public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
return get(pollFirstAsync(timeout, unit)); return commandExecutor.getInterrupted(pollFirstAsync(timeout, unit));
} }
@Override @Override
@ -246,7 +246,7 @@ public class RedissonBlockingDeque<V> extends RedissonDeque<V> implements RBlock
@Override @Override
public V pollLast(long timeout, TimeUnit unit) throws InterruptedException { public V pollLast(long timeout, TimeUnit unit) throws InterruptedException {
return get(pollLastAsync(timeout, unit)); return commandExecutor.getInterrupted(pollLastAsync(timeout, unit));
} }
} }

@ -77,7 +77,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
*/ */
@Override @Override
public V take() throws InterruptedException { public V take() throws InterruptedException {
return get(takeAsync()); return commandExecutor.getInterrupted(takeAsync());
} }
@Override @Override
@ -91,7 +91,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
*/ */
@Override @Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException { public V poll(long timeout, TimeUnit unit) throws InterruptedException {
return get(pollAsync(timeout, unit)); return commandExecutor.getInterrupted(pollAsync(timeout, unit));
} }
/* /*
@ -100,7 +100,7 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
*/ */
@Override @Override
public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollFromAnyAsync(timeout, unit, queueNames)); return commandExecutor.getInterrupted(pollFromAnyAsync(timeout, unit, queueNames));
} }
/* /*
@ -119,12 +119,12 @@ public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlock
@Override @Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
return get(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); return commandExecutor.getInterrupted(pollLastAndOfferFirstToAsync(queueName, timeout, unit));
} }
@Override @Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return get(takeLastAndOfferFirstToAsync(queueName)); return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
} }
@Override @Override

@ -192,7 +192,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
*/ */
@Override @Override
public V take() throws InterruptedException { public V take() throws InterruptedException {
return get(takeAsync()); return commandExecutor.getInterrupted(takeAsync());
} }
@Override @Override
@ -207,7 +207,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
*/ */
@Override @Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException { public V poll(long timeout, TimeUnit unit) throws InterruptedException {
return get(pollAsync(timeout, unit)); return commandExecutor.getInterrupted(pollAsync(timeout, unit));
} }
/* /*
@ -216,7 +216,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
*/ */
@Override @Override
public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { public V pollFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollFromAnyAsync(timeout, unit, queueNames)); return commandExecutor.getInterrupted(pollFromAnyAsync(timeout, unit, queueNames));
} }
/* /*
@ -231,7 +231,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override @Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return get(takeLastAndOfferFirstToAsync(queueName)); return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
} }
@Override @Override
@ -247,7 +247,7 @@ public class RedissonBoundedBlockingQueue<V> extends RedissonQueue<V> implements
@Override @Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
return get(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); return commandExecutor.getInterrupted(pollLastAndOfferFirstToAsync(queueName, timeout, unit));
} }
@Override @Override

@ -96,7 +96,7 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
@Override @Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return get(takeLastAndOfferFirstToAsync(queueName)); return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
} }
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) { public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {
@ -175,7 +175,7 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
@Override @Override
public V takeFirst() throws InterruptedException { public V takeFirst() throws InterruptedException {
return get(takeFirstAsync()); return commandExecutor.getInterrupted(takeFirstAsync());
} }
@Override @Override
@ -192,7 +192,7 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
@Override @Override
public V takeLast() throws InterruptedException { public V takeLast() throws InterruptedException {
return get(takeLastAsync()); return commandExecutor.getInterrupted(takeLastAsync());
} }
@Override @Override
@ -202,7 +202,7 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
@Override @Override
public V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { public V pollFirstFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollFirstFromAnyAsync(timeout, unit, queueNames)); return commandExecutor.getInterrupted(pollFirstFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
@ -212,7 +212,7 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
@Override @Override
public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException { public V pollLastFromAny(long timeout, TimeUnit unit, String... queueNames) throws InterruptedException {
return get(pollLastFromAnyAsync(timeout, unit, queueNames)); return commandExecutor.getInterrupted(pollLastFromAnyAsync(timeout, unit, queueNames));
} }
@Override @Override
@ -222,7 +222,7 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
@Override @Override
public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException { public V pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
return get(pollFirstAsync(timeout, unit)); return commandExecutor.getInterrupted(pollFirstAsync(timeout, unit));
} }
@Override @Override
@ -234,7 +234,7 @@ public class RedissonPriorityBlockingDeque<V> extends RedissonPriorityDeque<V> i
@Override @Override
public V pollLast(long timeout, TimeUnit unit) throws InterruptedException { public V pollLast(long timeout, TimeUnit unit) throws InterruptedException {
return get(pollLastAsync(timeout, unit)); return commandExecutor.getInterrupted(pollLastAsync(timeout, unit));
} }
} }

@ -107,7 +107,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
@Override @Override
public V take() throws InterruptedException { public V take() throws InterruptedException {
return get(takeAsync()); return commandExecutor.getInterrupted(takeAsync());
} }
public RFuture<V> pollAsync(long timeout, TimeUnit unit) { public RFuture<V> pollAsync(long timeout, TimeUnit unit) {
@ -118,7 +118,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
@Override @Override
public V poll(long timeout, TimeUnit unit) throws InterruptedException { public V poll(long timeout, TimeUnit unit) throws InterruptedException {
return get(pollAsync(timeout, unit)); return commandExecutor.getInterrupted(pollAsync(timeout, unit));
} }
@Override @Override
@ -135,12 +135,12 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
@Override @Override
public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException { public V pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) throws InterruptedException {
return get(pollLastAndOfferFirstToAsync(queueName, timeout, unit)); return commandExecutor.getInterrupted(pollLastAndOfferFirstToAsync(queueName, timeout, unit));
} }
@Override @Override
public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException { public V takeLastAndOfferFirstTo(String queueName) throws InterruptedException {
return get(takeLastAndOfferFirstToAsync(queueName)); return commandExecutor.getInterrupted(takeLastAndOfferFirstToAsync(queueName));
} }
public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) { public RFuture<V> takeLastAndOfferFirstToAsync(String queueName) {

@ -58,6 +58,8 @@ public interface CommandAsyncExecutor {
void syncSubscription(RFuture<?> future); void syncSubscription(RFuture<?> future);
<V> V get(RFuture<V> future); <V> V get(RFuture<V> future);
<V> V getInterrupted(RFuture<V> future) throws InterruptedException;
<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params); <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params);

@ -130,29 +130,22 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <V> V get(RFuture<V> future) { public <V> V get(RFuture<V> future) {
if (!future.isDone()) { try {
CountDownLatch l = new CountDownLatch(1); future.await();
future.onComplete((res, e) -> { } catch (InterruptedException e) {
l.countDown(); Thread.currentThread().interrupt();
}); }
if (future.isSuccess()) {
boolean interrupted = false; return future.getNow();
while (!future.isDone()) {
try {
l.await();
} catch (InterruptedException e) {
interrupted = true;
break;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} }
// commented out due to blocking issues up to 200 ms per minute for each thread throw convertException(future);
// future.awaitUninterruptibly(); }
@Override
public <V> V getInterrupted(RFuture<V> future) throws InterruptedException {
future.await();
if (future.isSuccess()) { if (future.isSuccess()) {
return future.getNow(); return future.getNow();
} }

@ -15,6 +15,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.redisson.ClusterRunner.ClusterProcesses; import org.redisson.ClusterRunner.ClusterProcesses;
@ -121,7 +123,7 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
t.join(); t.join();
await().atMost(7, TimeUnit.SECONDS).until(() -> executed.get()); await().atMost(7, TimeUnit.SECONDS).untilTrue(executed);
redisson.shutdown(); redisson.shutdown();
runner.stop(); runner.stop();
@ -279,6 +281,50 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
redisson.shutdown(); redisson.shutdown();
} }
@Test
public void testTakeInterrupted() throws InterruptedException {
final AtomicBoolean interrupted = new AtomicBoolean();
Thread t = new Thread() {
public void run() {
try {
RBlockingQueue<Integer> queue1 = getQueue(redisson);
queue1.take();
} catch (InterruptedException e) {
interrupted.set(true);
}
};
};
t.start();
t.join(1000);
t.interrupt();
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(interrupted);
}
@Test
public void testPollInterrupted() throws InterruptedException {
final AtomicBoolean interrupted = new AtomicBoolean();
Thread t = new Thread() {
public void run() {
try {
RBlockingQueue<Integer> queue1 = getQueue(redisson);
queue1.poll(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
interrupted.set(true);
}
};
};
t.start();
t.join(1000);
t.interrupt();
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(interrupted);
}
@Test @Test
public void testTakeAsyncCancel() { public void testTakeAsyncCancel() {
Config config = createConfig(); Config config = createConfig();

Loading…
Cancel
Save