Fixed - IllegalReferenceCountException is thrown when canceling a method call. #5519

pull/5540/head
Nikita Koksharov 1 year ago
parent 93de8e673a
commit 7a9b0a184d

@ -132,20 +132,20 @@ public class RedisExecutor<V, R> {
return; return;
} }
if (connectionFuture.cancel(false)) { if (connectionFuture.completeExceptionally(new CancellationException())) {
log.debug("Connection obtaining canceled for {}", command); log.debug("Connection obtaining canceled for {}", command);
timeout.ifPresent(Timeout::cancel); timeout.ifPresent(Timeout::cancel);
if (attemptPromise.cancel(false)) { if (attemptPromise.completeExceptionally(new CancellationException())) {
free(); free();
} }
} else { } else {
if (command.isBlockingCommand()) { if (command.isBlockingCommand()) {
RedisConnection c = connectionFuture.getNow(null); RedisConnection c = connectionFuture.getNow(null);
if (writeFuture.cancel(false)) { if (writeFuture.cancel(false)) {
attemptPromise.cancel(false); attemptPromise.completeExceptionally(new CancellationException());
} else { } else {
c.forceFastReconnectAsync().whenComplete((res, ex) -> { c.forceFastReconnectAsync().whenComplete((res, ex) -> {
attemptPromise.cancel(true); attemptPromise.completeExceptionally(new CancellationException());
}); });
} }
} }
@ -212,7 +212,7 @@ public class RedisExecutor<V, R> {
timeout.ifPresent(Timeout::cancel); timeout.ifPresent(Timeout::cancel);
TimerTask task = timeout -> { TimerTask task = timeout -> {
if (connectionFuture.cancel(false)) { if (connectionFuture.completeExceptionally(new CancellationException())) {
exception = new RedisTimeoutException("Unable to acquire connection! " + this.connectionFuture + exception = new RedisTimeoutException("Unable to acquire connection! " + this.connectionFuture +
"Increase connection pool size or timeout. " "Increase connection pool size or timeout. "
+ "Node source: " + source + "Node source: " + source
@ -261,7 +261,7 @@ public class RedisExecutor<V, R> {
return; return;
} }
if (connectionFuture.cancel(false)) { if (connectionFuture.completeExceptionally(new CancellationException())) {
exception = new RedisTimeoutException("Unable to acquire connection! " + connectionFuture + exception = new RedisTimeoutException("Unable to acquire connection! " + connectionFuture +
"Increase connection pool size. " "Increase connection pool size. "
+ "Node source: " + source + "Node source: " + source
@ -297,7 +297,7 @@ public class RedisExecutor<V, R> {
} }
if (mainPromise.isCancelled()) { if (mainPromise.isCancelled()) {
if (attemptPromise.cancel(false)) { if (attemptPromise.completeExceptionally(new CancellationException())) {
free(); free();
} }
return; return;
@ -310,7 +310,7 @@ public class RedisExecutor<V, R> {
} }
return; return;
} }
if (!attemptPromise.cancel(false)) { if (!attemptPromise.completeExceptionally(new CancellationException())) {
return; return;
} }
@ -407,7 +407,7 @@ public class RedisExecutor<V, R> {
long timeoutAmount = timeoutTime; long timeoutAmount = timeoutTime;
TimerTask timeoutResponseTask = timeout -> { TimerTask timeoutResponseTask = timeout -> {
if (isResendAllowed(attempt, attempts)) { if (isResendAllowed(attempt, attempts)) {
if (!attemptPromise.cancel(false)) { if (!attemptPromise.completeExceptionally(new CancellationException())) {
return; return;
} }
@ -469,7 +469,7 @@ public class RedisExecutor<V, R> {
&& !attemptPromise.isDone()) { && !attemptPromise.isDone()) {
log.debug("Canceled blocking operation {} used {}", command, connection); log.debug("Canceled blocking operation {} used {}", command, connection);
connection.forceFastReconnectAsync().whenComplete((r, ex) -> { connection.forceFastReconnectAsync().whenComplete((r, ex) -> {
attemptPromise.cancel(true); attemptPromise.completeExceptionally(new CancellationException());
}); });
return; return;
} }

Loading…
Cancel
Save