From aca56397bd6fa7a3c3e5b473d15e4a4dd17ffb9f Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 15 Dec 2023 10:17:36 +0300 Subject: [PATCH] Fixed - EvictionTask keeps running even after destroy() method has been called #5504 --- .../org/redisson/RedissonExecutorService.java | 2 +- .../RedissonPriorityBlockingQueue.java | 2 +- .../org/redisson/RedissonRateLimiter.java | 6 +- .../redisson/cache/LocalCacheListener.java | 9 +- .../cluster/ClusterConnectionManager.java | 90 +++++++++---------- .../org/redisson/connection/DNSMonitor.java | 8 +- .../ReplicatedConnectionManager.java | 8 +- .../connection/SentinelConnectionManager.java | 44 ++++----- .../redisson/eviction/EvictionScheduler.java | 4 +- .../org/redisson/eviction/EvictionTask.java | 19 ++-- .../RedissonExecutorRemoteService.java | 2 +- .../org/redisson/remote/BaseRemoteProxy.java | 55 ++++++------ .../org/redisson/remote/ResponseEntry.java | 11 +-- 13 files changed, 123 insertions(+), 137 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonExecutorService.java b/redisson/src/main/java/org/redisson/RedissonExecutorService.java index 9c6070377..d78d20956 100644 --- a/redisson/src/main/java/org/redisson/RedissonExecutorService.java +++ b/redisson/src/main/java/org/redisson/RedissonExecutorService.java @@ -806,7 +806,7 @@ public class RedissonExecutorService implements RScheduledExecutorService { List list = entry.getResponses().remove(requestId); if (list != null) { for (Result result : list) { - result.getResponseTimeoutFuture().cancel(true); + result.cancelResponseTimeout(); } } if (entry.getResponses().isEmpty()) { diff --git a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java index 5c23a6c33..898c8697c 100644 --- a/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java +++ b/redisson/src/main/java/org/redisson/RedissonPriorityBlockingQueue.java @@ -74,7 +74,7 @@ public class RedissonPriorityBlockingQueue extends RedissonPriorityQueue i protected void takeAsync(CompletableFuture result, long delay, long timeoutInMicro, RedisCommand command, Object... params) { long start = System.currentTimeMillis(); - commandExecutor.getServiceManager().getGroup().schedule(() -> { + commandExecutor.getServiceManager().newTimeout(t -> { RFuture future = wrapLockedAsync(command, params); future.whenComplete((res, e) -> { if (e != null && !(e instanceof RedisConnectionException)) { diff --git a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java index 33a6246ae..4bc22eac2 100644 --- a/redisson/src/main/java/org/redisson/RedissonRateLimiter.java +++ b/redisson/src/main/java/org/redisson/RedissonRateLimiter.java @@ -134,7 +134,7 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit if (timeoutInMillis == -1) { CompletableFuture f = new CompletableFuture<>(); - getServiceManager().getGroup().schedule(() -> { + getServiceManager().newTimeout(t -> { CompletableFuture r = tryAcquireAsync(permits, timeoutInMillis); commandExecutor.transfer(r, f); }, delay, TimeUnit.MILLISECONDS); @@ -149,12 +149,12 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit CompletableFuture f = new CompletableFuture<>(); if (remains < delay) { - getServiceManager().getGroup().schedule(() -> { + getServiceManager().newTimeout(t -> { f.complete(false); }, remains, TimeUnit.MILLISECONDS); } else { long start = System.currentTimeMillis(); - getServiceManager().getGroup().schedule(() -> { + getServiceManager().newTimeout(t -> { long elapsed = System.currentTimeMillis() - start; if (remains <= elapsed) { f.complete(false); diff --git a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java index f02a272dd..c8958afb8 100644 --- a/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java +++ b/redisson/src/main/java/org/redisson/cache/LocalCacheListener.java @@ -329,12 +329,9 @@ public abstract class LocalCacheListener { cache.remove(key); } - commandExecutor.getServiceManager().getGroup().schedule(new Runnable() { - @Override - public void run() { - for (CacheKey cacheKey : keys) { - disabledKeys.remove(cacheKey, requestId); - } + commandExecutor.getServiceManager().newTimeout(t -> { + for (CacheKey cacheKey : keys) { + disabledKeys.remove(cacheKey, requestId); } }, timeout, TimeUnit.MILLISECONDS); } diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 5e04eb059..8b5b70df6 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -16,9 +16,9 @@ package org.redisson.cluster; import io.netty.buffer.ByteBuf; +import io.netty.util.Timeout; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ScheduledFuture; import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.*; @@ -55,7 +55,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private final ConcurrentMap lastPartitions = new ConcurrentHashMap<>(); - private ScheduledFuture monitorFuture; + private volatile Timeout monitorFuture; private volatile RedisURI lastClusterNode; @@ -356,58 +356,54 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } private void scheduleClusterChangeCheck(ClusterServersConfig cfg) { - monitorFuture = serviceManager.getGroup().schedule(new Runnable() { - @Override - public void run() { - if (configEndpointHostName != null) { - String address = cfg.getNodeAddresses().iterator().next(); - RedisURI uri = new RedisURI(address); - Future> allNodes = serviceManager.resolveAll(uri); - allNodes.addListener(new FutureListener>() { - @Override - public void operationComplete(Future> future) throws Exception { - AtomicReference lastException = new AtomicReference(future.cause()); - if (!future.isSuccess()) { - checkClusterState(cfg, Collections.emptyIterator(), lastException); - return; - } - - List nodes = new ArrayList<>(); - for (InetSocketAddress addr : future.getNow()) { - RedisURI address = serviceManager.toURI(uri.getScheme(), addr.getAddress().getHostAddress(), "" + addr.getPort()); - nodes.add(address); - } - - Iterator nodesIterator = nodes.iterator(); - checkClusterState(cfg, nodesIterator, lastException); + monitorFuture = serviceManager.newTimeout(t -> { + if (configEndpointHostName != null) { + String address = cfg.getNodeAddresses().iterator().next(); + RedisURI uri = new RedisURI(address); + Future> allNodes = serviceManager.resolveAll(uri); + allNodes.addListener(new FutureListener>() { + @Override + public void operationComplete(Future> future) throws Exception { + AtomicReference lastException = new AtomicReference(future.cause()); + if (!future.isSuccess()) { + checkClusterState(cfg, Collections.emptyIterator(), lastException); + return; } - }); - } else { - AtomicReference lastException = new AtomicReference<>(); - List nodes = new ArrayList<>(); - List slaves = new ArrayList<>(); - - for (ClusterPartition partition : getLastPartitions()) { - if (!partition.isMasterFail()) { - nodes.add(partition.getMasterAddress()); + + List nodes = new ArrayList<>(); + for (InetSocketAddress addr : future.getNow()) { + RedisURI address = serviceManager.toURI(uri.getScheme(), addr.getAddress().getHostAddress(), "" + addr.getPort()); + nodes.add(address); } - Set partitionSlaves = new HashSet<>(partition.getSlaveAddresses()); - partitionSlaves.removeAll(partition.getFailedSlaveAddresses()); - slaves.addAll(partitionSlaves); + Iterator nodesIterator = nodes.iterator(); + checkClusterState(cfg, nodesIterator, lastException); } - Collections.shuffle(nodes); - Collections.shuffle(slaves); - - // master nodes first - nodes.addAll(slaves); + }); + } else { + AtomicReference lastException = new AtomicReference<>(); + List nodes = new ArrayList<>(); + List slaves = new ArrayList<>(); - Iterator nodesIterator = nodes.iterator(); + for (ClusterPartition partition : getLastPartitions()) { + if (!partition.isMasterFail()) { + nodes.add(partition.getMasterAddress()); + } - checkClusterState(cfg, nodesIterator, lastException); + Set partitionSlaves = new HashSet<>(partition.getSlaveAddresses()); + partitionSlaves.removeAll(partition.getFailedSlaveAddresses()); + slaves.addAll(partitionSlaves); } - } + Collections.shuffle(nodes); + Collections.shuffle(slaves); + + // master nodes first + nodes.addAll(slaves); + Iterator nodesIterator = nodes.iterator(); + + checkClusterState(cfg, nodesIterator, lastException); + } }, cfg.getScanInterval(), TimeUnit.MILLISECONDS); } @@ -922,7 +918,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { @Override public void shutdown(long quietPeriod, long timeout, TimeUnit unit) { if (monitorFuture != null) { - monitorFuture.cancel(true); + monitorFuture.cancel(); } closeNodeConnections(); diff --git a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java index 572602ccc..eecfba0e7 100644 --- a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java +++ b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java @@ -17,9 +17,9 @@ package org.redisson.connection; import io.netty.resolver.AddressResolver; import io.netty.resolver.AddressResolverGroup; +import io.netty.util.Timeout; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ScheduledFuture; import org.redisson.client.RedisClient; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.RedisURI; @@ -47,7 +47,7 @@ public class DNSMonitor { private final Map masters = new HashMap<>(); private final Map slaves = new HashMap<>(); - private ScheduledFuture dnsMonitorFuture; + private volatile Timeout dnsMonitorFuture; private long dnsMonitoringInterval; public DNSMonitor(ConnectionManager connectionManager, RedisClient masterHost, Collection slaveHosts, long dnsMonitoringInterval, AddressResolverGroup resolverGroup) { @@ -72,12 +72,12 @@ public class DNSMonitor { public void stop() { if (dnsMonitorFuture != null) { - dnsMonitorFuture.cancel(true); + dnsMonitorFuture.cancel(); } } private void monitorDnsChange() { - dnsMonitorFuture = connectionManager.getServiceManager().getGroup().schedule(() -> { + dnsMonitorFuture = connectionManager.getServiceManager().newTimeout(t -> { if (connectionManager.getServiceManager().isShuttingDown()) { return; } diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index d51cc3cf9..98bfaaa46 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -15,7 +15,7 @@ */ package org.redisson.connection; -import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.Timeout; import org.redisson.api.NodeType; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; @@ -59,7 +59,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { private final AtomicReference currentMaster = new AtomicReference<>(); - private ScheduledFuture monitorFuture; + private volatile Timeout monitorFuture; private enum Role { master, @@ -129,7 +129,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { return; } - monitorFuture = serviceManager.getGroup().schedule(() -> { + monitorFuture = serviceManager.newTimeout(t -> { if (serviceManager.isShuttingDown()) { return; } @@ -260,7 +260,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { @Override public void shutdown(long quietPeriod, long timeout, TimeUnit unit) { if (monitorFuture != null) { - monitorFuture.cancel(true); + monitorFuture.cancel(); } closeNodeConnections(); diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 0d6900e4f..e49473b6f 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -16,9 +16,9 @@ package org.redisson.connection; import io.netty.util.NetUtil; +import io.netty.util.Timeout; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.StringUtil; import org.redisson.api.NodeType; import org.redisson.api.RFuture; @@ -56,7 +56,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final ConcurrentMap sentinels = new ConcurrentHashMap<>(); private final AtomicReference currentMaster = new AtomicReference<>(); - private ScheduledFuture monitorFuture; + private volatile Timeout monitorFuture; private final Set disconnectedSentinels = Collections.newSetFromMap(new ConcurrentHashMap<>()); private RedisStrictCommand masterHostCommand; @@ -276,16 +276,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private void scheduleSentinelDNSCheck() { - monitorFuture = serviceManager.getGroup().schedule(new Runnable() { - @Override - public void run() { - AtomicInteger sentinelsCounter = new AtomicInteger(sentinelHosts.size()); - performSentinelDNSCheck(future -> { - if (sentinelsCounter.decrementAndGet() == 0) { - scheduleSentinelDNSCheck(); - } - }); - } + monitorFuture = serviceManager.newTimeout(t -> { + AtomicInteger sentinelsCounter = new AtomicInteger(sentinelHosts.size()); + performSentinelDNSCheck(future -> { + if (sentinelsCounter.decrementAndGet() == 0) { + scheduleSentinelDNSCheck(); + } + }); }, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS); } @@ -312,19 +309,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator iterator) { - monitorFuture = serviceManager.getGroup().schedule(new Runnable() { - @Override - public void run() { - AtomicReference lastException = new AtomicReference(); - Iterator iter = iterator; - if (iter == null) { - // Shuffle the list so all clients don't prefer the same sentinel - List clients = new ArrayList<>(sentinels.values()); - Collections.shuffle(clients); - iter = clients.iterator(); - } - checkState(cfg, iter, lastException); + monitorFuture = serviceManager.newTimeout(t -> { + AtomicReference lastException = new AtomicReference(); + Iterator iter = iterator; + if (iter == null) { + // Shuffle the list so all clients don't prefer the same sentinel + List clients = new ArrayList<>(sentinels.values()); + Collections.shuffle(clients); + iter = clients.iterator(); } + checkState(cfg, iter, lastException); }, cfg.getScanInterval(), TimeUnit.MILLISECONDS); } @@ -682,7 +676,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { @Override public void shutdown(long quietPeriod, long timeout, TimeUnit unit) { if (monitorFuture != null) { - monitorFuture.cancel(true); + monitorFuture.cancel(); } sentinels.values().stream() diff --git a/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java b/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java index 62f528b5d..b8817339e 100644 --- a/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java +++ b/redisson/src/main/java/org/redisson/eviction/EvictionScheduler.java @@ -88,8 +88,8 @@ public class EvictionScheduler { public void remove(String name) { EvictionTask task = tasks.remove(name); - if (task != null && task.getScheduledFuture() != null) { - task.getScheduledFuture().cancel(false); + if (task != null) { + task.cancel(); } } diff --git a/redisson/src/main/java/org/redisson/eviction/EvictionTask.java b/redisson/src/main/java/org/redisson/eviction/EvictionTask.java index 918290220..71ef407b6 100644 --- a/redisson/src/main/java/org/redisson/eviction/EvictionTask.java +++ b/redisson/src/main/java/org/redisson/eviction/EvictionTask.java @@ -15,7 +15,8 @@ */ package org.redisson.eviction; -import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import org.redisson.api.RFuture; import org.redisson.command.CommandAsyncExecutor; import org.slf4j.Logger; @@ -30,11 +31,11 @@ import java.util.concurrent.TimeUnit; * @author Nikita Koksharov * */ -abstract class EvictionTask implements Runnable { +abstract class EvictionTask implements TimerTask { private final Logger log = LoggerFactory.getLogger(getClass()); - final Deque sizeHistory = new LinkedList(); + final Deque sizeHistory = new LinkedList<>(); final int minDelay; final int maxDelay; final int keysLimit; @@ -43,8 +44,8 @@ abstract class EvictionTask implements Runnable { final CommandAsyncExecutor executor; - ScheduledFuture scheduledFuture; - + volatile Timeout timeout; + EvictionTask(CommandAsyncExecutor executor) { super(); this.executor = executor; @@ -54,11 +55,11 @@ abstract class EvictionTask implements Runnable { } public void schedule() { - scheduledFuture = executor.getServiceManager().getGroup().schedule(this, delay, TimeUnit.SECONDS); + timeout = executor.getServiceManager().newTimeout(this, delay, TimeUnit.SECONDS); } - public ScheduledFuture getScheduledFuture() { - return scheduledFuture; + public void cancel() { + timeout.cancel(); } abstract RFuture execute(); @@ -66,7 +67,7 @@ abstract class EvictionTask implements Runnable { abstract String getName(); @Override - public void run() { + public void run(Timeout timeout) { if (executor.getServiceManager().isShuttingDown()) { return; } diff --git a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java index ee6015547..bf88531c6 100644 --- a/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java +++ b/redisson/src/main/java/org/redisson/executor/RedissonExecutorRemoteService.java @@ -97,7 +97,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService { startedListeners.forEach(l -> l.onStarted(request.getId())); if (taskTimeout > 0) { - commandExecutor.getServiceManager().getGroup().schedule(() -> { + commandExecutor.getServiceManager().newTimeout(t -> { cancelRequestFuture.complete(new RemoteServiceCancelRequest(true, false)); }, taskTimeout, TimeUnit.MILLISECONDS); } diff --git a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java index b3ab43c93..e3070591e 100644 --- a/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java +++ b/redisson/src/main/java/org/redisson/remote/BaseRemoteProxy.java @@ -15,7 +15,7 @@ */ package org.redisson.remote; -import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.Timeout; import org.redisson.RedissonBlockingQueue; import org.redisson.RedissonShutdownException; import org.redisson.api.RBlockingQueue; @@ -101,7 +101,7 @@ public abstract class BaseRemoteProxy { addCancelHandling(requestId, responseFuture); - ScheduledFuture responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture); + Timeout responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture); Map> entryResponses = entry.getResponses(); List list = entryResponses.computeIfAbsent(requestId, k -> new ArrayList<>(3)); @@ -122,31 +122,28 @@ public abstract class BaseRemoteProxy { return responseFuture; } - private ScheduledFuture createResponseTimeout(long timeout, String requestId, CompletableFuture responseFuture) { - return commandExecutor.getServiceManager().getGroup().schedule(new Runnable() { - @Override - public void run() { - locked.execute(() -> { - ResponseEntry entry = responses.get(responseQueueName); - if (entry == null) { - return; - } - - RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); - if (!responseFuture.completeExceptionally(ex)) { - return; - } - - List list = entry.getResponses().get(requestId); - list.remove(0); - if (list.isEmpty()) { - entry.getResponses().remove(requestId); - } - if (entry.getResponses().isEmpty()) { - responses.remove(responseQueueName, entry); - } - }); - } + private Timeout createResponseTimeout(long timeout, String requestId, CompletableFuture responseFuture) { + return commandExecutor.getServiceManager().newTimeout(t -> { + locked.execute(() -> { + ResponseEntry entry = responses.get(responseQueueName); + if (entry == null) { + return; + } + + RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms"); + if (!responseFuture.completeExceptionally(ex)) { + return; + } + + List list = entry.getResponses().get(requestId); + list.remove(0); + if (list.isEmpty()) { + entry.getResponses().remove(requestId); + } + if (entry.getResponses().isEmpty()) { + responses.remove(responseQueueName, entry); + } + }); }, timeout, TimeUnit.MILLISECONDS); } @@ -166,7 +163,7 @@ public abstract class BaseRemoteProxy { for (Iterator iterator = list.iterator(); iterator.hasNext();) { Result result = iterator.next(); if (result.getPromise() == responseFuture) { - result.getResponseTimeoutFuture().cancel(true); + result.cancelResponseTimeout(); iterator.remove(); } } @@ -222,7 +219,7 @@ public abstract class BaseRemoteProxy { } CompletableFuture f = res.getPromise(); - res.getResponseTimeoutFuture().cancel(true); + res.cancelResponseTimeout(); if (entry.getResponses().isEmpty()) { responses.remove(responseQueueName, entry); diff --git a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java index 1c6f265bf..617ee538c 100644 --- a/redisson/src/main/java/org/redisson/remote/ResponseEntry.java +++ b/redisson/src/main/java/org/redisson/remote/ResponseEntry.java @@ -15,11 +15,12 @@ */ package org.redisson.remote; +import io.netty.util.Timeout; + import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -32,9 +33,9 @@ public class ResponseEntry { public static class Result { private final CompletableFuture promise; - private final ScheduledFuture responseTimeoutFuture; + private final Timeout responseTimeoutFuture; - public Result(CompletableFuture promise, ScheduledFuture responseTimeoutFuture) { + public Result(CompletableFuture promise, Timeout responseTimeoutFuture) { super(); this.promise = promise; this.responseTimeoutFuture = responseTimeoutFuture; @@ -44,8 +45,8 @@ public class ResponseEntry { return (CompletableFuture) promise; } - public ScheduledFuture getResponseTimeoutFuture() { - return responseTimeoutFuture; + public void cancelResponseTimeout() { + responseTimeoutFuture.cancel(); } }