diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java index 009baab2f..93419660d 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java @@ -100,12 +100,14 @@ public abstract class BaseRemoteProxy { addCancelHandling(requestId, responseFuture); - Timeout responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture); + Result res = new Result(responseFuture); + + Timeout responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture, res); + res.setResponseTimeoutFuture(responseTimeoutFuture); Map> entryResponses = entry.getResponses(); List list = entryResponses.computeIfAbsent(requestId, k -> new ArrayList<>(3)); - Result res = new Result(responseFuture, responseTimeoutFuture); if (insertFirst) { list.add(0, res); } else { @@ -121,7 +123,8 @@ public abstract class BaseRemoteProxy { return responseFuture; } - private Timeout createResponseTimeout(long timeout, String requestId, CompletableFuture responseFuture) { + private Timeout createResponseTimeout(long timeout, String requestId, + CompletableFuture responseFuture, Result res) { return commandExecutor.getServiceManager().newTimeout(t -> { responses.computeIfPresent(responseQueueName, (k, entry) -> { RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); @@ -130,7 +133,7 @@ public abstract class BaseRemoteProxy { } List list = entry.getResponses().get(requestId); - list.remove(0); + list.remove(res); if (list.isEmpty()) { entry.getResponses().remove(requestId); } diff --git a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java index 6fb9a59d0..49d3b92cd 100644 --- a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java +++ b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java @@ -33,18 +33,21 @@ public class ResponseEntry { public static class Result { private final CompletableFuture promise; - private final Timeout responseTimeoutFuture; + private Timeout responseTimeoutFuture; - public Result(CompletableFuture promise, Timeout responseTimeoutFuture) { + public Result(CompletableFuture promise) { super(); this.promise = promise; - this.responseTimeoutFuture = responseTimeoutFuture; } public CompletableFuture getPromise() { return (CompletableFuture) promise; } - + + public void setResponseTimeoutFuture(Timeout responseTimeoutFuture) { + this.responseTimeoutFuture = responseTimeoutFuture; + } + public void cancelResponseTimeout() { responseTimeoutFuture.cancel(); }