From 2b23c864cbad433837ccf9fd0116409e494a2ee0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 28 Feb 2024 15:44:10 +0300 Subject: [PATCH] Fixed - RedissonClient.shutdown() method hangs at serviceManager.getShutdownLatch() invocation. #5637 --- .../org/redisson/client/RedisConnection.java | 3 + .../cluster/ClusterConnectionManager.java | 7 +- .../org/redisson/command/RedisExecutor.java | 12 +- .../command/RedisQueuedBatchExecutor.java | 2 - .../MasterSlaveConnectionManager.java | 22 ++-- .../connection/SentinelConnectionManager.java | 4 +- .../redisson/connection/ServiceManager.java | 21 ++-- .../redisson/misc/InfinitySemaphoreLatch.java | 113 ------------------ 8 files changed, 35 insertions(+), 149 deletions(-) delete mode 100644 redisson/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java diff --git a/redisson/src/main/java/org/redisson/client/RedisConnection.java b/redisson/src/main/java/org/redisson/client/RedisConnection.java index 2525f2237..cda9f518e 100644 --- a/redisson/src/main/java/org/redisson/client/RedisConnection.java +++ b/redisson/src/main/java/org/redisson/client/RedisConnection.java @@ -303,6 +303,9 @@ public class RedisConnection implements RedisCommands { } else { RFuture f = async(RedisCommands.QUIT); f.whenComplete((res, e) -> { + if (redisClient.isShutdown()) { + return; + } channel.close(); }); } diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 6c3f53830..a1426beed 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -422,7 +422,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { scheduleClusterChangeCheck(cfg); return; } - if (!serviceManager.getShutdownLatch().acquire()) { + if (serviceManager.isShuttingDown()) { return; } RedisURI uri = iterator.next(); @@ -430,7 +430,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { connectionFuture.whenComplete((connection, e) -> { if (e != null) { lastException.set(e); - serviceManager.getShutdownLatch().release(); checkClusterState(cfg, iterator, lastException); return; } @@ -446,14 +445,12 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { if (e != null) { log.error("Unable to execute {}", clusterNodesCommand, e); lastException.set(e); - serviceManager.getShutdownLatch().release(); checkClusterState(cfg, iterator, lastException); return; } if (nodes.isEmpty()) { log.debug("cluster nodes state got from {}: doesn't contain any nodes", connection.getRedisClient().getAddr()); - serviceManager.getShutdownLatch().release(); checkClusterState(cfg, iterator, lastException); return; } @@ -478,7 +475,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } log.error("Unable to parse cluster nodes state got from: {}:\n{}", connection.getRedisClient().getAddr(), nodesValue, ex); lastException.set(ex); - serviceManager.getShutdownLatch().release(); checkClusterState(cfg, iterator, lastException); } }) @@ -489,7 +485,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { .thenApply(newPartitions -> { checkSlotsMigration(newPartitions); checkSlotsChange(newPartitions); - serviceManager.getShutdownLatch().release(); scheduleClusterChangeCheck(cfg); return newPartitions; }); diff --git a/redisson/src/main/java/org/redisson/command/RedisExecutor.java b/redisson/src/main/java/org/redisson/command/RedisExecutor.java index cdcad34b9..514c59e04 100644 --- a/redisson/src/main/java/org/redisson/command/RedisExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisExecutor.java @@ -117,7 +117,7 @@ public class RedisExecutor { return; } - if (!connectionManager.getServiceManager().getShutdownLatch().acquire()) { + if (connectionManager.getServiceManager().isShuttingDown()) { free(); mainPromise.completeExceptionally(new RedissonShutdownException("Redisson is shutdown")); return; @@ -154,7 +154,11 @@ public class RedisExecutor { }; if (attempt == 0) { + connectionManager.getServiceManager().addFuture(mainPromise); + mainPromise.whenComplete((r, e) -> { + connectionManager.getServiceManager().removeFuture(mainPromise); + if (this.mainPromiseListener != null) { this.mainPromiseListener.accept(r, e); } @@ -167,12 +171,10 @@ public class RedisExecutor { connectionFuture.whenComplete((connection, e) -> { if (connectionFuture.isCancelled()) { - connectionManager.getServiceManager().getShutdownLatch().release(); return; } if (connectionFuture.isDone() && connectionFuture.isCompletedExceptionally()) { - connectionManager.getServiceManager().getShutdownLatch().release(); exception = convertException(connectionFuture); tryComplete(attemptPromise, exception); return; @@ -456,14 +458,11 @@ public class RedisExecutor { scheduledFuture = null; } - connectionManager.getServiceManager().addFuture(mainPromise); mainPromise.whenComplete((res, e) -> { if (scheduledFuture != null) { scheduledFuture.cancel(); } - connectionManager.getServiceManager().removeFuture(mainPromise); - // handling cancel operation for blocking commands if ((mainPromise.isCancelled() || e instanceof InterruptedException) @@ -673,7 +672,6 @@ public class RedisExecutor { } RedisConnection connection = getNow(connectionFuture); - connectionManager.getServiceManager().getShutdownLatch().release(); if (connectionManager.getServiceManager().getConfig().getMasterConnectionPoolSize() < 10) { if (source.getRedirect() == Redirect.ASK || getClass() != RedisExecutor.class diff --git a/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java index adc4c72ab..b84adc027 100644 --- a/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisQueuedBatchExecutor.java @@ -99,8 +99,6 @@ public class RedisQueuedBatchExecutor extends BaseRedisBatchExecutor if (RedisCommands.EXEC.getName().equals(command.getName()) || RedisCommands.DISCARD.getName().equals(command.getName())) { super.releaseConnection(attemptPromise, connectionFuture); - } else { - connectionManager.getServiceManager().getShutdownLatch().release(); } } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 8587666d3..cb278f496 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -490,7 +490,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdown() { - shutdown(0, 2, TimeUnit.SECONDS); //default netty value + shutdown(2, 15, TimeUnit.SECONDS); //default netty value } @Override @@ -500,7 +500,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } long timeoutInNanos = unit.toNanos(timeout); + serviceManager.close(); serviceManager.getConnectionWatcher().stop(); + serviceManager.getResolverGroup().close(); + + long startTime = System.nanoTime(); + serviceManager.shutdownFutures(timeoutInNanos, TimeUnit.NANOSECONDS); + timeoutInNanos = Math.max(0, timeoutInNanos - System.nanoTime() - startTime); if (isInitialized()) { List> futures = new ArrayList<>(); @@ -510,7 +516,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); try { - long startTime = System.nanoTime(); + startTime = System.nanoTime(); future.get(timeoutInNanos, TimeUnit.NANOSECONDS); timeoutInNanos = Math.max(0, timeoutInNanos - System.nanoTime() - startTime); } catch (Exception e) { @@ -518,13 +524,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - serviceManager.getResolverGroup().close(); - - serviceManager.getShutdownLatch().close(); if (serviceManager.getCfg().getExecutor() == null) { serviceManager.getExecutor().shutdown(); try { - long startTime = System.nanoTime(); + startTime = System.nanoTime(); serviceManager.getExecutor().awaitTermination(timeoutInNanos, TimeUnit.NANOSECONDS); timeoutInNanos = Math.max(0, timeoutInNanos - System.nanoTime() - startTime); } catch (InterruptedException e) { @@ -532,8 +535,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - serviceManager.shutdownFutures(); - serviceManager.getShutdownLatch().awaitUninterruptibly(); + serviceManager.getTimer().stop(); if (serviceManager.getCfg().getEventLoopGroup() == null) { if (timeoutInNanos < quietPeriod) { @@ -541,10 +543,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } serviceManager.getGroup() .shutdownGracefully(unit.toNanos(quietPeriod), timeoutInNanos, TimeUnit.NANOSECONDS) - .awaitUninterruptibly(timeoutInNanos, TimeUnit.NANOSECONDS); + .syncUninterruptibly(); } - - serviceManager.getTimer().stop(); } private boolean isInitialized() { diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 80d9795e3..d95b6188f 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -334,7 +334,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { f.thenAccept(r -> scheduleChangeCheck(cfg, null)); return; } - if (!serviceManager.getShutdownLatch().acquire()) { + if (serviceManager.isShuttingDown()) { return; } @@ -348,7 +348,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { connectionFuture.whenComplete((connection, e) -> { if (e != null) { lastException.set(e); - serviceManager.getShutdownLatch().release(); checkState(cfg, iterator, lastException); return; } @@ -377,7 +376,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.error("Can't execute SENTINEL commands on {}", connection.getRedisClient().getAddr(), e); } - serviceManager.getShutdownLatch().release(); if (e != null) { scheduleChangeCheck(cfg, iterator); } else { diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 92ab94b2e..78c38f7b4 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -59,7 +59,6 @@ import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.Protocol; import org.redisson.config.TransportMode; import org.redisson.misc.CompletableFutureWrapper; -import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RedisURI; import org.redisson.misc.WrappedLock; import org.redisson.remote.ResponseEntry; @@ -134,7 +133,7 @@ public final class ServiceManager { private IdleConnectionWatcher connectionWatcher; - private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch(); + private final AtomicBoolean shutdownLatch = new AtomicBoolean(); private final ElementsSubscribeService elementsSubscribeService = new ElementsSubscribeService(this); @@ -273,7 +272,7 @@ public final class ServiceManager { } public boolean isShuttingDown() { - return shutdownLatch.isClosed(); + return shutdownLatch.get(); } public boolean isShutdown() { @@ -344,12 +343,20 @@ public final class ServiceManager { futures.remove(future); } - public void shutdownFutures() { - futures.forEach(f -> f.completeExceptionally(new RedissonShutdownException("Redisson is shutdown"))); + public void shutdownFutures(long timeout, TimeUnit unit) { + CompletableFuture future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + try { + future.get(timeout, unit); + } catch (InterruptedException | ExecutionException e) { + // skip + } catch (TimeoutException e) { + futures.forEach(f -> f.completeExceptionally(new RedissonShutdownException("Redisson is shutdown"))); + } + futures.clear(); } - public InfinitySemaphoreLatch getShutdownLatch() { - return shutdownLatch; + public void close() { + shutdownLatch.set(true); } public RedisNodeNotFoundException createNodeNotFoundException(NodeSource source) { diff --git a/redisson/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java b/redisson/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java deleted file mode 100644 index 2d6a5cf54..000000000 --- a/redisson/src/main/java/org/redisson/misc/InfinitySemaphoreLatch.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Copyright (c) 2013-2024 Nikita Koksharov - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.redisson.misc; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; - -/** - * - * Code parts from Manik Surtani (manik@jboss.org) - * @author Nikita Koksharov - */ -// TODO refactor to AbstractQueuedLongSynchronizer -@SuppressWarnings({"MultipleVariableDeclarations", "AvoidInlineConditionals", "UpperEll"}) -public class InfinitySemaphoreLatch extends AbstractQueuedSynchronizer { - - private static final long serialVersionUID = 1744280161777661090l; - - volatile boolean closed; - AtomicInteger sharedResources = new AtomicInteger(); - - // the following states are used in the AQS. - private static final int OPEN_STATE = 0; - - public InfinitySemaphoreLatch() { - setState(OPEN_STATE); - } - - @Override - public final int tryAcquireShared(int ignored) { - // return 1 if we allow the requestor to proceed, -1 if we want the - // requestor to block. - return getState() == OPEN_STATE ? 1 : -1; - } - - @Override - public final boolean tryReleaseShared(int state) { - // used as a mechanism to set the state of the Sync. - setState(state); - return true; - } - - public final boolean acquireAmount(int amount) { - if (closed) { - return false; - } - releaseShared(sharedResources.addAndGet(amount)); - return true; - } - - public final boolean acquire() { - if (closed) { - return false; - } - releaseShared(sharedResources.incrementAndGet()); - return true; - } - - public final void release() { - // do not use setState() directly since this won't notify parked - // threads. - releaseShared(sharedResources.decrementAndGet()); - } - - public boolean isOpened() { - return getState() == OPEN_STATE; - } - - public boolean isClosed() { - return closed; - } - - public void close() { - closed = true; - } - - // waiting for an open state - public final boolean awaitUninterruptibly() { - try { - return await(15, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } - - private boolean await(long time, TimeUnit unit) throws InterruptedException { - return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy - // value that is - // not used. - } - - @Override - public String toString() { - int s = getState(); - String q = hasQueuedThreads() ? "non" : ""; - return "ReclosableLatch [State = " + s + ", " + q + "empty queue]"; - } -}