Fixed - EvictionTask keeps running even after destroy() method has been called #5504

pull/5520/head
Nikita Koksharov 1 year ago
parent 9384160bd8
commit aca56397bd
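
Every hunk in this commit applies the same pattern: work that used to be scheduled on the executor group as a ScheduledFuture is moved onto the Netty timer through ServiceManager.newTimeout(), and cancellation switches from scheduledFuture.cancel(true) to timeout.cancel(). A minimal, self-contained sketch of that pattern against Netty's HashedWheelTimer (illustrative only, the class below is not Redisson code):

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.util.concurrent.TimeUnit;

// Illustrative sketch, not Redisson code: schedule work on a Netty timer and
// keep the Timeout handle so a later cancel() reliably prevents the next run.
public class TimerPatternSketch {
    public static void main(String[] args) {
        Timer timer = new HashedWheelTimer();

        // Previously: ScheduledFuture<?> f = group.schedule(runnable, 5, TimeUnit.SECONDS); f.cancel(true);
        // Now the equivalent handle is a Netty Timeout:
        Timeout handle = timer.newTimeout(t -> System.out.println("tick"), 5, TimeUnit.SECONDS);

        handle.cancel();   // the task never runs
        timer.stop();      // releases the timer thread
    }
}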

@@ -806,7 +806,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<Result> list = entry.getResponses().remove(requestId);
if (list != null) {
for (Result result : list) {
result.getResponseTimeoutFuture().cancel(true);
result.cancelResponseTimeout();
}
}
if (entry.getResponses().isEmpty()) {

@@ -74,7 +74,7 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
protected <T> void takeAsync(CompletableFuture<V> result, long delay, long timeoutInMicro, RedisCommand<T> command, Object... params) {
long start = System.currentTimeMillis();
commandExecutor.getServiceManager().getGroup().schedule(() -> {
commandExecutor.getServiceManager().newTimeout(t -> {
RFuture<V> future = wrapLockedAsync(command, params);
future.whenComplete((res, e) -> {
if (e != null && !(e instanceof RedisConnectionException)) {

@@ -134,7 +134,7 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit
if (timeoutInMillis == -1) {
CompletableFuture<Boolean> f = new CompletableFuture<>();
getServiceManager().getGroup().schedule(() -> {
getServiceManager().newTimeout(t -> {
CompletableFuture<Boolean> r = tryAcquireAsync(permits, timeoutInMillis);
commandExecutor.transfer(r, f);
}, delay, TimeUnit.MILLISECONDS);
@@ -149,12 +149,12 @@ public class RedissonRateLimiter extends RedissonExpirable implements RRateLimit
CompletableFuture<Boolean> f = new CompletableFuture<>();
if (remains < delay) {
getServiceManager().getGroup().schedule(() -> {
getServiceManager().newTimeout(t -> {
f.complete(false);
}, remains, TimeUnit.MILLISECONDS);
} else {
long start = System.currentTimeMillis();
getServiceManager().getGroup().schedule(() -> {
getServiceManager().newTimeout(t -> {
long elapsed = System.currentTimeMillis() - start;
if (remains <= elapsed) {
f.complete(false);

@@ -329,12 +329,9 @@ public abstract class LocalCacheListener {
cache.remove(key);
}
commandExecutor.getServiceManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
for (CacheKey cacheKey : keys) {
disabledKeys.remove(cacheKey, requestId);
}
commandExecutor.getServiceManager().newTimeout(t -> {
for (CacheKey cacheKey : keys) {
disabledKeys.remove(cacheKey, requestId);
}
}, timeout, TimeUnit.MILLISECONDS);
}

@@ -16,9 +16,9 @@
package org.redisson.cluster;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.*;
@@ -55,7 +55,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final ConcurrentMap<Integer, ClusterPartition> lastPartitions = new ConcurrentHashMap<>();
private ScheduledFuture<?> monitorFuture;
private volatile Timeout monitorFuture;
private volatile RedisURI lastClusterNode;
@@ -356,58 +356,54 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
private void scheduleClusterChangeCheck(ClusterServersConfig cfg) {
monitorFuture = serviceManager.getGroup().schedule(new Runnable() {
    @Override
    public void run() {
        if (configEndpointHostName != null) {
            String address = cfg.getNodeAddresses().iterator().next();
            RedisURI uri = new RedisURI(address);
            Future<List<InetSocketAddress>> allNodes = serviceManager.resolveAll(uri);
            allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
                @Override
                public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
                    AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(future.cause());
                    if (!future.isSuccess()) {
                        checkClusterState(cfg, Collections.emptyIterator(), lastException);
                        return;
                    }

                    List<RedisURI> nodes = new ArrayList<>();
                    for (InetSocketAddress addr : future.getNow()) {
                        RedisURI address = serviceManager.toURI(uri.getScheme(), addr.getAddress().getHostAddress(), "" + addr.getPort());
                        nodes.add(address);
                    }

                    Iterator<RedisURI> nodesIterator = nodes.iterator();
                    checkClusterState(cfg, nodesIterator, lastException);
                }
            });
        } else {
            AtomicReference<Throwable> lastException = new AtomicReference<>();
            List<RedisURI> nodes = new ArrayList<>();
            List<RedisURI> slaves = new ArrayList<>();

            for (ClusterPartition partition : getLastPartitions()) {
                if (!partition.isMasterFail()) {
                    nodes.add(partition.getMasterAddress());
                }

                Set<RedisURI> partitionSlaves = new HashSet<>(partition.getSlaveAddresses());
                partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
                slaves.addAll(partitionSlaves);
            }

            Collections.shuffle(nodes);
            Collections.shuffle(slaves);

            // master nodes first
            nodes.addAll(slaves);

            Iterator<RedisURI> nodesIterator = nodes.iterator();

            checkClusterState(cfg, nodesIterator, lastException);
        }
    }
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
monitorFuture = serviceManager.newTimeout(t -> {
    if (configEndpointHostName != null) {
        String address = cfg.getNodeAddresses().iterator().next();
        RedisURI uri = new RedisURI(address);
        Future<List<InetSocketAddress>> allNodes = serviceManager.resolveAll(uri);
        allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
            @Override
            public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
                AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(future.cause());
                if (!future.isSuccess()) {
                    checkClusterState(cfg, Collections.emptyIterator(), lastException);
                    return;
                }

                List<RedisURI> nodes = new ArrayList<>();
                for (InetSocketAddress addr : future.getNow()) {
                    RedisURI address = serviceManager.toURI(uri.getScheme(), addr.getAddress().getHostAddress(), "" + addr.getPort());
                    nodes.add(address);
                }

                Iterator<RedisURI> nodesIterator = nodes.iterator();
                checkClusterState(cfg, nodesIterator, lastException);
            }
        });
    } else {
        AtomicReference<Throwable> lastException = new AtomicReference<>();
        List<RedisURI> nodes = new ArrayList<>();
        List<RedisURI> slaves = new ArrayList<>();

        for (ClusterPartition partition : getLastPartitions()) {
            if (!partition.isMasterFail()) {
                nodes.add(partition.getMasterAddress());
            }

            Set<RedisURI> partitionSlaves = new HashSet<>(partition.getSlaveAddresses());
            partitionSlaves.removeAll(partition.getFailedSlaveAddresses());
            slaves.addAll(partitionSlaves);
        }

        Collections.shuffle(nodes);
        Collections.shuffle(slaves);

        // master nodes first
        nodes.addAll(slaves);

        Iterator<RedisURI> nodesIterator = nodes.iterator();
        checkClusterState(cfg, nodesIterator, lastException);
    }
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
@@ -922,7 +918,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
@Override
public void shutdown(long quietPeriod, long timeout, TimeUnit unit) {
if (monitorFuture != null) {
monitorFuture.cancel(true);
monitorFuture.cancel();
}
closeNodeConnections();

@@ -17,9 +17,9 @@ package org.redisson.connection;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import org.redisson.client.RedisClient;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RedisURI;
@@ -47,7 +47,7 @@ public class DNSMonitor {
private final Map<RedisURI, InetSocketAddress> masters = new HashMap<>();
private final Map<RedisURI, InetSocketAddress> slaves = new HashMap<>();
private ScheduledFuture<?> dnsMonitorFuture;
private volatile Timeout dnsMonitorFuture;
private long dnsMonitoringInterval;
public DNSMonitor(ConnectionManager connectionManager, RedisClient masterHost, Collection<RedisURI> slaveHosts, long dnsMonitoringInterval, AddressResolverGroup<InetSocketAddress> resolverGroup) {
@@ -72,12 +72,12 @@ public class DNSMonitor {
public void stop() {
if (dnsMonitorFuture != null) {
dnsMonitorFuture.cancel(true);
dnsMonitorFuture.cancel();
}
}
private void monitorDnsChange() {
dnsMonitorFuture = connectionManager.getServiceManager().getGroup().schedule(() -> {
dnsMonitorFuture = connectionManager.getServiceManager().newTimeout(t -> {
if (connectionManager.getServiceManager().isShuttingDown()) {
return;
}

@@ -15,7 +15,7 @@
*/
package org.redisson.connection;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.Timeout;
import org.redisson.api.NodeType;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
@@ -59,7 +59,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
private final AtomicReference<InetSocketAddress> currentMaster = new AtomicReference<>();
private ScheduledFuture<?> monitorFuture;
private volatile Timeout monitorFuture;
private enum Role {
master,
@@ -129,7 +129,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
return;
}
monitorFuture = serviceManager.getGroup().schedule(() -> {
monitorFuture = serviceManager.newTimeout(t -> {
if (serviceManager.isShuttingDown()) {
return;
}
@@ -260,7 +260,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
@Override
public void shutdown(long quietPeriod, long timeout, TimeUnit unit) {
if (monitorFuture != null) {
monitorFuture.cancel(true);
monitorFuture.cancel();
}
closeNodeConnections();

@@ -16,9 +16,9 @@
package org.redisson.connection;
import io.netty.util.NetUtil;
import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.StringUtil;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
@@ -56,7 +56,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final ConcurrentMap<RedisURI, RedisClient> sentinels = new ConcurrentHashMap<>();
private final AtomicReference<RedisURI> currentMaster = new AtomicReference<>();
private ScheduledFuture<?> monitorFuture;
private volatile Timeout monitorFuture;
private final Set<RedisURI> disconnectedSentinels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private RedisStrictCommand<RedisURI> masterHostCommand;
@@ -276,16 +276,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
private void scheduleSentinelDNSCheck() {
monitorFuture = serviceManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
AtomicInteger sentinelsCounter = new AtomicInteger(sentinelHosts.size());
performSentinelDNSCheck(future -> {
if (sentinelsCounter.decrementAndGet() == 0) {
scheduleSentinelDNSCheck();
}
});
}
monitorFuture = serviceManager.newTimeout(t -> {
AtomicInteger sentinelsCounter = new AtomicInteger(sentinelHosts.size());
performSentinelDNSCheck(future -> {
if (sentinelsCounter.decrementAndGet() == 0) {
scheduleSentinelDNSCheck();
}
});
}, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
}
@@ -312,19 +312,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator<RedisClient> iterator) {
monitorFuture = serviceManager.getGroup().schedule(new Runnable() {
@Override
public void run() {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
Iterator<RedisClient> iter = iterator;
if (iter == null) {
// Shuffle the list so all clients don't prefer the same sentinel
List<RedisClient> clients = new ArrayList<>(sentinels.values());
Collections.shuffle(clients);
iter = clients.iterator();
}
checkState(cfg, iter, lastException);
monitorFuture = serviceManager.newTimeout(t -> {
AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
Iterator<RedisClient> iter = iterator;
if (iter == null) {
// Shuffle the list so all clients don't prefer the same sentinel
List<RedisClient> clients = new ArrayList<>(sentinels.values());
Collections.shuffle(clients);
iter = clients.iterator();
}
checkState(cfg, iter, lastException);
}, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}
@@ -682,7 +676,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override
public void shutdown(long quietPeriod, long timeout, TimeUnit unit) {
if (monitorFuture != null) {
monitorFuture.cancel(true);
monitorFuture.cancel();
}
sentinels.values().stream()

@@ -88,8 +88,8 @@ public class EvictionScheduler {
public void remove(String name) {
EvictionTask task = tasks.remove(name);
if (task != null && task.getScheduledFuture() != null) {
task.getScheduledFuture().cancel(false);
if (task != null) {
task.cancel();
}
}
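
For context on the EvictionScheduler change above, a self-contained sketch (hypothetical names, not the actual Redisson classes) of a per-name task registry where remove(name) cancels the pending Timeout, which is what an object's destroy() path relies on to stop eviction runs:

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical registry sketch; not the Redisson EvictionScheduler implementation.
public class EvictionRegistrySketch {
    private final Timer timer = new HashedWheelTimer();
    private final Map<String, Timeout> tasks = new ConcurrentHashMap<>();

    public void schedule(String name) {
        // One pending eviction run per object name.
        tasks.put(name, timer.newTimeout(t -> System.out.println("evict " + name), 5, TimeUnit.SECONDS));
    }

    // Called when the owning object is destroyed: drop and cancel the pending run.
    public void remove(String name) {
        Timeout task = tasks.remove(name);
        if (task != null) {
            task.cancel();
        }
    }
}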

@@ -15,7 +15,8 @@
*/
package org.redisson.eviction;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncExecutor;
import org.slf4j.Logger;
@@ -30,11 +31,11 @@ import java.util.concurrent.TimeUnit;
* @author Nikita Koksharov
*
*/
abstract class EvictionTask implements Runnable {
abstract class EvictionTask implements TimerTask {
private final Logger log = LoggerFactory.getLogger(getClass());
final Deque<Integer> sizeHistory = new LinkedList<Integer>();
final Deque<Integer> sizeHistory = new LinkedList<>();
final int minDelay;
final int maxDelay;
final int keysLimit;
@@ -43,7 +44,7 @@ abstract class EvictionTask implements Runnable {
final CommandAsyncExecutor executor;
ScheduledFuture<?> scheduledFuture;
volatile Timeout timeout;
EvictionTask(CommandAsyncExecutor executor) {
super();
@@ -54,11 +55,11 @@
}
public void schedule() {
scheduledFuture = executor.getServiceManager().getGroup().schedule(this, delay, TimeUnit.SECONDS);
timeout = executor.getServiceManager().newTimeout(this, delay, TimeUnit.SECONDS);
}
public ScheduledFuture<?> getScheduledFuture() {
return scheduledFuture;
public void cancel() {
timeout.cancel();
}
abstract RFuture<Integer> execute();
@@ -66,7 +67,7 @@ abstract class EvictionTask implements Runnable {
abstract String getName();
@Override
public void run() {
public void run(Timeout timeout) {
if (executor.getServiceManager().isShuttingDown()) {
return;
}
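
Each run of an eviction task typically schedules the next one, so the handle stored in timeout is replaced over time. A sketch of that self-rescheduling shape with a cancellation flag (hypothetical names, not the actual EvictionTask), assuming cancel() is called once the owning object is destroyed:

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;

// Hypothetical self-rescheduling task; not the actual Redisson EvictionTask.
class SelfReschedulingTaskSketch implements TimerTask {
    private final Timer timer;
    private volatile Timeout timeout;
    private volatile boolean cancelled;

    SelfReschedulingTaskSketch(Timer timer) {
        this.timer = timer;
    }

    public void schedule() {
        timeout = timer.newTimeout(this, 5, TimeUnit.SECONDS);
    }

    public void cancel() {
        cancelled = true;   // stops the chain even if a run is already in flight
        timeout.cancel();
    }

    @Override
    public void run(Timeout t) {
        if (cancelled) {    // stand-in for the isShuttingDown() guard above
            return;
        }
        System.out.println("evicting expired entries");
        schedule();         // chain the next run
    }
}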

@@ -97,7 +97,7 @@ public class RedissonExecutorRemoteService extends RedissonRemoteService {
startedListeners.forEach(l -> l.onStarted(request.getId()));
if (taskTimeout > 0) {
commandExecutor.getServiceManager().getGroup().schedule(() -> {
commandExecutor.getServiceManager().newTimeout(t -> {
cancelRequestFuture.complete(new RemoteServiceCancelRequest(true, false));
}, taskTimeout, TimeUnit.MILLISECONDS);
}

@@ -15,7 +15,7 @@
*/
package org.redisson.remote;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.Timeout;
import org.redisson.RedissonBlockingQueue;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
@@ -101,7 +101,7 @@ public abstract class BaseRemoteProxy {
addCancelHandling(requestId, responseFuture);
ScheduledFuture<?> responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture);
Timeout responseTimeoutFuture = createResponseTimeout(timeout, requestId, responseFuture);
Map<String, List<Result>> entryResponses = entry.getResponses();
List<Result> list = entryResponses.computeIfAbsent(requestId, k -> new ArrayList<>(3));
@@ -122,31 +122,28 @@ public abstract class BaseRemoteProxy {
return responseFuture;
}
private <T extends RRemoteServiceResponse> ScheduledFuture<?> createResponseTimeout(long timeout, String requestId, CompletableFuture<T> responseFuture) {
return commandExecutor.getServiceManager().getGroup().schedule(new Runnable() {
@Override
public void run() {
locked.execute(() -> {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
}
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (!responseFuture.completeExceptionally(ex)) {
return;
}
List<Result> list = entry.getResponses().get(requestId);
list.remove(0);
if (list.isEmpty()) {
entry.getResponses().remove(requestId);
}
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
});
}
private <T extends RRemoteServiceResponse> Timeout createResponseTimeout(long timeout, String requestId, CompletableFuture<T> responseFuture) {
return commandExecutor.getServiceManager().newTimeout(t -> {
locked.execute(() -> {
ResponseEntry entry = responses.get(responseQueueName);
if (entry == null) {
return;
}
RemoteServiceTimeoutException ex = new RemoteServiceTimeoutException("No response after " + timeout + "ms");
if (!responseFuture.completeExceptionally(ex)) {
return;
}
List<Result> list = entry.getResponses().get(requestId);
list.remove(0);
if (list.isEmpty()) {
entry.getResponses().remove(requestId);
}
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
}
});
}, timeout, TimeUnit.MILLISECONDS);
}
@@ -166,7 +163,7 @@ public abstract class BaseRemoteProxy {
for (Iterator<Result> iterator = list.iterator(); iterator.hasNext();) {
Result result = iterator.next();
if (result.getPromise() == responseFuture) {
result.getResponseTimeoutFuture().cancel(true);
result.cancelResponseTimeout();
iterator.remove();
}
}
@@ -222,7 +219,7 @@ public abstract class BaseRemoteProxy {
}
CompletableFuture<RRemoteServiceResponse> f = res.getPromise();
res.getResponseTimeoutFuture().cancel(true);
res.cancelResponseTimeout();
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);

@@ -15,11 +15,12 @@
*/
package org.redisson.remote;
import io.netty.util.Timeout;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -32,9 +33,9 @@ public class ResponseEntry {
public static class Result {
private final CompletableFuture<? extends RRemoteServiceResponse> promise;
private final ScheduledFuture<?> responseTimeoutFuture;
private final Timeout responseTimeoutFuture;
public Result(CompletableFuture<? extends RRemoteServiceResponse> promise, ScheduledFuture<?> responseTimeoutFuture) {
public Result(CompletableFuture<? extends RRemoteServiceResponse> promise, Timeout responseTimeoutFuture) {
super();
this.promise = promise;
this.responseTimeoutFuture = responseTimeoutFuture;
@@ -44,8 +45,8 @@ public class ResponseEntry {
return (CompletableFuture<T>) promise;
}
public ScheduledFuture<?> getResponseTimeoutFuture() {
return responseTimeoutFuture;
public void cancelResponseTimeout() {
responseTimeoutFuture.cancel();
}
}
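
The ResponseEntry.Result change above keeps the Timeout that guards a pending response next to its promise, so whichever side finishes first can cancel or complete the other. A minimal sketch of that pairing (hypothetical names, not Redisson code):

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of pairing a response promise with its timeout handle.
public class ResponseTimeoutSketch {
    public static void main(String[] args) {
        Timer timer = new HashedWheelTimer();
        CompletableFuture<String> promise = new CompletableFuture<>();

        // Fail the promise if no response arrives in time.
        Timeout responseTimeout = timer.newTimeout(
                t -> promise.completeExceptionally(new TimeoutException("No response after 1000ms")),
                1000, TimeUnit.MILLISECONDS);

        // When the response does arrive, complete the promise and drop the timer task,
        // mirroring result.cancelResponseTimeout() above.
        promise.complete("pong");
        responseTimeout.cancel();

        timer.stop();
    }
}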
