Feature - natMap deprecated in favor of natMapper setting. #2545

pull/2563/head
Nikita Koksharov 5 years ago
parent ca44e02957
commit 77f680cf16

@ -0,0 +1,50 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.misc.RedisURI;
import java.util.Map;
/**
* Maps host of RedisURI object using map defined in <code>hostsMap</code> setting.
*
* @author Nikita Koksharov
*
*/
public class HostNatMapper implements NatMapper {
private Map<String, String> hostsMap;
@Override
public RedisURI map(RedisURI uri) {
String host = hostsMap.get(uri.getHost());
if (host == null) {
return uri;
}
return new RedisURI(uri.getScheme(), host, uri.getPort());
}
/**
* Defines hosts mapping. Host as key mapped to host as value.
*
* @param hostsMap - hosts map
*/
public void setHostsMap(Map<String, String> hostsMap) {
this.hostsMap = hostsMap;
}
}

@ -0,0 +1,55 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.misc.RedisURI;
import java.util.Map;
/**
* Maps host and port of RedisURI object using map defined in <code>hostsMap</code> setting.
*
* @author Nikita Koksharov
*
*/
public class HostPortNatMapper implements NatMapper {
private Map<String, String> hostsPortMap;
@Override
public RedisURI map(RedisURI uri) {
String hostPort = hostsPortMap.get(uri.getHost() + ":" + uri.getPort());
if (hostPort == null) {
return uri;
}
int lastColonIdx = hostPort.lastIndexOf(":");
String host = hostPort.substring(0, lastColonIdx);
String port = hostPort.substring(lastColonIdx + 1);
return new RedisURI(uri.getScheme(), host, Integer.valueOf(port));
}
/**
* Defines host and port mapping. Host and port as key mapped to host and port as value.
* Allowed value format: "127.0.0.1:6379"
*
* @param hostsPortMap - host and port map
*/
public void setHostsPortMap(Map<String, String> hostsPortMap) {
this.hostsPortMap = hostsPortMap;
}
}

@ -0,0 +1,48 @@
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import org.redisson.misc.RedisURI;
/**
* Maps RedisURI object. Allows to change input RedisURI's port and host values.
*
* @author Nikita Koksharov
*
* @see HostNatMapper
* @see HostPortNatMapper
*/
@FunctionalInterface
public interface NatMapper {
/**
* Applies map function to input <code>uri</code> object
*
* @param uri - RedisURI object
* @return mapped RedisURI object
*/
RedisURI map(RedisURI uri);
/**
* Returns input RedisURI object. Used by default
*
* @return NatMapper instance what returns input RedisURI object
*/
static NatMapper direct() {
return uri -> uri;
}
}

