Fixed - "valkeys" scheme can't be used in Sentinel mode

pull/6402/merge
mrniko 4 days ago
parent 0bda4b5b44
commit fbde6ed91d

@ -26,7 +26,6 @@ import org.redisson.client.protocol.decoder.*;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.codec.CompositeCodec;
import org.redisson.misc.RedisURI;
import java.time.Duration;
import java.time.Instant;
@ -762,10 +761,6 @@ public interface RedisCommands {
RedisStrictCommand<Void> SENTINEL_REMOVE = new RedisStrictCommand<Void>("SENTINEL", "REMOVE", new VoidReplayConvertor());
RedisStrictCommand<Void> SENTINEL_MONITOR = new RedisStrictCommand<Void>("SENTINEL", "MONITOR", new VoidReplayConvertor());
RedisStrictCommand<RedisURI> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<>("SENTINEL", "GET-MASTER-ADDR-BY-NAME",
new RedisURIDecoder(false));
RedisStrictCommand<RedisURI> SENTINEL_GET_MASTER_ADDR_BY_NAME_SSL = new RedisStrictCommand<>("SENTINEL", "GET-MASTER-ADDR-BY-NAME",
new RedisURIDecoder(true));
RedisCommand<List<Map<String, String>>> SENTINEL_MASTERS = new RedisCommand<List<Map<String, String>>>("SENTINEL", "MASTERS",
new ListMultiDecoder2(new ListResultReplayDecoder(), new ObjectMapReplayDecoder()));
RedisCommand<Map<String, String>> SENTINEL_MASTER = new RedisCommand("SENTINEL", "MASTER", new ObjectMapReplayDecoder());

@ -32,13 +32,8 @@ public class RedisURIDecoder implements MultiDecoder<RedisURI> {
private final String scheme;
public RedisURIDecoder(boolean ssl) {
super();
if (ssl) {
scheme = "rediss";
} else {
scheme = "redis";
}
public RedisURIDecoder(String scheme) {
this.scheme = scheme;
}
@Override

@ -24,6 +24,7 @@ import org.redisson.client.*;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.RedisURIDecoder;
import org.redisson.config.*;
import org.redisson.misc.RedisURI;
import org.slf4j.Logger;
@ -72,18 +73,15 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
sentinelHosts.add(addr);
}
}
masterHostCommand = new RedisStrictCommand<>("SENTINEL", "GET-MASTER-ADDR-BY-NAME",
new RedisURIDecoder(scheme));
}
@Override
public void doConnect(Function<RedisURI, String> hostnameMapper) {
checkAuth(cfg);
if ("redis".equals(scheme)) {
masterHostCommand = RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME;
} else {
masterHostCommand = RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME_SSL;
}
Map<RedisURI, String> uri2hostname = new HashMap<>();
Throwable lastException = null;
for (String address : cfg.getSentinelAddresses()) {

@ -25,7 +25,9 @@ import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.Time;
import org.redisson.client.protocol.decoder.RedisURIDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.misc.RedisURI;
@ -223,7 +225,10 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
@Override
public RFuture<RedisURI> getMasterAddrAsync(String masterName) {
return executeAsync(null, StringCodec.INSTANCE, -1, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, masterName);
RedisStrictCommand<RedisURI> masterHostCommand = new RedisStrictCommand<>("SENTINEL", "GET-MASTER-ADDR-BY-NAME",
new RedisURIDecoder(client.getConfig().getAddress().getScheme()));
return executeAsync(null, StringCodec.INSTANCE, -1, masterHostCommand, masterName);
}
@Override

@ -5,11 +5,17 @@ import com.github.dockerjava.api.model.ContainerNetwork;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProvider;
import io.netty.resolver.dns.DnsServerAddresses;
import org.junit.jupiter.api.BeforeEach;
import org.redisson.api.NatMapper;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.Protocol;
import org.redisson.connection.SequentialDnsAddressResolverFactory;
import org.redisson.misc.RedisURI;
import org.testcontainers.containers.*;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
@ -17,6 +23,7 @@ import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.*;
import java.util.function.BiConsumer;
@ -154,6 +161,8 @@ public class RedisDockerTest {
protected void withSentinel(BiConsumer<List<GenericContainer<?>>, Config> callback, int slaves) throws InterruptedException {
Network network = Network.newNetwork();
SimpleDnsServer dnsServer = new SimpleDnsServer();
List<GenericContainer<? extends GenericContainer<?>>> nodes = new ArrayList<>();
GenericContainer<?> master =
@ -245,6 +254,16 @@ public class RedisDockerTest {
Config config = new Config();
config.setProtocol(protocol);
config.setAddressResolverGroupFactory(new SequentialDnsAddressResolverFactory() {
@Override
public AddressResolverGroup<InetSocketAddress> create(Class<? extends DatagramChannel> channelType, Class<? extends SocketChannel> socketChannelType, DnsServerAddressStreamProvider nameServerProvider) {
return super.create(channelType, socketChannelType, hostname -> {
return DnsServerAddresses.singleton(dnsServer.getAddr()).stream();
});
}
});
config.useSentinelServers()
.setPingConnectionInterval(0)
.setNatMapper(new NatMapper() {
@ -284,11 +303,14 @@ public class RedisDockerTest {
nodes.forEach(n -> n.stop());
network.close();
dnsServer.stop();
}
protected void withSentinel(BiConsumer<List<GenericContainer<?>>, Config> callback, int slaves, String password) throws InterruptedException {
Network network = Network.newNetwork();
SimpleDnsServer dnsServer = new SimpleDnsServer();
List<GenericContainer<? extends GenericContainer<?>>> nodes = new ArrayList<>();
GenericContainer<?> master =
@ -387,12 +409,23 @@ public class RedisDockerTest {
Config config = new Config();
config.setProtocol(protocol);
config.setAddressResolverGroupFactory(new SequentialDnsAddressResolverFactory() {
@Override
public AddressResolverGroup<InetSocketAddress> create(Class<? extends DatagramChannel> channelType, Class<? extends SocketChannel> socketChannelType, DnsServerAddressStreamProvider nameServerProvider) {
return super.create(channelType, socketChannelType, hostname -> {
return DnsServerAddresses.singleton(dnsServer.getAddr()).stream();
});
}
});
config.useSentinelServers()
.setPassword(password)
.setNatMapper(new NatMapper() {
@Override
public RedisURI map(RedisURI uri) {
for (GenericContainer<? extends GenericContainer<?>> node : nodes) {
if (node.getContainerInfo() == null) {
continue;
@ -426,6 +459,7 @@ public class RedisDockerTest {
nodes.forEach(n -> n.stop());
network.close();
dnsServer.stop();
}
protected void withNewCluster(BiConsumer<List<ContainerState>, RedissonClient> callback) {

@ -254,7 +254,7 @@ public class RedissonFailoverTest extends RedisDockerTest {
assertThat(futures.get(futures.size() - 1).isDone()).isTrue();
assertThat(futures.get(futures.size() - 1).toCompletableFuture().isCompletedExceptionally()).isFalse();
assertThat(errors).isBetween(30, 800);
assertThat(errors).isBetween(30, 1900);
assertThat(readonlyErrors).isZero();
redisson.shutdown();

Loading…
Cancel
Save