Feature - nettyExecutor setting added. #5486

pull/5644/head
Nikita Koksharov 1 year ago
parent 0ee5155b57
commit 74ae7d5bac

@ -458,6 +458,16 @@
</plugin>
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.12.1</version>
<configuration>
<testSource>21</testSource>
<testTarget>21</testTarget>
</configuration>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
</plugin>

@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.net.URL;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@ -58,6 +59,8 @@ public class Config {
private int nettyThreads = 32;
private Executor nettyExecutor;
private Codec codec;
private ExecutorService executor;
@ -103,6 +106,7 @@ public class Config {
public Config(Config oldConf) {
setNettyHook(oldConf.getNettyHook());
setNettyExecutor(oldConf.getNettyExecutor());
setExecutor(oldConf.getExecutor());
if (oldConf.getCodec() == null) {
@ -475,9 +479,29 @@ public class Config {
return nettyThreads;
}
public Executor getNettyExecutor() {
return nettyExecutor;
}
/**
* Use external Executor for Netty.
* <p>
* For example, it allows to define <code>Executors.newVirtualThreadPerTaskExecutor()</code>
* to use virtual threads.
* <p>
* The caller is responsible for closing the Executor.
*
* @param nettyExecutor netty executor object
* @return config
*/
public Config setNettyExecutor(Executor nettyExecutor) {
this.nettyExecutor = nettyExecutor;
return this;
}
/**
* Use external ExecutorService. ExecutorService processes
* all listeners of <code>RTopic</code>,
* all listeners of <code>RTopic</code>, <code>RPatternTopic</code>
* <code>RRemoteService</code> invocation handlers
* and <code>RExecutorService</code> tasks.
* <p>

@ -155,7 +155,11 @@ public final class ServiceManager {
if (cfg.getTransportMode() == TransportMode.EPOLL) {
if (cfg.getEventLoopGroup() == null) {
this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
if (cfg.getNettyExecutor() != null) {
this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), cfg.getNettyExecutor());
} else {
this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
}
} else {
this.group = cfg.getEventLoopGroup();
}
@ -168,7 +172,11 @@ public final class ServiceManager {
}
} else if (cfg.getTransportMode() == TransportMode.KQUEUE) {
if (cfg.getEventLoopGroup() == null) {
this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
if (cfg.getNettyExecutor() != null) {
this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), cfg.getNettyExecutor());
} else {
this.group = new KQueueEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
}
} else {
this.group = cfg.getEventLoopGroup();
}
@ -186,7 +194,11 @@ public final class ServiceManager {
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(IOUringDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault());
} else {
if (cfg.getEventLoopGroup() == null) {
this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
if (cfg.getNettyExecutor() != null) {
this.group = new NioEventLoopGroup(cfg.getNettyThreads(), cfg.getNettyExecutor());
} else {
this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
}
} else {
this.group = cfg.getEventLoopGroup();
}

@ -48,6 +48,18 @@ import static org.awaitility.Awaitility.await;
public class RedissonTest extends BaseTest {
@Test
public void testVirtualThreads() {
Config c = redisson.getConfig();
c.setNettyExecutor(Executors.newVirtualThreadPerTaskExecutor());
RedissonClient r = Redisson.create(c);
RBucket<String> b = r.getBucket("test");
b.set("1");
assertThat(b.get()).isEqualTo("1");
r.shutdown();
}
@Test
public void testStopThreads() throws IOException {
Set<Thread> threads = Thread.getAllStackTraces().keySet();

Loading…
Cancel
Save