@ -15,35 +15,15 @@
*/
package org.redisson.cluster;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import io.netty.resolver.AddressResolver;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import org.redisson.api.NatMapper;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.*;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.cluster.ClusterNodeInfo.Flag;
@ -63,11 +43,15 @@ import org.redisson.misc.RedissonPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.resolver.AddressResolver;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
*
@ -88,7 +72,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private String configEndpointHostName;
private final Map<String, String> natMap;
private final NatMapper natMapper;
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
@ -97,7 +81,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
throw new IllegalArgumentException("At least one cluster node should be defined!");
}
natMap = cfg.getNatMap();
this.natMapper = cfg.getNatMapper();
this.config = create(cfg);
initTimer(this.config);
@ -691,14 +675,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
@Override
public RedisURI applyNatMap(RedisURI address) {
String mappedAddress = natMap.get(address.getHost() + ":" + address.getPort());
if (mappedAddress == null && natMap.get(address.getHost()) != null) {
mappedAddress = natMap.get(address.getHost()) + ":" + address.getPort();
}
if (mappedAddress != null) {
return new RedisURI(address.getScheme() + "://" + mappedAddress);
}
return address;
return natMapper.map(address);
}
private Collection<ClusterPartition> parsePartitions(List<ClusterNodeInfo> nodes) {

@ -15,10 +15,12 @@
*/
package org.redisson.config;
import org.redisson.api.HostNatMapper;
import org.redisson.api.HostPortNatMapper;
import org.redisson.api.NatMapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -29,7 +31,7 @@ import java.util.Map;
*/
public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterServersConfig> {
private Map<String, String> natMap = Collections.emptyMap();
private NatMapper natMapper = NatMapper.direct();
/**
* Redis cluster node urls list
@ -50,7 +52,7 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterSe
super(config);
setNodeAddresses(config.getNodeAddresses());
setScanInterval(config.getScanInterval());
setNatMap(new HashMap<>(config.getNatMap()));
setNatMapper(config.getNatMapper());
setCheckSlotsCoverage(config.isCheckSlotsCoverage());
}
@ -104,17 +106,32 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterSe
return this;
}
public Map<String, String> getNatMap() {
return natMap;
/*
* Use {@link #setNatMapper(NatMapper)}
*/
@Deprecated
public ClusterServersConfig setNatMap(Map<String, String> natMap) {
HostPortNatMapper mapper = new HostPortNatMapper();
mapper.setHostsPortMap(natMap);
this.natMapper = mapper;
return this;
}
public NatMapper getNatMapper() {
return natMapper;
}
/**
* Defines NAT mapping. Address as a map key is replaced with mapped address as value.
*
* @param natMap - nat mapping
* Defines NAT mapper which maps Redis URI object.
*
* @see HostNatMapper
* @see HostPortNatMapper
*
* @param natMapper - nat mapper object
* @return config
*/
public ClusterServersConfig setNatMap(Map<String, String> natMap) {
this.natMap = natMap;
public ClusterServersConfig setNatMapper(NatMapper natMapper) {
this.natMapper = natMapper;
return this;
}

@ -15,10 +15,12 @@
*/
package org.redisson.config;
import org.redisson.api.HostNatMapper;
import org.redisson.api.HostPortNatMapper;
import org.redisson.api.NatMapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -31,7 +33,7 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig<Sentinel
private List<String> sentinelAddresses = new ArrayList<>();
private Map<String, String> natMap = Collections.emptyMap();
private NatMapper natMapper = NatMapper.direct();
private String masterName;
@ -54,7 +56,7 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig<Sentinel
setMasterName(config.getMasterName());
setDatabase(config.getDatabase());
setScanInterval(config.getScanInterval());
setNatMap(new HashMap<>(config.getNatMap()));
setNatMapper(config.getNatMapper());
}
/**
@ -118,20 +120,34 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig<Sentinel
this.scanInterval = scanInterval;
return this;
}
public Map<String, String> getNatMap() {
return natMap;
/*
* Use {@link #setNatMapper(NatMapper)}
*/
@Deprecated
public SentinelServersConfig setNatMap(Map<String, String> natMap) {
HostPortNatMapper mapper = new HostPortNatMapper();
mapper.setHostsPortMap(natMap);
this.natMapper = mapper;
return this;
}
public NatMapper getNatMapper() {
return natMapper;
}
/**
* Defines NAT mapping. Address as a map key is replaced with mapped address as value.
*
* @param natMap - nat mapping
* Defines NAT mapper which maps Redis URI object.
*
* @see HostNatMapper
* @see HostPortNatMapper
*
* @param natMapper - nat mapper object
* @return config
*/
public SentinelServersConfig setNatMap(Map<String, String> natMap) {
this.natMap = natMap;
public SentinelServersConfig setNatMapper(NatMapper natMapper) {
this.natMapper = natMapper;
return this;
}
}

@ -20,6 +20,7 @@ import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
import org.redisson.api.NatMapper;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.*;
@ -56,13 +57,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final Set<RedisURI> sentinelHosts = new HashSet<>();
private final ConcurrentMap<RedisURI, RedisClient> sentinels = new ConcurrentHashMap<>();
private final AtomicReference<String> currentMaster = new AtomicReference<>();
private final AtomicReference<RedisURI> currentMaster = new AtomicReference<>();
private final Set<RedisURI> disconnectedSlaves = new HashSet<>();
private ScheduledFuture<?> monitorFuture;
private AddressResolver<InetSocketAddress> sentinelResolver;
private Map<String, String> natMap;
private final NatMapper natMapper;
private boolean usePassword = false;
private String scheme;
@ -79,9 +80,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
this.config = create(cfg);
initTimer(this.config);
this.natMap=cfg.getNatMap();
this.natMapper = cfg.getNatMapper();
this.sentinelResolver = resolverGroup.getResolver(getGroup().next());
checkAuth(cfg);
@ -109,8 +110,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!");
}
String masterHost = createAddress(master.get(0), master.get(1));
this.config.setMasterAddress(masterHost);
RedisURI masterHost = toURI(master.get(0), master.get(1));
this.config.setMasterAddress(masterHost.toString());
currentMaster.set(masterHost);
log.info("master: {} added", masterHost);
@ -124,15 +125,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String port = map.get("port");
String flags = map.get("flags");
String host = createAddress(ip, port);
RedisURI host = toURI(ip, port);
this.config.addSlaveAddress(host);
this.config.addSlaveAddress(host.toString());
log.debug("slave {} state: {}", host, map);
log.info("slave: {} added", host);
if (flags.contains("s_down") || flags.contains("disconnected")) {
RedisURI uri = new RedisURI(host);
disconnectedSlaves.add(uri);
disconnectedSlaves.add(host);
log.warn("slave: {} is down", host);
}
}
@ -352,11 +352,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
String current = currentMaster.get();
String newMaster = createAddress(master.get(0), master.get(1));
RedisURI current = currentMaster.get();
RedisURI newMaster = toURI(master.get(0), master.get(1));
if (!newMaster.equals(current)
&& currentMaster.compareAndSet(current, newMaster)) {
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), new RedisURI(newMaster));
RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster);
changeFuture.onComplete((res, ex) -> {
if (ex != null) {
currentMaster.compareAndSet(newMaster, current);
@ -374,7 +374,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
Set<String> currentSlaves = new HashSet<String>(slavesMap.size());
Set<RedisURI> currentSlaves = new HashSet<>(slavesMap.size());
List<RFuture<Void>> futures = new ArrayList<>();
for (Map<String, String> map : slavesMap) {
if (map.isEmpty()) {
@ -386,18 +386,18 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String flags = map.get("flags");
String masterHost = map.get("master-host");
String masterPort = map.get("master-port");
RedisURI slaveAddr = toURI(ip, port);
if (flags.contains("s_down") || flags.contains("disconnected")) {
slaveDown(ip, port);
slaveDown(slaveAddr);
continue;
}
if ("?".equals(masterHost) || !isUseSameMaster(ip, port, masterHost, masterPort)) {
if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) {
continue;
}
String slaveAddr = createAddress(ip, port);
currentSlaves.add(slaveAddr);
RFuture<Void> slaveFuture = addSlave(ip, port, slaveAddr);
RFuture<Void> slaveFuture = addSlave(slaveAddr);
futures.add(slaveFuture);
}
@ -405,23 +405,20 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override
protected void onSuccess(Void value) {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
Set<String> removedSlaves = new HashSet<String>();
Set<RedisURI> removedSlaves = new HashSet<>();
for (ClientConnectionsEntry e : entry.getAllEntries()) {
InetSocketAddress addr = e.getClient().getAddr();
String slaveAddr = createAddress(addr.getAddress().getHostAddress(), String.valueOf(addr.getPort()));
RedisURI slaveAddr = toURI(addr.getAddress().getHostAddress(), String.valueOf(addr.getPort()));
removedSlaves.add(slaveAddr);
}
removedSlaves.removeAll(currentSlaves);
for (String slave : removedSlaves) {
for (RedisURI slave : removedSlaves) {
if (slave.equals(currentMaster.get())) {
continue;
}
String hostPort = slave.replace("redis://", "");
int lastColonIdx = hostPort.lastIndexOf(":");
String host = hostPort.substring(0, lastColonIdx);
String port = hostPort.substring(lastColonIdx + 1);
slaveDown(host, port);
slaveDown(slave);
}
};
};
@ -486,10 +483,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return applyNatMap(uri);
}
private String createAddress(String host, String port) {
return toURI(host, port).toString();
}
@Override
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config) {
MasterSlaveEntry entry = new MasterSlaveEntry(this, config);
@ -540,68 +533,62 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private RFuture<Void> addSlave(String ip, String port, String addr) {
private RFuture<Void> addSlave(RedisURI uri) {
RPromise<Void> result = new RedissonPromise<Void>();
// to avoid addition twice
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
RedisURI uri = toURI(ip, port);
if (!entry.hasSlave(uri) && !config.checkSkipSlavesInit()) {
RFuture<Void> future = entry.addSlave(new RedisURI(addr));
RFuture<Void> future = entry.addSlave(uri);
future.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
log.error("Can't add slave: " + addr, e);
log.error("Can't add slave: " + uri, e);
return;
}
if (entry.isSlaveUnfreezed(uri) || entry.slaveUp(uri, FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} added", slaveAddr);
log.info("slave: {} added", uri);
result.trySuccess(null);
}
});
} else {
if (entry.hasSlave(uri)) {
slaveUp(ip, port);
slaveUp(uri);
}
result.trySuccess(null);
}
return result;
}
private void slaveDown(String ip, String port) {
private void slaveDown(RedisURI uri) {
if (config.checkSkipSlavesInit()) {
log.warn("slave: {}:{} has down", ip, port);
log.warn("slave: {} has down", uri);
} else {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
RedisURI uri = toURI(ip, port);
if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.warn("slave: {}:{} has down", ip, port);
log.warn("slave: {} has down", uri);
}
}
}
private boolean isUseSameMaster(String slaveIp, String slavePort, String slaveMasterHost, String slaveMasterPort) {
String master = currentMaster.get();
String slaveMaster = createAddress(slaveMasterHost, slaveMasterPort);
private boolean isUseSameMaster(RedisURI slaveAddr, String slaveMasterHost, String slaveMasterPort) {
RedisURI master = currentMaster.get();
RedisURI slaveMaster = toURI(slaveMasterHost, slaveMasterPort);
if (!master.equals(slaveMaster)) {
log.warn("Skipped slave up {} for master {} differs from current {}", slaveIp + ":" + slavePort, slaveMaster, master);
log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMaster, master);
return false;
}
return true;
}
private void slaveUp(String ip, String port) {
private void slaveUp(RedisURI uri) {
if (config.checkSkipSlavesInit()) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr);
log.info("slave: {} has up", uri);
return;
}
RedisURI uri = toURI(ip, port);
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr);
log.info("slave: {} has up", uri);
}
}
@ -637,14 +624,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override
public RedisURI applyNatMap(RedisURI address) {
String mappedAddress = natMap.get(address.getHost() + ":" + address.getPort());
if (mappedAddress == null && natMap.get(address.getHost()) != null) {
mappedAddress = natMap.get(address.getHost()) + ":" + address.getPort();
}
if (mappedAddress != null) {
return new RedisURI(address.getScheme() + "://" + mappedAddress);
}
return address;
return natMapper.map(address);
}
}

@ -29,7 +29,11 @@ public class RedisURI {
private final boolean ssl;
private final String host;
private final int port;
public RedisURI(String scheme, String host, int port) {
this(scheme + "://" + host + ":" + port);
}
public RedisURI(String uri) {
if (!uri.startsWith("redis://")
&& !uri.startsWith("rediss://")) {

Loading…
Cancel
Save