diff --git a/redisson/src/main/java/org/redisson/api/HostNatMapper.java b/redisson/src/main/java/org/redisson/api/HostNatMapper.java new file mode 100644 index 000000000..c4e362cdc --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/HostNatMapper.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2013-2020 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.redisson.misc.RedisURI; + +import java.util.Map; + +/** + * Maps host of RedisURI object using map defined in hostsMap setting. + * + * @author Nikita Koksharov + * + */ +public class HostNatMapper implements NatMapper { + + private Map hostsMap; + + @Override + public RedisURI map(RedisURI uri) { + String host = hostsMap.get(uri.getHost()); + if (host == null) { + return uri; + } + return new RedisURI(uri.getScheme(), host, uri.getPort()); + } + + /** + * Defines hosts mapping. Host as key mapped to host as value. + * + * @param hostsMap - hosts map + */ + public void setHostsMap(Map hostsMap) { + this.hostsMap = hostsMap; + } + +} diff --git a/redisson/src/main/java/org/redisson/api/HostPortNatMapper.java b/redisson/src/main/java/org/redisson/api/HostPortNatMapper.java new file mode 100644 index 000000000..bbfcd832d --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/HostPortNatMapper.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2013-2020 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.redisson.misc.RedisURI; + +import java.util.Map; + +/** + * Maps host and port of RedisURI object using map defined in hostsMap setting. + * + * @author Nikita Koksharov + * + */ +public class HostPortNatMapper implements NatMapper { + + private Map hostsPortMap; + + @Override + public RedisURI map(RedisURI uri) { + String hostPort = hostsPortMap.get(uri.getHost() + ":" + uri.getPort()); + if (hostPort == null) { + return uri; + } + + int lastColonIdx = hostPort.lastIndexOf(":"); + String host = hostPort.substring(0, lastColonIdx); + String port = hostPort.substring(lastColonIdx + 1); + return new RedisURI(uri.getScheme(), host, Integer.valueOf(port)); + } + + /** + * Defines host and port mapping. Host and port as key mapped to host and port as value. + * Allowed value format: "127.0.0.1:6379" + * + * @param hostsPortMap - host and port map + */ + public void setHostsPortMap(Map hostsPortMap) { + this.hostsPortMap = hostsPortMap; + } + +} diff --git a/redisson/src/main/java/org/redisson/api/NatMapper.java b/redisson/src/main/java/org/redisson/api/NatMapper.java new file mode 100644 index 000000000..023b39b92 --- /dev/null +++ b/redisson/src/main/java/org/redisson/api/NatMapper.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2013-2020 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.redisson.api; + +import org.redisson.misc.RedisURI; + +/** + * Maps RedisURI object. Allows to change input RedisURI's port and host values. + * + * @author Nikita Koksharov + * + * @see HostNatMapper + * @see HostPortNatMapper + */ +@FunctionalInterface +public interface NatMapper { + + /** + * Applies map function to input uri object + * + * @param uri - RedisURI object + * @return mapped RedisURI object + */ + RedisURI map(RedisURI uri); + + /** + * Returns input RedisURI object. Used by default + * + * @return NatMapper instance what returns input RedisURI object + */ + static NatMapper direct() { + return uri -> uri; + } + +} diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 318255ec3..9e4c5a577 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -15,35 +15,15 @@ */ package org.redisson.cluster; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - +import io.netty.resolver.AddressResolver; +import io.netty.util.NetUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ScheduledFuture; +import org.redisson.api.NatMapper; import org.redisson.api.NodeType; import org.redisson.api.RFuture; -import org.redisson.client.RedisClient; -import org.redisson.client.RedisClientConfig; -import org.redisson.client.RedisConnection; -import org.redisson.client.RedisConnectionException; -import org.redisson.client.RedisException; +import org.redisson.client.*; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.cluster.ClusterNodeInfo.Flag; @@ -63,11 +43,15 @@ import org.redisson.misc.RedissonPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.resolver.AddressResolver; -import io.netty.util.NetUtil; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ScheduledFuture; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * @@ -88,7 +72,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private String configEndpointHostName; - private final Map natMap; + private final NatMapper natMapper; public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) { super(config, id); @@ -97,7 +81,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { throw new IllegalArgumentException("At least one cluster node should be defined!"); } - natMap = cfg.getNatMap(); + this.natMapper = cfg.getNatMapper(); this.config = create(cfg); initTimer(this.config); @@ -691,14 +675,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { @Override public RedisURI applyNatMap(RedisURI address) { - String mappedAddress = natMap.get(address.getHost() + ":" + address.getPort()); - if (mappedAddress == null && natMap.get(address.getHost()) != null) { - mappedAddress = natMap.get(address.getHost()) + ":" + address.getPort(); - } - if (mappedAddress != null) { - return new RedisURI(address.getScheme() + "://" + mappedAddress); - } - return address; + return natMapper.map(address); } private Collection parsePartitions(List nodes) { diff --git a/redisson/src/main/java/org/redisson/config/ClusterServersConfig.java b/redisson/src/main/java/org/redisson/config/ClusterServersConfig.java index ad37d6c58..e0cb1d1b5 100644 --- a/redisson/src/main/java/org/redisson/config/ClusterServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/ClusterServersConfig.java @@ -15,10 +15,12 @@ */ package org.redisson.config; +import org.redisson.api.HostNatMapper; +import org.redisson.api.HostPortNatMapper; +import org.redisson.api.NatMapper; + import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,7 +31,7 @@ import java.util.Map; */ public class ClusterServersConfig extends BaseMasterSlaveServersConfig { - private Map natMap = Collections.emptyMap(); + private NatMapper natMapper = NatMapper.direct(); /** * Redis cluster node urls list @@ -50,7 +52,7 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig(config.getNatMap())); + setNatMapper(config.getNatMapper()); setCheckSlotsCoverage(config.isCheckSlotsCoverage()); } @@ -104,17 +106,32 @@ public class ClusterServersConfig extends BaseMasterSlaveServersConfig getNatMap() { - return natMap; + /* + * Use {@link #setNatMapper(NatMapper)} + */ + @Deprecated + public ClusterServersConfig setNatMap(Map natMap) { + HostPortNatMapper mapper = new HostPortNatMapper(); + mapper.setHostsPortMap(natMap); + this.natMapper = mapper; + return this; + } + + public NatMapper getNatMapper() { + return natMapper; } + /** - * Defines NAT mapping. Address as a map key is replaced with mapped address as value. - * - * @param natMap - nat mapping + * Defines NAT mapper which maps Redis URI object. + * + * @see HostNatMapper + * @see HostPortNatMapper + * + * @param natMapper - nat mapper object * @return config */ - public ClusterServersConfig setNatMap(Map natMap) { - this.natMap = natMap; + public ClusterServersConfig setNatMapper(NatMapper natMapper) { + this.natMapper = natMapper; return this; } diff --git a/redisson/src/main/java/org/redisson/config/SentinelServersConfig.java b/redisson/src/main/java/org/redisson/config/SentinelServersConfig.java index fceb180f3..fc9420ce4 100644 --- a/redisson/src/main/java/org/redisson/config/SentinelServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/SentinelServersConfig.java @@ -15,10 +15,12 @@ */ package org.redisson.config; +import org.redisson.api.HostNatMapper; +import org.redisson.api.HostPortNatMapper; +import org.redisson.api.NatMapper; + import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,7 +33,7 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig sentinelAddresses = new ArrayList<>(); - private Map natMap = Collections.emptyMap(); + private NatMapper natMapper = NatMapper.direct(); private String masterName; @@ -54,7 +56,7 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig(config.getNatMap())); + setNatMapper(config.getNatMapper()); } /** @@ -118,20 +120,34 @@ public class SentinelServersConfig extends BaseMasterSlaveServersConfig getNatMap() { - return natMap; + + /* + * Use {@link #setNatMapper(NatMapper)} + */ + @Deprecated + public SentinelServersConfig setNatMap(Map natMap) { + HostPortNatMapper mapper = new HostPortNatMapper(); + mapper.setHostsPortMap(natMap); + this.natMapper = mapper; + return this; } - + + public NatMapper getNatMapper() { + return natMapper; + } + /** - * Defines NAT mapping. Address as a map key is replaced with mapped address as value. - * - * @param natMap - nat mapping + * Defines NAT mapper which maps Redis URI object. + * + * @see HostNatMapper + * @see HostPortNatMapper + * + * @param natMapper - nat mapper object * @return config */ - public SentinelServersConfig setNatMap(Map natMap) { - this.natMap = natMap; + public SentinelServersConfig setNatMapper(NatMapper natMapper) { + this.natMapper = natMapper; return this; } - + } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 1eb07f221..2e24e769f 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -20,6 +20,7 @@ import io.netty.util.NetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ScheduledFuture; +import org.redisson.api.NatMapper; import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.*; @@ -56,13 +57,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { private final Set sentinelHosts = new HashSet<>(); private final ConcurrentMap sentinels = new ConcurrentHashMap<>(); - private final AtomicReference currentMaster = new AtomicReference<>(); + private final AtomicReference currentMaster = new AtomicReference<>(); private final Set disconnectedSlaves = new HashSet<>(); private ScheduledFuture monitorFuture; private AddressResolver sentinelResolver; - private Map natMap; + private final NatMapper natMapper; private boolean usePassword = false; private String scheme; @@ -79,9 +80,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { this.config = create(cfg); initTimer(this.config); - - this.natMap=cfg.getNatMap(); - + + this.natMapper = cfg.getNatMapper(); + this.sentinelResolver = resolverGroup.getResolver(getGroup().next()); checkAuth(cfg); @@ -109,8 +110,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!"); } - String masterHost = createAddress(master.get(0), master.get(1)); - this.config.setMasterAddress(masterHost); + RedisURI masterHost = toURI(master.get(0), master.get(1)); + this.config.setMasterAddress(masterHost.toString()); currentMaster.set(masterHost); log.info("master: {} added", masterHost); @@ -124,15 +125,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String port = map.get("port"); String flags = map.get("flags"); - String host = createAddress(ip, port); + RedisURI host = toURI(ip, port); - this.config.addSlaveAddress(host); + this.config.addSlaveAddress(host.toString()); log.debug("slave {} state: {}", host, map); log.info("slave: {} added", host); if (flags.contains("s_down") || flags.contains("disconnected")) { - RedisURI uri = new RedisURI(host); - disconnectedSlaves.add(uri); + disconnectedSlaves.add(host); log.warn("slave: {} is down", host); } } @@ -352,11 +352,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - String current = currentMaster.get(); - String newMaster = createAddress(master.get(0), master.get(1)); + RedisURI current = currentMaster.get(); + RedisURI newMaster = toURI(master.get(0), master.get(1)); if (!newMaster.equals(current) && currentMaster.compareAndSet(current, newMaster)) { - RFuture changeFuture = changeMaster(singleSlotRange.getStartSlot(), new RedisURI(newMaster)); + RFuture changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster); changeFuture.onComplete((res, ex) -> { if (ex != null) { currentMaster.compareAndSet(newMaster, current); @@ -374,7 +374,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return; } - Set currentSlaves = new HashSet(slavesMap.size()); + Set currentSlaves = new HashSet<>(slavesMap.size()); List> futures = new ArrayList<>(); for (Map map : slavesMap) { if (map.isEmpty()) { @@ -386,18 +386,18 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { String flags = map.get("flags"); String masterHost = map.get("master-host"); String masterPort = map.get("master-port"); - + + RedisURI slaveAddr = toURI(ip, port); if (flags.contains("s_down") || flags.contains("disconnected")) { - slaveDown(ip, port); + slaveDown(slaveAddr); continue; } - if ("?".equals(masterHost) || !isUseSameMaster(ip, port, masterHost, masterPort)) { + if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) { continue; } - - String slaveAddr = createAddress(ip, port); + currentSlaves.add(slaveAddr); - RFuture slaveFuture = addSlave(ip, port, slaveAddr); + RFuture slaveFuture = addSlave(slaveAddr); futures.add(slaveFuture); } @@ -405,23 +405,20 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { @Override protected void onSuccess(Void value) { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - Set removedSlaves = new HashSet(); + Set removedSlaves = new HashSet<>(); for (ClientConnectionsEntry e : entry.getAllEntries()) { InetSocketAddress addr = e.getClient().getAddr(); - String slaveAddr = createAddress(addr.getAddress().getHostAddress(), String.valueOf(addr.getPort())); + RedisURI slaveAddr = toURI(addr.getAddress().getHostAddress(), String.valueOf(addr.getPort())); removedSlaves.add(slaveAddr); } removedSlaves.removeAll(currentSlaves); - for (String slave : removedSlaves) { + for (RedisURI slave : removedSlaves) { if (slave.equals(currentMaster.get())) { continue; } - String hostPort = slave.replace("redis://", ""); - int lastColonIdx = hostPort.lastIndexOf(":"); - String host = hostPort.substring(0, lastColonIdx); - String port = hostPort.substring(lastColonIdx + 1); - slaveDown(host, port); + + slaveDown(slave); } }; }; @@ -486,10 +483,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return applyNatMap(uri); } - private String createAddress(String host, String port) { - return toURI(host, port).toString(); - } - @Override protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config) { MasterSlaveEntry entry = new MasterSlaveEntry(this, config); @@ -540,68 +533,62 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { return result; } - private RFuture addSlave(String ip, String port, String addr) { + private RFuture addSlave(RedisURI uri) { RPromise result = new RedissonPromise(); // to avoid addition twice MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - RedisURI uri = toURI(ip, port); if (!entry.hasSlave(uri) && !config.checkSkipSlavesInit()) { - RFuture future = entry.addSlave(new RedisURI(addr)); + RFuture future = entry.addSlave(uri); future.onComplete((res, e) -> { if (e != null) { result.tryFailure(e); - log.error("Can't add slave: " + addr, e); + log.error("Can't add slave: " + uri, e); return; } if (entry.isSlaveUnfreezed(uri) || entry.slaveUp(uri, FreezeReason.MANAGER)) { - String slaveAddr = ip + ":" + port; - log.info("slave: {} added", slaveAddr); + log.info("slave: {} added", uri); result.trySuccess(null); } }); } else { if (entry.hasSlave(uri)) { - slaveUp(ip, port); + slaveUp(uri); } result.trySuccess(null); } return result; } - private void slaveDown(String ip, String port) { + private void slaveDown(RedisURI uri) { if (config.checkSkipSlavesInit()) { - log.warn("slave: {}:{} has down", ip, port); + log.warn("slave: {} has down", uri); } else { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - RedisURI uri = toURI(ip, port); if (entry.slaveDown(uri, FreezeReason.MANAGER)) { - log.warn("slave: {}:{} has down", ip, port); + log.warn("slave: {} has down", uri); } } } - private boolean isUseSameMaster(String slaveIp, String slavePort, String slaveMasterHost, String slaveMasterPort) { - String master = currentMaster.get(); - String slaveMaster = createAddress(slaveMasterHost, slaveMasterPort); + private boolean isUseSameMaster(RedisURI slaveAddr, String slaveMasterHost, String slaveMasterPort) { + RedisURI master = currentMaster.get(); + RedisURI slaveMaster = toURI(slaveMasterHost, slaveMasterPort); if (!master.equals(slaveMaster)) { - log.warn("Skipped slave up {} for master {} differs from current {}", slaveIp + ":" + slavePort, slaveMaster, master); + log.warn("Skipped slave up {} for master {} differs from current {}", slaveAddr, slaveMaster, master); return false; } return true; } - private void slaveUp(String ip, String port) { + private void slaveUp(RedisURI uri) { if (config.checkSkipSlavesInit()) { - String slaveAddr = ip + ":" + port; - log.info("slave: {} has up", slaveAddr); + log.info("slave: {} has up", uri); return; } - RedisURI uri = toURI(ip, port); if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) { - String slaveAddr = ip + ":" + port; - log.info("slave: {} has up", slaveAddr); + log.info("slave: {} has up", uri); } } @@ -637,14 +624,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { @Override public RedisURI applyNatMap(RedisURI address) { - String mappedAddress = natMap.get(address.getHost() + ":" + address.getPort()); - if (mappedAddress == null && natMap.get(address.getHost()) != null) { - mappedAddress = natMap.get(address.getHost()) + ":" + address.getPort(); - } - if (mappedAddress != null) { - return new RedisURI(address.getScheme() + "://" + mappedAddress); - } - return address; + return natMapper.map(address); } } diff --git a/redisson/src/main/java/org/redisson/misc/RedisURI.java b/redisson/src/main/java/org/redisson/misc/RedisURI.java index 62a99f7df..28051c92f 100644 --- a/redisson/src/main/java/org/redisson/misc/RedisURI.java +++ b/redisson/src/main/java/org/redisson/misc/RedisURI.java @@ -29,7 +29,11 @@ public class RedisURI { private final boolean ssl; private final String host; private final int port; - + + public RedisURI(String scheme, String host, int port) { + this(scheme + "://" + host + ":" + port); + } + public RedisURI(String uri) { if (!uri.startsWith("redis://") && !uri.startsWith("rediss://")) {