Improvement - Virtual Threads compatibility #5499

pull/5520/head
Nikita Koksharov 1 year ago
parent 4ae1d46fd7
commit 7b89326c7a

@ -20,7 +20,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
import org.redisson.RedissonShutdownException;
import org.redisson.ScanResult;
import org.redisson.api.NodeType;
@ -47,7 +46,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
@ -441,10 +443,6 @@ public class RedisExecutor<V, R> {
}
private void handleBlockingOperations(CompletableFuture<R> attemptPromise, RedisConnection connection, long popTimeout) {
FutureListener<Void> listener = f -> {
mainPromise.completeExceptionally(new RedissonShutdownException("Redisson is shutdown"));
};
Timeout scheduledFuture;
if (popTimeout != 0) {
// handling cases when connection has been lost
@ -457,14 +455,13 @@ public class RedisExecutor<V, R> {
scheduledFuture = null;
}
connectionManager.getServiceManager().addFuture(mainPromise);
mainPromise.whenComplete((res, e) -> {
if (scheduledFuture != null) {
scheduledFuture.cancel();
}
synchronized (listener) {
connectionManager.getServiceManager().getShutdownPromise().removeListener(listener);
}
connectionManager.getServiceManager().removeFuture(mainPromise);
// handling cancel operation for blocking commands
if ((mainPromise.isCancelled()
@ -481,12 +478,6 @@ public class RedisExecutor<V, R> {
attemptPromise.completeExceptionally(e);
}
});
synchronized (listener) {
if (!mainPromise.isDone()) {
connectionManager.getServiceManager().getShutdownPromise().addListener(listener);
}
}
}
protected Throwable cause(CompletableFuture<?> future) {

@ -529,7 +529,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
serviceManager.getShutdownPromise().trySuccess(null);
serviceManager.shutdownFutures();
serviceManager.getShutdownLatch().awaitUninterruptibly();
if (serviceManager.getCfg().getEventLoopGroup() == null) {

@ -42,6 +42,7 @@ import io.netty.util.concurrent.*;
import io.netty.util.internal.PlatformDependent;
import org.redisson.ElementsSubscribeService;
import org.redisson.QueueTransferService;
import org.redisson.RedissonShutdownException;
import org.redisson.Version;
import org.redisson.api.NatMapper;
import org.redisson.api.RFuture;
@ -128,8 +129,6 @@ public class ServiceManager {
private IdleConnectionWatcher connectionWatcher;
private final Promise<Void> shutdownPromise = ImmediateEventExecutor.INSTANCE.newPromise();
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
private final ElementsSubscribeService elementsSubscribeService = new ElementsSubscribeService(this);
@ -305,8 +304,18 @@ public class ServiceManager {
return socketChannelClass;
}
public Promise<Void> getShutdownPromise() {
return shutdownPromise;
private final Set<CompletableFuture<?>> futures = Collections.newSetFromMap(new ConcurrentHashMap<>());
public void addFuture(CompletableFuture<?> future) {
futures.add(future);
}
public void removeFuture(CompletableFuture<?> future) {
futures.remove(future);
}
public void shutdownFutures() {
futures.forEach(f -> f.completeExceptionally(new RedissonShutdownException("Redisson is shutdown")));
}
public InfinitySemaphoreLatch getShutdownLatch() {

Loading…
Cancel
Save