Feature - Unix Domain Socket (UDS) support for a single server mode. #773

pull/6404/head^2
Nikita Koksharov 2 weeks ago
parent ae497b8059
commit 0443ea26fa

@ -16,7 +16,10 @@
package org.redisson.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollSocketChannel;
@ -29,6 +32,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.incubator.channel.uring.IOUringChannelOption;
import io.netty.incubator.channel.uring.IOUringSocketChannel;
import io.netty.resolver.AddressResolver;
@ -45,10 +50,7 @@ import org.redisson.client.handler.RedisChannelInitializer.Type;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedisURI;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.UnknownHostException;
import java.net.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
@ -64,7 +66,7 @@ public final class RedisClient {
private final Bootstrap bootstrap;
private final Bootstrap pubSubBootstrap;
private final RedisURI uri;
private InetSocketAddress resolvedAddr;
private SocketAddress resolvedAddr;
private final ChannelGroup channels;
private final ExecutorService executor;
@ -114,8 +116,11 @@ public final class RedisClient {
uri = copy.getAddress();
resolvedAddr = copy.getAddr();
if (uri.isUDS()) {
resolvedAddr = new DomainSocketAddress(uri.getHost());
}
if (resolvedAddr != null) {
resolvedAddrFuture.set(CompletableFuture.completedFuture(resolvedAddr));
resolvedAddrFuture.set(CompletableFuture.completedFuture(getAddr()));
}
channels = new DefaultChannelGroup(copy.getGroup().next());
@ -133,16 +138,19 @@ public final class RedisClient {
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
applyChannelOptions(config, bootstrap);
if (!DomainSocketChannel.class.isAssignableFrom(config.getSocketChannelClass())) {
applyTCPOptions(config, bootstrap);
}
config.getNettyHook().afterBoostrapInitialization(bootstrap);
return bootstrap;
}
private void applyChannelOptions(RedisClientConfig config, Bootstrap bootstrap) {
private void applyTCPOptions(RedisClientConfig config, Bootstrap bootstrap) {
bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
if (config.getSocketChannelClass() == NioSocketChannel.class) {
SocketOption<Integer> countOption = null;
SocketOption<Integer> idleOption = null;
@ -197,7 +205,20 @@ public final class RedisClient {
}
public InetSocketAddress getAddr() {
return resolvedAddr;
if (resolvedAddr instanceof DomainSocketAddress) {
try {
return new InetSocketAddress(InetAddress.getByAddress(((DomainSocketAddress) resolvedAddr).path(), new byte[]{127, 0, 0, 1}), uri.getPort()) {
@Override
public String toString() {
return ((DomainSocketAddress) resolvedAddr).path();
}
};
} catch (UnknownHostException e) {
throw new IllegalArgumentException(e);
}
}
return (InetSocketAddress) resolvedAddr;
}
public long getCommandTimeout() {
@ -241,7 +262,7 @@ public final class RedisClient {
} catch (UnknownHostException e) {
// skip
}
promise.complete(resolvedAddr);
promise.complete((InetSocketAddress) resolvedAddr);
return promise;
}
@ -256,14 +277,14 @@ public final class RedisClient {
InetSocketAddress resolved = future.getNow();
byte[] addr1 = resolved.getAddress().getAddress();
resolvedAddr = new InetSocketAddress(InetAddress.getByAddress(uri.getHost(), addr1), resolved.getPort());
promise.complete(resolvedAddr);
promise.complete((InetSocketAddress) resolvedAddr);
});
return promise;
}
public RFuture<RedisConnection> connectAsync() {
CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
CompletionStage<SocketAddress> addrFuture = resolveSocket();
CompletionStage<RedisConnection> f = addrFuture.thenCompose(res -> {
CompletableFuture<RedisConnection> r = new CompletableFuture<>();
ChannelFuture channelFuture = bootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
@ -287,7 +308,7 @@ public final class RedisClient {
} else {
executor.execute(() -> {
if (config.getConnectedListener() != null) {
config.getConnectedListener().accept(getAddr());
config.getConnectedListener().accept((InetSocketAddress) getAddr());
}
});
}
@ -312,6 +333,13 @@ public final class RedisClient {
return new CompletableFutureWrapper<>(f);
}
private CompletionStage<SocketAddress> resolveSocket() {
if (uri.isUDS()) {
return CompletableFuture.completedFuture(resolvedAddr);
}
return resolveAddr().thenApply(s -> s);
}
public RedisPubSubConnection connectPubSub() {
try {
return connectPubSubAsync().toCompletableFuture().join();
@ -325,8 +353,8 @@ public final class RedisClient {
}
public RFuture<RedisPubSubConnection> connectPubSubAsync() {
CompletableFuture<InetSocketAddress> nameFuture = resolveAddr();
CompletableFuture<RedisPubSubConnection> f = nameFuture.thenCompose(res -> {
CompletionStage<SocketAddress> nameFuture = resolveSocket();
CompletionStage<RedisPubSubConnection> f = nameFuture.thenCompose(res -> {
CompletableFuture<RedisPubSubConnection> r = new CompletableFuture<>();
ChannelFuture channelFuture = pubSubBootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {

@ -16,7 +16,7 @@
package org.redisson.client;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.Timer;
@ -44,7 +44,7 @@ public class RedisClientConfig {
private ExecutorService executor;
private EventLoopGroup group;
private AddressResolverGroup<InetSocketAddress> resolverGroup;
private Class<? extends SocketChannel> socketChannelClass = NioSocketChannel.class;
private Class<? extends DuplexChannel> socketChannelClass = NioSocketChannel.class;
private int connectTimeout = 10000;
private int commandTimeout = 10000;
@ -197,10 +197,10 @@ public class RedisClientConfig {
return this;
}
public Class<? extends SocketChannel> getSocketChannelClass() {
public Class<? extends DuplexChannel> getSocketChannelClass() {
return socketChannelClass;
}
public RedisClientConfig setSocketChannelClass(Class<? extends SocketChannel> socketChannelClass) {
public RedisClientConfig setSocketChannelClass(Class<? extends DuplexChannel> socketChannelClass) {
this.socketChannelClass = socketChannelClass;
return this;
}

@ -18,13 +18,15 @@ package org.redisson.connection;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.incubator.channel.uring.IOUringDatagramChannel;
@ -124,7 +126,7 @@ public final class ServiceManager {
private final EventLoopGroup group;
private final Class<? extends SocketChannel> socketChannelClass;
private Class<? extends DuplexChannel> socketChannelClass;
private final AddressResolverGroup<InetSocketAddress> resolverGroup;
@ -157,6 +159,20 @@ public final class ServiceManager {
public ServiceManager(MasterSlaveServersConfig config, Config cfg) {
Version.logVersion();
RedisURI u = null;
if (config.getMasterAddress() != null) {
u = new RedisURI(config.getMasterAddress());
if (u.isUDS()) {
if (!cfg.isSingleConfig()) {
throw new IllegalStateException("UDS is supported only in a single server mode");
}
if (cfg.getTransportMode() != TransportMode.EPOLL
&& cfg.getTransportMode() != TransportMode.KQUEUE) {
throw new IllegalStateException("UDS is supported only if transportMode = EPOLL or KQUEUE");
}
}
}
if (cfg.getTransportMode() == TransportMode.EPOLL) {
if (cfg.getEventLoopGroup() == null) {
if (cfg.getNettyExecutor() != null) {
@ -169,10 +185,15 @@ public final class ServiceManager {
}
this.socketChannelClass = EpollSocketChannel.class;
if (u != null && u.isUDS()) {
this.socketChannelClass = EpollDomainSocketChannel.class;
}
if (PlatformDependent.isAndroid()) {
this.resolverGroup = DefaultAddressResolverGroup.INSTANCE;
} else {
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault());
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(EpollDatagramChannel.class, EpollSocketChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
} else if (cfg.getTransportMode() == TransportMode.KQUEUE) {
if (cfg.getEventLoopGroup() == null) {
@ -186,7 +207,12 @@ public final class ServiceManager {
}
this.socketChannelClass = KQueueSocketChannel.class;
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault());
if (u != null && u.isUDS()) {
this.socketChannelClass = KQueueDomainSocketChannel.class;
}
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(KQueueDatagramChannel.class, KQueueSocketChannel.class, DnsServerAddressStreamProviders.platformDefault());
} else if (cfg.getTransportMode() == TransportMode.IO_URING) {
if (cfg.getEventLoopGroup() == null) {
this.group = createIOUringGroup(cfg);
@ -195,7 +221,7 @@ public final class ServiceManager {
}
this.socketChannelClass = IOUringSocketChannel.class;
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(IOUringDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault());
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(IOUringDatagramChannel.class, IOUringSocketChannel.class, DnsServerAddressStreamProviders.platformDefault());
} else {
if (cfg.getEventLoopGroup() == null) {
if (cfg.getNettyExecutor() != null) {
@ -211,7 +237,7 @@ public final class ServiceManager {
if (PlatformDependent.isAndroid()) {
this.resolverGroup = DefaultAddressResolverGroup.INSTANCE;
} else {
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, socketChannelClass, DnsServerAddressStreamProviders.platformDefault());
this.resolverGroup = cfg.getAddressResolverGroupFactory().create(NioDatagramChannel.class, NioSocketChannel.class, DnsServerAddressStreamProviders.platformDefault());
}
}
@ -348,7 +374,7 @@ public final class ServiceManager {
return connectionWatcher;
}
public Class<? extends SocketChannel> getSocketChannelClass() {
public Class<? extends DuplexChannel> getSocketChannelClass() {
return socketChannelClass;
}

@ -34,8 +34,11 @@ public final class RedisURI {
public static final String REDIS_PROTOCOL= "redis://";
public static final String REDIS_SSL_PROTOCOL = "rediss://";
public static final String REDIS_UDS_PROTOCOL= "redis+uds://";
public static final String VALKEY_PROTOCOL= "valkey://";
public static final String VALKEY_SSL_PROTOCOL = "valkeys://";
public static final String VALKEY_UDS_PROTOCOL= "valkey+uds://";
private final String scheme;
private final String host;
@ -48,7 +51,9 @@ public final class RedisURI {
return url.startsWith(REDIS_PROTOCOL)
|| url.startsWith(REDIS_SSL_PROTOCOL)
|| url.startsWith(VALKEY_PROTOCOL)
|| url.startsWith(VALKEY_SSL_PROTOCOL);
|| url.startsWith(VALKEY_SSL_PROTOCOL)
|| url.startsWith(REDIS_UDS_PROTOCOL)
|| url.startsWith(VALKEY_UDS_PROTOCOL);
}
public RedisURI(String scheme, String host, int port) {
@ -63,13 +68,21 @@ public final class RedisURI {
throw new IllegalArgumentException("Redis url should start with redis:// or rediss:// (for SSL connection)");
}
if (uri.split(":").length < 3) {
throw new IllegalArgumentException("Redis url doesn't contain a port");
}
String urlHost = parseUrl(uri);
scheme = uri.split("://")[0];
try {
if (isUDS()) {
host = uri.split("://")[1];
port = 0;
return;
}
if (uri.split(":").length < 3) {
throw new IllegalArgumentException("Redis url doesn't contain a port");
}
String urlHost = parseUrl(uri);
URL url = new URL(urlHost);
if (url.getUserInfo() != null) {
String[] details = url.getUserInfo().split(":", 2);
@ -86,7 +99,6 @@ public final class RedisURI {
host = url.getHost();
port = url.getPort();
scheme = uri.split("://")[0];
} catch (MalformedURLException | UnsupportedEncodingException e) {
throw new IllegalArgumentException(e);
}
@ -129,6 +141,10 @@ public final class RedisURI {
return port;
}
public boolean isUDS() {
return "redis+uds".equals(scheme) || "valkey+uds".equals(scheme);
}
public boolean isIP() {
return NetUtil.createByteArrayFromIpAddressString(host) != null;
}

Loading…
Cancel
Save