Fixed - RedissonClient.shutdown() method hangs at serviceManager.getShutdownLatch() invocation. #5637

pull/5661/head
Nikita Koksharov 1 year ago
parent 5ac1856e13
commit 2b23c864cb

@ -303,6 +303,9 @@ public class RedisConnection implements RedisCommands {
} else {
RFuture<Void> f = async(RedisCommands.QUIT);
f.whenComplete((res, e) -> {
if (redisClient.isShutdown()) {
return;
}
channel.close();
});
}

@ -422,7 +422,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
scheduleClusterChangeCheck(cfg);
return;
}
if (!serviceManager.getShutdownLatch().acquire()) {
if (serviceManager.isShuttingDown()) {
return;
}
RedisURI uri = iterator.next();
@ -430,7 +430,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
connectionFuture.whenComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
serviceManager.getShutdownLatch().release();
checkClusterState(cfg, iterator, lastException);
return;
}
@ -446,14 +445,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if (e != null) {
log.error("Unable to execute {}", clusterNodesCommand, e);
lastException.set(e);
serviceManager.getShutdownLatch().release();
checkClusterState(cfg, iterator, lastException);
return;
}
if (nodes.isEmpty()) {
log.debug("cluster nodes state got from {}: doesn't contain any nodes", connection.getRedisClient().getAddr());
serviceManager.getShutdownLatch().release();
checkClusterState(cfg, iterator, lastException);
return;
}
@ -478,7 +475,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
log.error("Unable to parse cluster nodes state got from: {}:\n{}", connection.getRedisClient().getAddr(), nodesValue, ex);
lastException.set(ex);
serviceManager.getShutdownLatch().release();
checkClusterState(cfg, iterator, lastException);
}
})
@ -489,7 +485,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
.thenApply(newPartitions -> {
checkSlotsMigration(newPartitions);
checkSlotsChange(newPartitions);
serviceManager.getShutdownLatch().release();
scheduleClusterChangeCheck(cfg);
return newPartitions;
});

