Feature - Redis hostnames support for Sentinel mode #3805

pull/3806/head
Nikita Koksharov 3 years ago
parent 2bb23815db
commit 1801f6996b

@ -15,7 +15,8 @@
*/
package org.redisson.api.redisnode;
import java.net.InetSocketAddress;
import org.redisson.misc.RedisURI;
import java.util.List;
import java.util.Map;
@ -33,7 +34,7 @@ public interface RedisSentinel extends RedisNode, RedisSentinelAsync {
* @param masterName - name of master
* @return network address
*/
InetSocketAddress getMasterAddr(String masterName);
RedisURI getMasterAddr(String masterName);
/**
* Returns list of map containing info regarding Redis Sentinel server

@ -16,8 +16,8 @@
package org.redisson.api.redisnode;
import org.redisson.api.RFuture;
import org.redisson.misc.RedisURI;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
@ -35,7 +35,7 @@ public interface RedisSentinelAsync extends RedisNodeAsync {
* @param masterName - name of master
* @return network address
*/
RFuture<InetSocketAddress> getMasterAddrAsync(String masterName);
RFuture<RedisURI> getMasterAddrAsync(String masterName);
/**
* Returns list of map containing info regarding Redis Sentinel server

@ -24,8 +24,8 @@ import org.redisson.client.protocol.convertor.*;
import org.redisson.client.protocol.decoder.*;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
import org.redisson.cluster.ClusterNodeInfo;
import org.redisson.misc.RedisURI;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@ -459,8 +459,8 @@ public interface RedisCommands {
RedisStrictCommand<Void> SENTINEL_REMOVE = new RedisStrictCommand<Void>("SENTINEL", "REMOVE", new VoidReplayConvertor());
RedisStrictCommand<Void> SENTINEL_MONITOR = new RedisStrictCommand<Void>("SENTINEL", "MONITOR", new VoidReplayConvertor());
RedisStrictCommand<InetSocketAddress> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<>("SENTINEL", "GET-MASTER-ADDR-BY-NAME",
new InetSocketAddressDecoder());
RedisStrictCommand<RedisURI> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<>("SENTINEL", "GET-MASTER-ADDR-BY-NAME",
new RedisURIDecoder());
RedisCommand<List<Map<String, String>>> SENTINEL_MASTERS = new RedisCommand<List<Map<String, String>>>("SENTINEL", "MASTERS",
new ListMultiDecoder2(new ListResultReplayDecoder(), new ObjectMapReplayDecoder()));
RedisCommand<Map<String, String>> SENTINEL_MASTER = new RedisCommand("SENTINEL", "MASTER", new ObjectMapReplayDecoder());

@ -19,10 +19,8 @@ import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;
import org.redisson.misc.RedisURI;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
/**
@ -30,7 +28,7 @@ import java.util.List;
* @author Nikita Koksharov
*
*/
public class InetSocketAddressDecoder implements MultiDecoder<InetSocketAddress> {
public class RedisURIDecoder implements MultiDecoder<RedisURI> {
@Override
public Decoder<Object> getDecoder(Codec codec, int paramNum, State state) {
@ -38,15 +36,11 @@ public class InetSocketAddressDecoder implements MultiDecoder<InetSocketAddress>
}
@Override
public InetSocketAddress decode(List<Object> parts, State state) {
public RedisURI decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
try {
return new InetSocketAddress(InetAddress.getByName((String) parts.get(0)), Integer.valueOf((String) parts.get(1)));
} catch (UnknownHostException e) {
throw new IllegalStateException(e);
}
return new RedisURI("redis", (String) parts.get(0), Integer.valueOf((String) parts.get(1)));
}
}

@ -691,6 +691,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
protected RFuture<RedisURI> resolveIP(RedisURI address) {
return resolveIP(address.getScheme(), address);
}
protected RFuture<RedisURI> resolveIP(String scheme, RedisURI address) {
if (address.isIP()) {
return RedissonPromise.newSucceededFuture(address);
}
@ -707,7 +711,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
InetSocketAddress s = f.getNow();
RedisURI uri = new RedisURI(address.getScheme() + "://" + s.getAddress().getHostAddress() + ":" + address.getPort());
RedisURI uri = new RedisURI(scheme + "://" + s.getAddress().getHostAddress() + ":" + address.getPort());
result.trySuccess(uri);
});
return result;

@ -29,7 +29,7 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.CountableListener;
import org.redisson.misc.AsyncCountDownLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
@ -120,12 +120,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
continue;
}
InetSocketAddress master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
RedisURI master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
if (master == null) {
throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!");
}
RedisURI masterHost = toURI(master.getHostString(), String.valueOf(master.getPort()));
RedisURI masterHost = resolveIP(scheme, master).syncUninterruptibly().getNow();
this.config.setMasterAddress(masterHost.toString());
currentMaster.set(masterHost);
log.info("master: {} added", masterHost);
@ -136,20 +136,21 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
continue;
}
String ip = map.get("ip");
String host = map.get("ip");
String port = map.get("port");
String flags = map.getOrDefault("flags", "");
String masterLinkStatus = map.getOrDefault("master-link-status", "");
RedisURI host = toURI(ip, port);
RedisURI uri = toURI(host, port);
uri = resolveIP(uri).syncUninterruptibly().getNow();
this.config.addSlaveAddress(host.toString());
log.debug("slave {} state: {}", host, map);
log.info("slave: {} added", host);
this.config.addSlaveAddress(uri.toString());
log.debug("slave {} state: {}", uri, map);
log.info("slave: {} added", uri);
if (isSlaveDown(flags, masterLinkStatus)) {
disconnectedSlaves.add(host);
log.warn("slave: {} is down", host);
disconnectedSlaves.add(uri);
log.warn("slave: {} is down", uri);
}
}
@ -163,8 +164,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = map.get("ip");
String port = map.get("port");
RedisURI sentinelAddr = toURI(ip, port);
RFuture<Void> future = registerSentinel(sentinelAddr, this.config, null);
RedisURI uri = toURI(ip, port);
uri = resolveIP(uri).syncUninterruptibly().getNow();
RFuture<Void> future = registerSentinel(uri, this.config, null);
connectionFutures.add(future);
}
@ -375,14 +377,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
};
RFuture<InetSocketAddress> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
masterFuture.onComplete((master, e) -> {
RFuture<RedisURI> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
masterFuture.thenCompose(u -> resolveIP(scheme, u))
.whenComplete((newMaster, e) -> {
if (e != null) {
return;
}
RedisURI current = currentMaster.get();
RedisURI newMaster = toURI(master.getHostString(), String.valueOf(master.getPort()));
if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) {
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster);
@ -398,55 +400,62 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
if (!config.checkSkipSlavesInit()) {
RFuture<List<Map<String, String>>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
commands.incrementAndGet();
slavesFuture.onComplete((slavesMap, e) -> {
if (e != null) {
slavesFuture.onComplete((slavesMap, ex) -> {
if (ex != null) {
return;
}
Set<RedisURI> currentSlaves = new HashSet<>(slavesMap.size());
List<RFuture<Void>> futures = new ArrayList<>();
AsyncCountDownLatch latch = new AsyncCountDownLatch();
for (Map<String, String> map : slavesMap) {
if (map.isEmpty()) {
latch.countDown();
continue;
}
String ip = map.get("ip");
String host = map.get("ip");
String port = map.get("port");
String flags = map.getOrDefault("flags", "");
String masterLinkStatus = map.getOrDefault("master-link-status", "");
String masterHost = map.get("master-host");
String masterPort = map.get("master-port");
RedisURI slaveAddr = toURI(ip, port);
if (isSlaveDown(flags, masterLinkStatus)) {
slaveDown(slaveAddr);
continue;
}
if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) {
continue;
}
currentSlaves.add(slaveAddr);
RFuture<Void> slaveFuture = addSlave(slaveAddr);
futures.add(slaveFuture);
}
CountableListener<Void> listener = new CountableListener<Void>() {
@Override
protected void onSuccess(Void value) {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
entry.getAllEntries().stream()
.map(e -> e.getClient().getAddr())
.map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort())))
.filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get()))
.forEach(a -> slaveDown(a));
};
};
listener.setCounter(futures.size());
for (RFuture<Void> f : futures) {
f.onComplete(listener);
RedisURI addr = toURI(host, port);
resolveIP(addr).onComplete((slaveAddr, exc) -> {
if (exc != null) {
log.error("Unable to add slave " + addr, exc);
latch.countDown();
return;
}
if (isSlaveDown(flags, masterLinkStatus)) {
slaveDown(slaveAddr);
latch.countDown();
return;
}
if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) {
latch.countDown();
return;
}
currentSlaves.add(slaveAddr);
addSlave(slaveAddr).onComplete((r, e2) -> {
latch.countDown();
if (e2 != null) {
log.error("Unable to add slave " + slaveAddr, e2);
}
});
});
}
latch.latch(() -> {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
entry.getAllEntries().stream()
.map(e -> e.getClient().getAddr())
.map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort())))
.filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get()))
.forEach(a -> slaveDown(a));
}, slavesMap.size());
});
slavesFuture.onComplete(commonListener);
}
@ -456,8 +465,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
if (e != null || list.isEmpty()) {
return;
}
Set<RedisURI> newUris = list.stream().filter(m -> {
AsyncCountDownLatch latch = new AsyncCountDownLatch();
List<RFuture<RedisURI>> newUris = list.stream().filter(m -> {
String flags = m.getOrDefault("flags", "");
String masterLinkStatus = m.getOrDefault("master-link-status", "");
if (!m.isEmpty() && !isSlaveDown(flags, masterLinkStatus)) {
@ -468,18 +478,30 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = m.get("ip");
String port = m.get("port");
return toURI(ip, port);
}).collect(Collectors.toSet());
InetSocketAddress addr = connection.getRedisClient().getAddr();
RedisURI currentAddr = toURI(addr);
newUris.add(currentAddr);
updateSentinels(newUris);
}).map(addr -> {
RFuture<RedisURI> f = resolveIP(addr);
f.onComplete((res, ex) -> {
if (ex != null) {
log.error("unable to resolve hostname", ex);
}
latch.countDown();
});
return f;
}).collect(Collectors.toList());
latch.latch(() -> {
List<RedisURI> uris = newUris.stream().map(u -> u.getNow()).filter(u -> u != null).collect(Collectors.toList());
InetSocketAddress addr = connection.getRedisClient().getAddr();
RedisURI currentAddr = toURI(addr);
uris.add(currentAddr);
updateSentinels(uris);
}, newUris.size());
});
sentinelsFuture.onComplete(commonListener);
}
private void updateSentinels(Set<RedisURI> newUris) {
private void updateSentinels(Collection<RedisURI> newUris) {
newUris.stream()
.filter(uri -> !sentinels.containsKey(uri))
.forEach(uri -> registerSentinel(uri, getConfig(), null));

@ -28,6 +28,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.Time;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedisURI;
import org.redisson.misc.RedissonPromise;
import java.net.InetSocketAddress;
@ -199,7 +200,7 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
}
@Override
public InetSocketAddress getMasterAddr(String masterName) {
public RedisURI getMasterAddr(String masterName) {
return commandAsyncService.get(getMasterAddrAsync(masterName));
}
@ -229,7 +230,7 @@ public class SentinelRedisNode implements RedisSentinel, RedisSentinelAsync {
}
@Override
public RFuture<InetSocketAddress> getMasterAddrAsync(String masterName) {
public RFuture<RedisURI> getMasterAddrAsync(String masterName) {
return executeAsync(null, StringCodec.INSTANCE, -1, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, masterName);
}

@ -9,6 +9,7 @@ import org.redisson.client.protocol.Time;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.redisson.misc.RedisURI;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -275,8 +276,8 @@ public class RedissonRedisNodesTest extends BaseTest {
for (RedisSentinel sentinel : nodes.getSentinels()) {
Assertions.assertTrue(sentinel.ping());
InetSocketAddress addr = sentinel.getMasterAddr("myMaster");
assertThat(addr.getAddress().getHostAddress()).isEqualTo("127.0.0.1");
RedisURI addr = sentinel.getMasterAddr("myMaster");
assertThat(addr.getHost()).isEqualTo("127.0.0.1");
assertThat(addr.getPort()).isEqualTo(master.getRedisServerPort());
Map<String, String> masterMap = sentinel.getMaster("myMaster");

Loading…
Cancel
Save