From 0443ea26fa3e19129d8bbace3586524469d7f1f0 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Mon, 20 Jan 2025 11:15:52 +0300 Subject: [PATCH] Feature - Unix Domain Socket (UDS) support for a single server mode. #773 --- .../java/org/redisson/client/RedisClient.java | 70 +++++++++++++------ .../redisson/client/RedisClientConfig.java | 8 +-- .../redisson/connection/ServiceManager.java | 40 +++++++++-- .../main/java/org/redisson/misc/RedisURI.java | 30 ++++++-- 4 files changed, 109 insertions(+), 39 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index d34297191..cd0cdb319 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -16,7 +16,10 @@ package org.redisson.client; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; import io.netty.channel.epoll.EpollChannelOption; import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.epoll.EpollSocketChannel; @@ -29,6 +32,8 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.unix.DomainSocketAddress; +import io.netty.channel.unix.DomainSocketChannel; import io.netty.incubator.channel.uring.IOUringChannelOption; import io.netty.incubator.channel.uring.IOUringSocketChannel; import io.netty.resolver.AddressResolver; @@ -45,10 +50,7 @@ import org.redisson.client.handler.RedisChannelInitializer.Type; import org.redisson.misc.CompletableFutureWrapper; import org.redisson.misc.RedisURI; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketOption; -import java.net.UnknownHostException; +import java.net.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; @@ -64,7 +66,7 @@ public final class RedisClient { private final Bootstrap bootstrap; private final Bootstrap pubSubBootstrap; private final RedisURI uri; - private InetSocketAddress resolvedAddr; + private SocketAddress resolvedAddr; private final ChannelGroup channels; private final ExecutorService executor; @@ -113,11 +115,14 @@ public final class RedisClient { uri = copy.getAddress(); resolvedAddr = copy.getAddr(); - + + if (uri.isUDS()) { + resolvedAddr = new DomainSocketAddress(uri.getHost()); + } if (resolvedAddr != null) { - resolvedAddrFuture.set(CompletableFuture.completedFuture(resolvedAddr)); + resolvedAddrFuture.set(CompletableFuture.completedFuture(getAddr())); } - + channels = new DefaultChannelGroup(copy.getGroup().next()); bootstrap = createBootstrap(copy, Type.PLAIN); pubSubBootstrap = createBootstrap(copy, Type.PUBSUB); @@ -133,16 +138,19 @@ public final class RedisClient { bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type)); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout()); - bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive()); - bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()); - applyChannelOptions(config, bootstrap); + if (!DomainSocketChannel.class.isAssignableFrom(config.getSocketChannelClass())) { + applyTCPOptions(config, bootstrap); + } config.getNettyHook().afterBoostrapInitialization(bootstrap); return bootstrap; } - private void applyChannelOptions(RedisClientConfig config, Bootstrap bootstrap) { + private void applyTCPOptions(RedisClientConfig config, Bootstrap bootstrap) { + bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive()); + bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()); + if (config.getSocketChannelClass() == NioSocketChannel.class) { SocketOption countOption = null; SocketOption idleOption = null; @@ -197,7 +205,20 @@ public final class RedisClient { } public InetSocketAddress getAddr() { - return resolvedAddr; + if (resolvedAddr instanceof DomainSocketAddress) { + try { + return new InetSocketAddress(InetAddress.getByAddress(((DomainSocketAddress) resolvedAddr).path(), new byte[]{127, 0, 0, 1}), uri.getPort()) { + @Override + public String toString() { + return ((DomainSocketAddress) resolvedAddr).path(); + } + }; + } catch (UnknownHostException e) { + throw new IllegalArgumentException(e); + } + } + + return (InetSocketAddress) resolvedAddr; } public long getCommandTimeout() { @@ -241,7 +262,7 @@ public final class RedisClient { } catch (UnknownHostException e) { // skip } - promise.complete(resolvedAddr); + promise.complete((InetSocketAddress) resolvedAddr); return promise; } @@ -256,14 +277,14 @@ public final class RedisClient { InetSocketAddress resolved = future.getNow(); byte[] addr1 = resolved.getAddress().getAddress(); resolvedAddr = new InetSocketAddress(InetAddress.getByAddress(uri.getHost(), addr1), resolved.getPort()); - promise.complete(resolvedAddr); + promise.complete((InetSocketAddress) resolvedAddr); }); return promise; } public RFuture connectAsync() { - CompletableFuture addrFuture = resolveAddr(); - CompletableFuture f = addrFuture.thenCompose(res -> { + CompletionStage addrFuture = resolveSocket(); + CompletionStage f = addrFuture.thenCompose(res -> { CompletableFuture r = new CompletableFuture<>(); ChannelFuture channelFuture = bootstrap.connect(res); channelFuture.addListener(new ChannelFutureListener() { @@ -287,7 +308,7 @@ public final class RedisClient { } else { executor.execute(() -> { if (config.getConnectedListener() != null) { - config.getConnectedListener().accept(getAddr()); + config.getConnectedListener().accept((InetSocketAddress) getAddr()); } }); } @@ -312,6 +333,13 @@ public final class RedisClient { return new CompletableFutureWrapper<>(f); } + private CompletionStage resolveSocket() { + if (uri.isUDS()) { + return CompletableFuture.completedFuture(resolvedAddr); + } + return resolveAddr().thenApply(s -> s); + } + public RedisPubSubConnection connectPubSub() { try { return connectPubSubAsync().toCompletableFuture().join(); @@ -325,8 +353,8 @@ public final class RedisClient { } public RFuture connectPubSubAsync() { - CompletableFuture nameFuture = resolveAddr(); - CompletableFuture f = nameFuture.thenCompose(res -> { + CompletionStage nameFuture = resolveSocket(); + CompletionStage f = nameFuture.thenCompose(res -> { CompletableFuture r = new CompletableFuture<>(); ChannelFuture channelFuture = pubSubBootstrap.connect(res); channelFuture.addListener(new ChannelFutureListener() { diff --git a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java index b159a877f..989a0267b 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java +++ b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java @@ -16,7 +16,7 @@ package org.redisson.client; import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.resolver.AddressResolverGroup; import io.netty.util.Timer; @@ -44,7 +44,7 @@ public class RedisClientConfig { private ExecutorService executor; private EventLoopGroup group; private AddressResolverGroup resolverGroup; - private Class socketChannelClass = NioSocketChannel.class; + private Class socketChannelClass = NioSocketChannel.class; private int connectTimeout = 10000; private int commandTimeout = 10000; @@ -197,10 +197,10 @@ public class RedisClientConfig { return this; } - public Class getSocketChannelClass() { + public Class getSocketChannelClass() { return socketChannelClass; } - public RedisClientConfig setSocketChannelClass(Class socketChannelClass) { + public RedisClientConfig setSocketChannelClass(Class socketChannelClass) { this.socketChannelClass = socketChannelClass; return this; } diff --git a/redisson/src/main/java/org/redisson/connection/ServiceManager.java b/redisson/src/main/java/org/redisson/connection/ServiceManager.java index 2cf74fae6..fcf30aa2a 100644 --- a/redisson/src/main/java/org/redisson/connection/ServiceManager.java +++ b/redisson/src/main/java/org/redisson/connection/ServiceManager.java @@ -18,13 +18,15 @@ package org.redisson.connection; import io.netty.buffer.ByteBufUtil; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.EpollDomainSocketChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.kqueue.KQueueDatagramChannel; +import io.netty.channel.kqueue.KQueueDomainSocketChannel; import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.kqueue.KQueueSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.incubator.channel.uring.IOUringDatagramChannel; @@ -124,7 +126,7 @@ public final class ServiceManager { private final EventLoopGroup group; - private final Class socketChannelClass; + private Class socketChannelClass; private final AddressResolverGroup resolverGroup; @@ -157,6 +159,20 @@ public final class ServiceManager { public ServiceManager(MasterSlaveServersConfig config, Config cfg) { Version.logVersion(); + RedisURI u = null; + if (config.getMasterAddress() != null) { + u = new RedisURI(config.getMasterAddress()); + if (u.isUDS()) { + if (!cfg.isSingleConfig()) { + throw new IllegalStateException("UDS is supported only in a single server mode"); + } + if (cfg.getTransportMode() != TransportMode.EPOLL + && cfg.getTransportMode() != TransportMode.KQUEUE) { + throw new IllegalStateException("UDS is supported only if transportMode = EPOLL or KQUEUE"); + } + } + } + if (cfg.getTransportMode() == TransportMode.EPOLL) { if (cfg.getEventLoopGroup() == null) { if (cfg.getNettyExecutor() != null) { @@ -169,10 +185,15 @@ public final class ServiceManager { } this.socketChannelClass = EpollSocketChannel.class; + + if (u != null && u.isUDS()) { + this.socketChannelClass = EpollDomainSocketChannel.class; + } + if (PlatformDependent.isAndroid()) { this.resolverGroup = DefaultAddressResolverGroup.INSTANCE; } else { - this.resolverGroup = cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault()); + this.resolverGroup = cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, EpollSocketChannel.class, DnsServerAddressStreamProviders.platformDefault()); } } else if (cfg.getTransportMode() == TransportMode.KQUEUE) { if (cfg.getEventLoopGroup() == null) { @@ -186,7 +207,12 @@ public final class ServiceManager { } this.socketChannelClass = KQueueSocketChannel.class; - this.resolverGroup = cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault()); + + if (u != null && u.isUDS()) { + this.socketChannelClass = KQueueDomainSocketChannel.class; + } + + this.resolverGroup = cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, KQueueSocketChannel.class, DnsServerAddressStreamProviders.platformDefault()); } else if (cfg.getTransportMode() == TransportMode.IO_URING) { if (cfg.getEventLoopGroup() == null) { this.group = createIOUringGroup(cfg); @@ -195,7 +221,7 @@ public final class ServiceManager { } this.socketChannelClass = IOUringSocketChannel.class; - this.resolverGroup = cfg.getAddressResolverGroupFactory().create(IOUringDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault()); + this.resolverGroup = cfg.getAddressResolverGroupFactory().create(IOUringDatagramChannel.class, IOUringSocketChannel.class, DnsServerAddressStreamProviders.platformDefault()); } else { if (cfg.getEventLoopGroup() == null) { if (cfg.getNettyExecutor() != null) { @@ -211,7 +237,7 @@ public final class ServiceManager { if (PlatformDependent.isAndroid()) { this.resolverGroup = DefaultAddressResolverGroup.INSTANCE; } else { - this.resolverGroup = cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault()); + this.resolverGroup = cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, NioSocketChannel.class, DnsServerAddressStreamProviders.platformDefault()); } } @@ -348,7 +374,7 @@ public final class ServiceManager { return connectionWatcher; } - public Class getSocketChannelClass() { + public Class getSocketChannelClass() { return socketChannelClass; } diff --git a/redisson/src/main/java/org/redisson/misc/RedisURI.java b/redisson/src/main/java/org/redisson/misc/RedisURI.java index dbd811b3f..2def6fc60 100644 --- a/redisson/src/main/java/org/redisson/misc/RedisURI.java +++ b/redisson/src/main/java/org/redisson/misc/RedisURI.java @@ -34,8 +34,11 @@ public final class RedisURI { public static final String REDIS_PROTOCOL= "redis://"; public static final String REDIS_SSL_PROTOCOL = "rediss://"; + public static final String REDIS_UDS_PROTOCOL= "redis+uds://"; + public static final String VALKEY_PROTOCOL= "valkey://"; public static final String VALKEY_SSL_PROTOCOL = "valkeys://"; + public static final String VALKEY_UDS_PROTOCOL= "valkey+uds://"; private final String scheme; private final String host; @@ -48,7 +51,9 @@ public final class RedisURI { return url.startsWith(REDIS_PROTOCOL) || url.startsWith(REDIS_SSL_PROTOCOL) || url.startsWith(VALKEY_PROTOCOL) - || url.startsWith(VALKEY_SSL_PROTOCOL); + || url.startsWith(VALKEY_SSL_PROTOCOL) + || url.startsWith(REDIS_UDS_PROTOCOL) + || url.startsWith(VALKEY_UDS_PROTOCOL); } public RedisURI(String scheme, String host, int port) { @@ -63,13 +68,21 @@ public final class RedisURI { throw new IllegalArgumentException("Redis url should start with redis:// or rediss:// (for SSL connection)"); } - if (uri.split(":").length < 3) { - throw new IllegalArgumentException("Redis url doesn't contain a port"); - } - - String urlHost = parseUrl(uri); + scheme = uri.split("://")[0]; try { + if (isUDS()) { + host = uri.split("://")[1]; + port = 0; + return; + } + + if (uri.split(":").length < 3) { + throw new IllegalArgumentException("Redis url doesn't contain a port"); + } + + String urlHost = parseUrl(uri); + URL url = new URL(urlHost); if (url.getUserInfo() != null) { String[] details = url.getUserInfo().split(":", 2); @@ -86,7 +99,6 @@ public final class RedisURI { host = url.getHost(); port = url.getPort(); - scheme = uri.split("://")[0]; } catch (MalformedURLException | UnsupportedEncodingException e) { throw new IllegalArgumentException(e); } @@ -129,6 +141,10 @@ public final class RedisURI { return port; } + public boolean isUDS() { + return "redis+uds".equals(scheme) || "valkey+uds".equals(scheme); + } + public boolean isIP() { return NetUtil.createByteArrayFromIpAddressString(host) != null; }