@ -117,7 +117,7 @@ public class RedisExecutor<V, R> {
return;
}
if (!connectionManager.getServiceManager().getShutdownLatch().acquire()) {
if (connectionManager.getServiceManager().isShuttingDown()) {
free();
mainPromise.completeExceptionally(new RedissonShutdownException("Redisson is shutdown"));
return;
@ -154,7 +154,11 @@ public class RedisExecutor<V, R> {
};
if (attempt == 0) {
connectionManager.getServiceManager().addFuture(mainPromise);
mainPromise.whenComplete((r, e) -> {
connectionManager.getServiceManager().removeFuture(mainPromise);
if (this.mainPromiseListener != null) {
this.mainPromiseListener.accept(r, e);
}
@ -167,12 +171,10 @@ public class RedisExecutor<V, R> {
connectionFuture.whenComplete((connection, e) -> {
if (connectionFuture.isCancelled()) {
connectionManager.getServiceManager().getShutdownLatch().release();
return;
}
if (connectionFuture.isDone() && connectionFuture.isCompletedExceptionally()) {
connectionManager.getServiceManager().getShutdownLatch().release();
exception = convertException(connectionFuture);
tryComplete(attemptPromise, exception);
return;
@ -456,14 +458,11 @@ public class RedisExecutor<V, R> {
scheduledFuture = null;
}
connectionManager.getServiceManager().addFuture(mainPromise);
mainPromise.whenComplete((res, e) -> {
if (scheduledFuture != null) {
scheduledFuture.cancel();
}
connectionManager.getServiceManager().removeFuture(mainPromise);
// handling cancel operation for blocking commands
if ((mainPromise.isCancelled()
|| e instanceof InterruptedException)
@ -673,7 +672,6 @@ public class RedisExecutor<V, R> {
}
RedisConnection connection = getNow(connectionFuture);
connectionManager.getServiceManager().getShutdownLatch().release();
if (connectionManager.getServiceManager().getConfig().getMasterConnectionPoolSize() < 10) {
if (source.getRedirect() == Redirect.ASK
|| getClass() != RedisExecutor.class

@ -99,8 +99,6 @@ public class RedisQueuedBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R>
if (RedisCommands.EXEC.getName().equals(command.getName())
|| RedisCommands.DISCARD.getName().equals(command.getName())) {
super.releaseConnection(attemptPromise, connectionFuture);
} else {
connectionManager.getServiceManager().getShutdownLatch().release();
}
}

@ -490,7 +490,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public void shutdown() {
shutdown(0, 2, TimeUnit.SECONDS); //default netty value
shutdown(2, 15, TimeUnit.SECONDS); //default netty value
}
@Override
@ -500,7 +500,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
long timeoutInNanos = unit.toNanos(timeout);
serviceManager.close();
serviceManager.getConnectionWatcher().stop();
serviceManager.getResolverGroup().close();
long startTime = System.nanoTime();
serviceManager.shutdownFutures(timeoutInNanos, TimeUnit.NANOSECONDS);
timeoutInNanos = Math.max(0, timeoutInNanos - System.nanoTime() - startTime);
if (isInitialized()) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
@ -510,7 +516,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
long startTime = System.nanoTime();
startTime = System.nanoTime();
future.get(timeoutInNanos, TimeUnit.NANOSECONDS);
timeoutInNanos = Math.max(0, timeoutInNanos - System.nanoTime() - startTime);
} catch (Exception e) {
@ -518,13 +524,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
serviceManager.getResolverGroup().close();
serviceManager.getShutdownLatch().close();
if (serviceManager.getCfg().getExecutor() == null) {
serviceManager.getExecutor().shutdown();
try {
long startTime = System.nanoTime();
startTime = System.nanoTime();
serviceManager.getExecutor().awaitTermination(timeoutInNanos, TimeUnit.NANOSECONDS);
timeoutInNanos = Math.max(0, timeoutInNanos - System.nanoTime() - startTime);
} catch (InterruptedException e) {
@ -532,8 +535,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
serviceManager.shutdownFutures();
serviceManager.getShutdownLatch().awaitUninterruptibly();
serviceManager.getTimer().stop();
if (serviceManager.getCfg().getEventLoopGroup() == null) {
if (timeoutInNanos < quietPeriod) {
@ -541,10 +543,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
serviceManager.getGroup()
.shutdownGracefully(unit.toNanos(quietPeriod), timeoutInNanos, TimeUnit.NANOSECONDS)
.awaitUninterruptibly(timeoutInNanos, TimeUnit.NANOSECONDS);
.syncUninterruptibly();
}
serviceManager.getTimer().stop();
}
private boolean isInitialized() {

@ -334,7 +334,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
f.thenAccept(r -> scheduleChangeCheck(cfg, null));
return;
}
if (!serviceManager.getShutdownLatch().acquire()) {
if (serviceManager.isShuttingDown()) {
return;
}
@ -348,7 +348,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
connectionFuture.whenComplete((connection, e) -> {
if (e != null) {
lastException.set(e);
serviceManager.getShutdownLatch().release();
checkState(cfg, iterator, lastException);
return;
}
@ -377,7 +376,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.error("Can't execute SENTINEL commands on {}", connection.getRedisClient().getAddr(), e);
}
serviceManager.getShutdownLatch().release();
if (e != null) {
scheduleChangeCheck(cfg, iterator);
} else {

@ -59,7 +59,6 @@ import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.Protocol;
import org.redisson.config.TransportMode;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RedisURI;
import org.redisson.misc.WrappedLock;
import org.redisson.remote.ResponseEntry;
@ -134,7 +133,7 @@ public final class ServiceManager {
private IdleConnectionWatcher connectionWatcher;
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
private final AtomicBoolean shutdownLatch = new AtomicBoolean();
private final ElementsSubscribeService elementsSubscribeService = new ElementsSubscribeService(this);
@ -273,7 +272,7 @@ public final class ServiceManager {
}
public boolean isShuttingDown() {
return shutdownLatch.isClosed();
return shutdownLatch.get();
}
public boolean isShutdown() {
@ -344,12 +343,20 @@ public final class ServiceManager {
futures.remove(future);
}
public void shutdownFutures() {
futures.forEach(f -> f.completeExceptionally(new RedissonShutdownException("Redisson is shutdown")));
public void shutdownFutures(long timeout, TimeUnit unit) {
CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
try {
future.get(timeout, unit);
} catch (InterruptedException | ExecutionException e) {
// skip
} catch (TimeoutException e) {
futures.forEach(f -> f.completeExceptionally(new RedissonShutdownException("Redisson is shutdown")));
}
futures.clear();
}
public InfinitySemaphoreLatch getShutdownLatch() {
return shutdownLatch;
public void close() {
shutdownLatch.set(true);
}
public RedisNodeNotFoundException createNodeNotFoundException(NodeSource source) {

@ -1,113 +0,0 @@
/**
* Copyright (c) 2013-2024 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.misc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
*
* Code parts from Manik Surtani (<a href="mailto:manik@jboss.org">manik@jboss.org</a>)
* @author Nikita Koksharov
*/
// TODO refactor to AbstractQueuedLongSynchronizer
@SuppressWarnings({"MultipleVariableDeclarations", "AvoidInlineConditionals", "UpperEll"})
public class InfinitySemaphoreLatch extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1744280161777661090l;
volatile boolean closed;
AtomicInteger sharedResources = new AtomicInteger();
// the following states are used in the AQS.
private static final int OPEN_STATE = 0;
public InfinitySemaphoreLatch() {
setState(OPEN_STATE);
}
@Override
public final int tryAcquireShared(int ignored) {
// return 1 if we allow the requestor to proceed, -1 if we want the
// requestor to block.
return getState() == OPEN_STATE ? 1 : -1;
}
@Override
public final boolean tryReleaseShared(int state) {
// used as a mechanism to set the state of the Sync.
setState(state);
return true;
}
public final boolean acquireAmount(int amount) {
if (closed) {
return false;
}
releaseShared(sharedResources.addAndGet(amount));
return true;
}
public final boolean acquire() {
if (closed) {
return false;
}
releaseShared(sharedResources.incrementAndGet());
return true;
}
public final void release() {
// do not use setState() directly since this won't notify parked
// threads.
releaseShared(sharedResources.decrementAndGet());
}
public boolean isOpened() {
return getState() == OPEN_STATE;
}
public boolean isClosed() {
return closed;
}
public void close() {
closed = true;
}
// waiting for an open state
public final boolean awaitUninterruptibly() {
try {
return await(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
private boolean await(long time, TimeUnit unit) throws InterruptedException {
return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy
// value that is
// not used.
}
@Override
public String toString() {
int s = getState();
String q = hasQueuedThreads() ? "non" : "";
return "ReclosableLatch [State = " + s + ", " + q + "empty queue]";
}
}
Loading…
Cancel
Save