diff --git a/redisson/pom.xml b/redisson/pom.xml index ce4cdbc25..3d3be63ae 100644 --- a/redisson/pom.xml +++ b/redisson/pom.xml @@ -458,6 +458,16 @@ --> + + org.apache.maven.plugins + maven-compiler-plugin + 3.12.1 + + 21 + 21 + + + maven-javadoc-plugin diff --git a/redisson/src/main/java/org/redisson/config/Config.java b/redisson/src/main/java/org/redisson/config/Config.java index 8a55938b1..5aa0ffebc 100644 --- a/redisson/src/main/java/org/redisson/config/Config.java +++ b/redisson/src/main/java/org/redisson/config/Config.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.Reader; import java.net.URL; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -58,6 +59,8 @@ public class Config { private int nettyThreads = 32; + private Executor nettyExecutor; + private Codec codec; private ExecutorService executor; @@ -103,6 +106,7 @@ public class Config { public Config(Config oldConf) { setNettyHook(oldConf.getNettyHook()); + setNettyExecutor(oldConf.getNettyExecutor()); setExecutor(oldConf.getExecutor()); if (oldConf.getCodec() == null) { @@ -475,9 +479,29 @@ public class Config { return nettyThreads; } + public Executor getNettyExecutor() { + return nettyExecutor; + } + + /** + * Use external Executor for Netty. + *

+ * For example, it allows to define Executors.newVirtualThreadPerTaskExecutor() + * to use virtual threads. + *

+ * The caller is responsible for closing the Executor. + * + * @param nettyExecutor netty executor object + * @return config + */ + public Config setNettyExecutor(Executor nettyExecutor) { + this.nettyExecutor = nettyExecutor; + return this; + } + /** * Use external ExecutorService. ExecutorService processes - * all listeners of RTopic, + * all listeners of RTopic, RPatternTopic * RRemoteService invocation handlers * and RExecutorService tasks. *

diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 03cbcea56..92ab94b2e 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -155,7 +155,11 @@ public final class ServiceManager { if (cfg.getTransportMode() == TransportMode.EPOLL) { if (cfg.getEventLoopGroup() == null) { - this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); + if (cfg.getNettyExecutor() != null) { + this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), cfg.getNettyExecutor()); + } else { + this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); + } } else { this.group = cfg.getEventLoopGroup(); } @@ -168,7 +172,11 @@ public final class ServiceManager { } } else if (cfg.getTransportMode() == TransportMode.KQUEUE) { if (cfg.getEventLoopGroup() == null) { - this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); + if (cfg.getNettyExecutor() != null) { + this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), cfg.getNettyExecutor()); + } else { + this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); + } } else { this.group = cfg.getEventLoopGroup(); } @@ -186,7 +194,11 @@ public final class ServiceManager { this.resolverGroup = cfg.getAddressResolverGroupFactory().create(IOUringDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault()); } else { if (cfg.getEventLoopGroup() == null) { - this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); + if (cfg.getNettyExecutor() != null) { + this.group = new NioEventLoopGroup(cfg.getNettyThreads(), cfg.getNettyExecutor()); + } else { + this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty")); + } } else { this.group = cfg.getEventLoopGroup(); } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index e0f06889c..188bc485f 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -48,6 +48,18 @@ import static org.awaitility.Awaitility.await; public class RedissonTest extends BaseTest { + @Test + public void testVirtualThreads() { + Config c = redisson.getConfig(); + c.setNettyExecutor(Executors.newVirtualThreadPerTaskExecutor()); + + RedissonClient r = Redisson.create(c); + RBucket b = r.getBucket("test"); + b.set("1"); + assertThat(b.get()).isEqualTo("1"); + r.shutdown(); + } + @Test public void testStopThreads() throws IOException { Set threads = Thread.getAllStackTraces().keySet();