From 7b89326c7ae6b8c19cc4234d6df06c16092b88aa Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 12 Dec 2023 13:07:34 +0300 Subject: [PATCH] Improvement - Virtual Threads compatibility #5499 --- .../org/redisson/command/RedisExecutor.java | 21 ++++++------------- .../MasterSlaveConnectionManager.java | 2 +- .../redisson/connection/ServiceManager.java | 17 +++++++++++---- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/redisson/src/main/java/org/redisson/command/RedisExecutor.java b/redisson/src/main/java/org/redisson/command/RedisExecutor.java index da951e3ea..973e1539b 100644 --- a/redisson/src/main/java/org/redisson/command/RedisExecutor.java +++ b/redisson/src/main/java/org/redisson/command/RedisExecutor.java @@ -20,7 +20,6 @@ import io.netty.channel.ChannelFutureListener; import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import io.netty.util.concurrent.FutureListener; import org.redisson.RedissonShutdownException; import org.redisson.ScanResult; import org.redisson.api.NodeType; @@ -47,7 +46,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; /** @@ -441,10 +443,6 @@ public class RedisExecutor { } private void handleBlockingOperations(CompletableFuture attemptPromise, RedisConnection connection, long popTimeout) { - FutureListener listener = f -> { - mainPromise.completeExceptionally(new RedissonShutdownException("Redisson is shutdown")); - }; - Timeout scheduledFuture; if (popTimeout != 0) { // handling cases when connection has been lost @@ -457,14 +455,13 @@ public class RedisExecutor { scheduledFuture = null; } + connectionManager.getServiceManager().addFuture(mainPromise); mainPromise.whenComplete((res, e) -> { if (scheduledFuture != null) { scheduledFuture.cancel(); } - synchronized (listener) { - connectionManager.getServiceManager().getShutdownPromise().removeListener(listener); - } + connectionManager.getServiceManager().removeFuture(mainPromise); // handling cancel operation for blocking commands if ((mainPromise.isCancelled() @@ -481,12 +478,6 @@ public class RedisExecutor { attemptPromise.completeExceptionally(e); } }); - - synchronized (listener) { - if (!mainPromise.isDone()) { - connectionManager.getServiceManager().getShutdownPromise().addListener(listener); - } - } } protected Throwable cause(CompletableFuture future) { diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 9195b96c3..7d427eb2e 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -529,7 +529,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - serviceManager.getShutdownPromise().trySuccess(null); + serviceManager.shutdownFutures(); serviceManager.getShutdownLatch().awaitUninterruptibly(); if (serviceManager.getCfg().getEventLoopGroup() == null) { diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 5b035bd2e..2a9280861 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -42,6 +42,7 @@ import io.netty.util.concurrent.*; import io.netty.util.internal.PlatformDependent; import org.redisson.ElementsSubscribeService; import org.redisson.QueueTransferService; +import org.redisson.RedissonShutdownException; import org.redisson.Version; import org.redisson.api.NatMapper; import org.redisson.api.RFuture; @@ -128,8 +129,6 @@ public class ServiceManager { private IdleConnectionWatcher connectionWatcher; - private final Promise shutdownPromise = ImmediateEventExecutor.INSTANCE.newPromise(); - private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch(); private final ElementsSubscribeService elementsSubscribeService = new ElementsSubscribeService(this); @@ -305,8 +304,18 @@ public class ServiceManager { return socketChannelClass; } - public Promise getShutdownPromise() { - return shutdownPromise; + private final Set> futures = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + public void addFuture(CompletableFuture future) { + futures.add(future); + } + + public void removeFuture(CompletableFuture future) { + futures.remove(future); + } + + public void shutdownFutures() { + futures.forEach(f -> f.completeExceptionally(new RedissonShutdownException("Redisson is shutdown"))); } public InfinitySemaphoreLatch getShutdownLatch() {