Fixed - Interrupted blocking methods aren't canceled #2403

pull/2450/head
Nikita Koksharov 5 years ago
parent 8e74c0e9f6
commit 1694dd04ae

@ -142,8 +142,13 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <V> V getInterrupted(RFuture<V> future) throws InterruptedException { public <V> V getInterrupted(RFuture<V> future) throws InterruptedException {
future.await(); try {
future.await();
} catch (InterruptedException e) {
((RPromise)future).tryFailure(e);
throw e;
}
if (future.isSuccess()) { if (future.isSuccess()) {
return future.getNow(); return future.getNow();
} }

@ -372,7 +372,9 @@ public class RedisExecutor<V, R> {
} }
// handling cancel operation for blocking commands // handling cancel operation for blocking commands
if (mainPromise.isCancelled() && !attemptPromise.isDone()) { if ((mainPromise.isCancelled()
|| mainPromise.cause() instanceof InterruptedException)
&& !attemptPromise.isDone()) {
log.debug("Canceled blocking operation {} used {}", command, connection); log.debug("Canceled blocking operation {} used {}", command, connection);
connection.forceFastReconnectAsync().onComplete((r, ex) -> { connection.forceFastReconnectAsync().onComplete((r, ex) -> {
attemptPromise.cancel(true); attemptPromise.cancel(true);

@ -295,12 +295,17 @@ public class RedissonBlockingQueueTest extends RedissonQueueTest {
} }
}; };
}; };
t.start(); t.start();
t.join(1000); t.join(1000);
t.interrupt(); t.interrupt();
Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(interrupted); Awaitility.await().atMost(Duration.ONE_SECOND).untilTrue(interrupted);
RBlockingQueue<Integer> q = getQueue(redisson);
q.add(1);
Thread.sleep(1000);
assertThat(q.contains(1)).isTrue();
} }
@Test @Test

Loading…
Cancel
Save