From 5152c66e290ab1de09f00b8b53e5e8f467f8f567 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 28 Aug 2017 13:00:28 +0300 Subject: [PATCH 1/4] Shutdown master's entry in slaveConnectionPool during master change process. #945 --- .../java/org/redisson/client/RedisClient.java | 3 ++ .../client/handler/ConnectionWatchdog.java | 5 +++ .../cluster/ClusterConnectionManager.java | 4 +- .../config/BaseMasterSlaveServersConfig.java | 4 ++ .../connection/ClientConnectionsEntry.java | 11 +++-- .../MasterSlaveConnectionManager.java | 8 ++-- .../redisson/connection/MasterSlaveEntry.java | 43 +++++++++++-------- .../ReplicatedConnectionManager.java | 1 - .../connection/SentinelConnectionManager.java | 8 ++-- .../balancer/LoadBalancerManager.java | 30 ++++++++++++- .../connection/pool/ConnectionPool.java | 28 +++++++----- 11 files changed, 99 insertions(+), 46 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index e7e8a35ff..ce0e0b9d8 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -175,6 +175,9 @@ public class RedisClient { this.commandTimeout = config.getCommandTimeout(); } + public String getIpAddr() { + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + } public InetSocketAddress getAddr() { return addr; diff --git a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java index 1098f8b0a..652b75261 100644 --- a/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java +++ b/redisson/src/main/java/org/redisson/client/handler/ConnectionWatchdog.java @@ -39,6 +39,11 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +/** + * + * @author Nikita Koksharov + * + */ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private final Logger log = LoggerFactory.getLogger(getClass()); diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index a21ee9689..5478a6891 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -251,7 +251,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { final MasterSlaveEntry e; List> futures = new ArrayList>(); - if (config.getReadMode() == ReadMode.MASTER) { + if (config.isSkipSlavesInit()) { e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config); } else { config.setSlaveAddresses(partition.getSlaveAddresses()); @@ -508,8 +508,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { ClusterPartition newMasterPart = find(newPartitions, slot); // does partition has a new master? if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) { - log.info("changing master from {} to {} for {}", - currentPart.getMasterAddress(), newMasterPart.getMasterAddress(), slot); URI newUri = newMasterPart.getMasterAddress(); URI oldUri = currentPart.getMasterAddress(); diff --git a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java index bdc5d7312..267b9076e 100644 --- a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java @@ -256,6 +256,10 @@ public class BaseMasterSlaveServersConfig 0) { @@ -72,7 +72,10 @@ public class ClientConnectionsEntry { } connectionManager.getConnectionWatcher().add(poolMinSize, poolMaxSize, freeConnections, freeConnectionsCounter); } - + + public void setNodeType(NodeType nodeType) { + this.nodeType = nodeType; + } public NodeType getNodeType() { return nodeType; } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index fa78f53a2..6d490a009 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -51,8 +51,6 @@ import org.redisson.command.CommandSyncService; import org.redisson.config.BaseMasterSlaveServersConfig; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; -import org.redisson.config.ReadMode; -import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.RPromise; import org.redisson.misc.RedissonPromise; @@ -280,7 +278,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { slots.add(singleSlotRange); MasterSlaveEntry entry; - if (config.getReadMode() == ReadMode.MASTER) { + if (config.isSkipSlavesInit()) { entry = new SingleEntry(slots, this, config); RFuture f = entry.setupMasterEntry(config.getMasterAddress()); f.syncUninterruptibly(); @@ -349,7 +347,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdownAsync(RedisClient client) { - clientEntries.remove(client); + if (clientEntries.remove(client) == null) { + log.error("Can't find client {}", client); + } client.shutdownAsync(); } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 8b78fc115..fcfcd5a36 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -42,6 +42,7 @@ import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.pool.MasterConnectionPool; import org.redisson.connection.pool.MasterPubSubConnectionPool; +import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,7 +89,7 @@ public class MasterSlaveEntry { public List> initSlaveBalancer(Collection disconnectedNodes) { boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() - && config.getReadMode() == ReadMode.SLAVE + && !config.isSkipSlavesInit() && disconnectedNodes.size() < config.getSlaveAddresses().size(); List> result = new LinkedList>(); @@ -120,15 +121,6 @@ public class MasterSlaveEntry { return writeConnectionHolder.add(masterEntry); } - private boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReason) { - ClientConnectionsEntry e = slaveBalancer.freeze(entry, freezeReason); - if (e == null) { - return false; - } - - return slaveDown(e, freezeReason == FreezeReason.SYSTEM); - } - public boolean slaveDown(URI address, FreezeReason freezeReason) { ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason); if (entry == null) { @@ -140,7 +132,7 @@ public class MasterSlaveEntry { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { // add master as slave if no more slaves available - if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { + if (!config.isSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) { URI addr = masterEntry.getClient().getConfig().getAddress(); if (slaveUp(addr, FreezeReason.SYSTEM)) { log.info("master {} used as slave", addr); @@ -153,7 +145,7 @@ public class MasterSlaveEntry { if (connection == null) { break; } - + connection.closeAsync().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -318,13 +310,13 @@ public class MasterSlaveEntry { return addSlave(address, true, NodeType.SLAVE); } - private RFuture addSlave(URI address, boolean freezed, NodeType mode) { + private RFuture addSlave(URI address, boolean freezed, NodeType nodeType) { RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); ClientConnectionsEntry entry = new ClientConnectionsEntry(client, this.config.getSlaveConnectionMinimumIdleSize(), this.config.getSlaveConnectionPoolSize(), this.config.getSubscriptionConnectionMinimumIdleSize(), - this.config.getSubscriptionConnectionPoolSize(), connectionManager, mode); + this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType); if (freezed) { synchronized (entry) { entry.setFreezed(freezed); @@ -346,7 +338,7 @@ public class MasterSlaveEntry { InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort()); InetSocketAddress addr = masterEntry.getClient().getAddr(); // exclude master from slaves - if (config.getReadMode() == ReadMode.SLAVE + if (!config.isSkipSlavesInit() && (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) { slaveDown(address, FreezeReason.SYSTEM); log.info("master {} excluded from slaves", addr); @@ -369,14 +361,21 @@ public class MasterSlaveEntry { public void operationComplete(Future future) throws Exception { writeConnectionHolder.remove(oldMaster); pubSubConnectionHolder.remove(oldMaster); - slaveDown(oldMaster, FreezeReason.MANAGER); + + oldMaster.freezeMaster(FreezeReason.MANAGER); + slaveDown(oldMaster, false); + slaveDown(URIBuilder.create("redis://" + oldMaster.getClient().getIpAddr()), FreezeReason.MANAGER); + + slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE); + slaveBalancer.changeType(address, NodeType.MASTER); // more than one slave available, so master can be removed from slaves - if (config.getReadMode() == ReadMode.SLAVE + if (!config.isSkipSlavesInit() && slaveBalancer.getAvailableClients() > 1) { slaveDown(address, FreezeReason.SYSTEM); } connectionManager.shutdownAsync(oldMaster.getClient()); + log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), address); } }); } @@ -415,10 +414,16 @@ public class MasterSlaveEntry { } public RFuture connectionReadOp(RedisCommand command) { + if (config.getReadMode() == ReadMode.MASTER) { + return connectionWriteOp(command); + } return slaveBalancer.nextConnection(command); } public RFuture connectionReadOp(RedisCommand command, InetSocketAddress addr) { + if (config.getReadMode() == ReadMode.MASTER) { + return connectionWriteOp(command); + } return slaveBalancer.getConnection(command, addr); } @@ -443,6 +448,10 @@ public class MasterSlaveEntry { } public void releaseRead(RedisConnection connection) { + if (config.getReadMode() == ReadMode.MASTER) { + releaseWrite(connection); + return; + } slaveBalancer.returnConnection(connection); } diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 329ff31fa..1634701d5 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -192,7 +192,6 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { if (master.equals(addr)) { log.debug("Current master {} unchanged", master); } else if (currentMaster.compareAndSet(master, addr)) { - log.info("Master has changed from {} to {}", master, addr); changeMaster(singleSlotRange.getStartSlot(), addr); } } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index d0cfbb35f..30099a2d6 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -41,6 +41,7 @@ import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; import org.redisson.config.SentinelServersConfig; +import org.redisson.config.SubscriptionMode; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,7 +241,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } // to avoid addition twice - if (slaves.putIfAbsent(slaveAddr, true) == null) { + if (slaves.putIfAbsent(slaveAddr, true) == null && !config.isSkipSlavesInit()) { final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); RFuture future = entry.addSlave(URIBuilder.create(slaveAddr)); future.addListener(new FutureListener() { @@ -312,7 +313,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private void slaveDown(String ip, String port) { - if (config.getReadMode() == ReadMode.MASTER) { + if (config.isSkipSlavesInit()) { log.warn("slave: {}:{} has down", ip, port); } else { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); @@ -369,7 +370,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private void slaveUp(String ip, String port) { - if (config.getReadMode() == ReadMode.MASTER) { + if (config.isSkipSlavesInit()) { String slaveAddr = ip + ":" + port; log.info("slave: {} has up", slaveAddr); return; @@ -395,7 +396,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { if (!newMaster.equals(current) && currentMaster.compareAndSet(current, newMaster)) { changeMaster(singleSlotRange.getStartSlot(), URIBuilder.create(newMaster)); - log.info("master {} changed to {}", current, newMaster); } } } else { diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 6c047a076..5ed44e8c6 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -20,12 +20,14 @@ import java.net.URI; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.redisson.api.NodeType; import org.redisson.api.RFuture; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisPubSubConnection; import org.redisson.client.protocol.RedisCommand; import org.redisson.config.MasterSlaveServersConfig; +import org.redisson.config.ReadMode; import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ConnectionManager; @@ -40,6 +42,11 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; +/** + * + * @author Nikita Koksharov + * + */ public class LoadBalancerManager { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -56,6 +63,25 @@ public class LoadBalancerManager { pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry); } + public void changeType(InetSocketAddress addr, NodeType nodeType) { + ClientConnectionsEntry entry = ip2Entry.get(addr.getAddress().getHostAddress() + ":" + addr.getPort()); + changeType(addr, nodeType, entry); + } + + protected void changeType(Object addr, NodeType nodeType, ClientConnectionsEntry entry) { + if (entry != null) { + if (connectionManager.isClusterMode()) { + entry.getClient().getConfig().setReadOnly(nodeType == NodeType.SLAVE && connectionManager.getConfig().getReadMode() != ReadMode.MASTER); + } + entry.setNodeType(nodeType); + } + } + + public void changeType(URI address, NodeType nodeType) { + ClientConnectionsEntry entry = getEntry(address); + changeType(address, nodeType, entry); + } + public RFuture add(final ClientConnectionsEntry entry) { final RPromise result = connectionManager.newPromise(); FutureListener listener = new FutureListener() { @@ -67,7 +93,7 @@ public class LoadBalancerManager { return; } if (counter.decrementAndGet() == 0) { - String addr = convert(entry.getClient().getConfig().getAddress()); + String addr = entry.getClient().getIpAddr(); ip2Entry.put(addr, entry); result.trySuccess(null); } @@ -123,7 +149,7 @@ public class LoadBalancerManager { return freeze(connectionEntry, freezeReason); } - protected ClientConnectionsEntry getEntry(URI address) { + private ClientConnectionsEntry getEntry(URI address) { String addr = convert(address); return ip2Entry.get(addr); } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 132a28f21..c0ba05035 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -341,19 +341,23 @@ abstract class ConnectionPool { connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (entry.getFreezeReason() != FreezeReason.RECONNECT - || !entry.isFreezed() + synchronized (entry) { + if (entry.getFreezeReason() != FreezeReason.RECONNECT + || !entry.isFreezed() || connectionManager.isShuttingDown()) { - return; + return; + } } RFuture connectionFuture = entry.getClient().connectAsync(); connectionFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (entry.getFreezeReason() != FreezeReason.RECONNECT - || !entry.isFreezed()) { - return; + synchronized (entry) { + if (entry.getFreezeReason() != FreezeReason.RECONNECT + || !entry.isFreezed()) { + return; + } } if (!future.isSuccess()) { @@ -371,9 +375,11 @@ abstract class ConnectionPool { @Override public void operationComplete(Future future) throws Exception { try { - if (entry.getFreezeReason() != FreezeReason.RECONNECT - || !entry.isFreezed()) { - return; + synchronized (entry) { + if (entry.getFreezeReason() != FreezeReason.RECONNECT + || !entry.isFreezed()) { + return; + } } if (future.isSuccess() && "PONG".equals(future.getNow())) { @@ -385,13 +391,13 @@ abstract class ConnectionPool { throws Exception { if (entry.getNodeType() == NodeType.SLAVE) { masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); - log.info("slave {} successfully reconnected", entry.getClient().getAddr()); + log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); } else { synchronized (entry) { if (entry.getFreezeReason() == FreezeReason.RECONNECT) { entry.setFreezed(false); entry.setFreezeReason(null); - log.info("host {} successfully reconnected", entry.getClient().getAddr()); + log.info("host {} has been successfully reconnected", entry.getClient().getAddr()); } } } From a02d509573b53ad661fe574daa349fa45308babf Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 28 Aug 2017 13:54:05 +0300 Subject: [PATCH 2/4] DNS monitoring support for Sentinel, Master/Slave and Replicated mode #864, #1009 --- .../cluster/ClusterConnectionManager.java | 7 +- .../config/BaseMasterSlaveServersConfig.java | 24 +++- .../org/redisson/connection/DNSMonitor.java | 124 ++++++++++++++++++ .../MasterSlaveConnectionManager.java | 66 +++++----- .../redisson/connection/MasterSlaveEntry.java | 12 +- .../ReplicatedConnectionManager.java | 2 +- .../connection/SentinelConnectionManager.java | 23 ++-- .../connection/SingleConnectionManager.java | 69 +--------- .../test/java/org/redisson/RedissonTest.java | 3 +- 9 files changed, 209 insertions(+), 121 deletions(-) create mode 100644 redisson/src/main/java/org/redisson/connection/DNSMonitor.java diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 5478a6891..bb3e7d352 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -82,7 +82,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { this.config = create(cfg); initTimer(this.config); - init(this.config); Throwable lastException = null; List failedMasters = new ArrayList(); @@ -196,10 +195,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return result; } - @Override - protected void initEntry(MasterSlaveServersConfig config) { - } - private RFuture>> addMasterEntry(final ClusterPartition partition, final ClusterServersConfig cfg) { if (partition.isMasterFail()) { RedisException e = new RedisException("Failed to add master: " + @@ -251,7 +246,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { final MasterSlaveEntry e; List> futures = new ArrayList>(); - if (config.isSkipSlavesInit()) { + if (config.checkSkipSlavesInit()) { e = new SingleEntry(partition.getSlotRanges(), ClusterConnectionManager.this, config); } else { config.setSlaveAddresses(partition.getSlaveAddresses()); diff --git a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java index 267b9076e..f6e52924a 100644 --- a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java @@ -65,6 +65,8 @@ public class BaseMasterSlaveServersConfig + * Applications must ensure the JVM DNS cache TTL is low enough to support this.

+ * Set -1 to disable. + *

+ * Default is 5000. + * + * @param dnsMonitoringInterval time + * @return config + */ + public T setDnsMonitoringInterval(long dnsMonitoringInterval) { + this.dnsMonitoringInterval = dnsMonitoringInterval; + return (T) this; + } + public long getDnsMonitoringInterval() { + return dnsMonitoringInterval; + } + } diff --git a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java new file mode 100644 index 000000000..3f5bb4a00 --- /dev/null +++ b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java @@ -0,0 +1,124 @@ +package org.redisson.connection; + +import java.net.InetAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.redisson.client.RedisConnectionException; +import org.redisson.connection.ClientConnectionsEntry.FreezeReason; +import org.redisson.misc.URIBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.ScheduledFuture; + +/** + * DNS changes monitor. + * + * @author Nikita Koksharov + * + */ +public class DNSMonitor { + + private static final Logger log = LoggerFactory.getLogger(DNSMonitor.class); + + private ScheduledFuture dnsMonitorFuture; + + private ConnectionManager connectionManager; + + private final Map masters = new HashMap(); + private final Map slaves = new HashMap(); + + private long dnsMonitoringInterval; + + public DNSMonitor(ConnectionManager connectionManager, Set masterHosts, Set slaveHosts, long dnsMonitoringInterval) { + for (URI host : masterHosts) { + try { + masters.put(host, InetAddress.getByName(host.getHost())); + } catch (UnknownHostException e) { + throw new RedisConnectionException("Unknown host: " + host.getHost(), e); + } + } + for (URI host : slaveHosts) { + try { + slaves.put(host, InetAddress.getByName(host.getHost())); + } catch (UnknownHostException e) { + throw new RedisConnectionException("Unknown host: " + host.getHost(), e); + } + } + this.connectionManager = connectionManager; + this.dnsMonitoringInterval = dnsMonitoringInterval; + } + + public void start() { + monitorDnsChange(); + log.debug("DNS monitoring enabled; Current masters: {}, slaves: {}", masters, slaves); + } + + public void stop() { + if (dnsMonitorFuture != null) { + dnsMonitorFuture.cancel(true); + } + } + + private void monitorDnsChange() { + dnsMonitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() { + @Override + public void run() { + // As InetAddress.getByName call is blocking. Method should be run in dedicated thread + connectionManager.getExecutor().execute(new Runnable() { + @Override + public void run() { + try { + for (Entry entry : masters.entrySet()) { + InetAddress master = entry.getValue(); + InetAddress now = InetAddress.getByName(entry.getKey().getHost()); + if (!now.getHostAddress().equals(master.getHostAddress())) { + log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), master.getHostAddress(), now.getHostAddress()); + for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) { + if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost()) + && entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) { + entrySet.changeMaster(entry.getKey()); + } + } + masters.put(entry.getKey(), now); + log.info("Master {} has been changed", entry.getKey().getHost()); + } + } + + for (Entry entry : slaves.entrySet()) { + InetAddress slave = entry.getValue(); + InetAddress updatedSlave = InetAddress.getByName(entry.getKey().getHost()); + if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) { + log.info("Detected DNS change. {} has changed from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress()); + for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { + URI uri = URIBuilder.create("redis://" + slave.getHostAddress() + ":" + entry.getKey().getPort()); + if (masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER)) { + masterSlaveEntry.slaveUp(entry.getKey(), FreezeReason.MANAGER); + } + } + slaves.put(entry.getKey(), updatedSlave); + log.info("Slave {} has been changed", entry.getKey().getHost()); + } + } + + } catch (Exception e) { + log.error(e.getMessage(), e); + } finally { + monitorDnsChange(); + } + } + }); + } + + }, dnsMonitoringInterval, TimeUnit.MILLISECONDS); + } + + +} diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 6d490a009..2db3203fc 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -126,6 +126,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected final Queue freePubSubConnections = new ConcurrentLinkedQueue(); + protected DNSMonitor dnsMonitor; + protected MasterSlaveServersConfig config; private final Map entries = PlatformDependent.newConcurrentHashMap(); @@ -159,7 +161,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) { this(config); initTimer(cfg); - init(cfg); + this.config = cfg; + initSingleEntry(); } public MasterSlaveConnectionManager(Config cfg) { @@ -235,19 +238,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return new HashSet(entries.values()); } - protected void init(MasterSlaveServersConfig config) { - this.config = config; - - connectionWatcher = new IdleConnectionWatcher(this, config); - - try { - initEntry(config); - } catch (RuntimeException e) { - stopThreads(); - throw e; - } - } - protected void initTimer(MasterSlaveServersConfig config) { int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()}; Arrays.sort(timeouts); @@ -271,26 +261,38 @@ public class MasterSlaveConnectionManager implements ConnectionManager { throw new IllegalStateException(e); } + connectionWatcher = new IdleConnectionWatcher(this, config); } - protected void initEntry(MasterSlaveServersConfig config) { - HashSet slots = new HashSet(); - slots.add(singleSlotRange); - - MasterSlaveEntry entry; - if (config.isSkipSlavesInit()) { - entry = new SingleEntry(slots, this, config); - RFuture f = entry.setupMasterEntry(config.getMasterAddress()); - f.syncUninterruptibly(); - } else { - entry = createMasterSlaveEntry(config, slots); - } - - for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { - addEntry(slot, entry); + protected void initSingleEntry() { + try { + HashSet slots = new HashSet(); + slots.add(singleSlotRange); + + MasterSlaveEntry entry; + if (config.checkSkipSlavesInit()) { + entry = new SingleEntry(slots, this, config); + RFuture f = entry.setupMasterEntry(config.getMasterAddress()); + f.syncUninterruptibly(); + } else { + entry = createMasterSlaveEntry(config, slots); + } + + for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { + addEntry(slot, entry); + } + + if (config.getDnsMonitoringInterval() != -1) { + dnsMonitor = new DNSMonitor(this, Collections.singleton(config.getMasterAddress()), + config.getSlaveAddresses(), config.getDnsMonitoringInterval()); + dnsMonitor.start(); + } + } catch (RuntimeException e) { + stopThreads(); + throw e; } } - + protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, HashSet slots) { MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config); @@ -775,6 +777,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void shutdown(long quietPeriod, long timeout, TimeUnit unit) { + if (dnsMonitor != null) { + dnsMonitor.stop(); + } + shutdownLatch.close(); shutdownPromise.trySuccess(true); shutdownLatch.awaitUninterruptibly(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index fcfcd5a36..1314c524c 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -86,10 +86,14 @@ public class MasterSlaveEntry { writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this); pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this); } + + public MasterSlaveServersConfig getConfig() { + return config; + } public List> initSlaveBalancer(Collection disconnectedNodes) { boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() - && !config.isSkipSlavesInit() + && !config.checkSkipSlavesInit() && disconnectedNodes.size() < config.getSlaveAddresses().size(); List> result = new LinkedList>(); @@ -132,7 +136,7 @@ public class MasterSlaveEntry { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { // add master as slave if no more slaves available - if (!config.isSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) { + if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) { URI addr = masterEntry.getClient().getConfig().getAddress(); if (slaveUp(addr, FreezeReason.SYSTEM)) { log.info("master {} used as slave", addr); @@ -338,7 +342,7 @@ public class MasterSlaveEntry { InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort()); InetSocketAddress addr = masterEntry.getClient().getAddr(); // exclude master from slaves - if (!config.isSkipSlavesInit() + if (!config.checkSkipSlavesInit() && (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) { slaveDown(address, FreezeReason.SYSTEM); log.info("master {} excluded from slaves", addr); @@ -370,7 +374,7 @@ public class MasterSlaveEntry { slaveBalancer.changeType(address, NodeType.MASTER); // more than one slave available, so master can be removed from slaves - if (!config.isSkipSlavesInit() + if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() > 1) { slaveDown(address, FreezeReason.SYSTEM); } diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 1634701d5..8882f1df6 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -101,7 +101,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { throw new RedisConnectionException("Can't connect to servers!"); } - init(this.config); + initSingleEntry(); scheduleMasterChangeCheck(cfg); } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 30099a2d6..7b0fe1dac 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -39,17 +39,15 @@ import org.redisson.cluster.ClusterSlotRange; import org.redisson.config.BaseMasterSlaveServersConfig; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; -import org.redisson.config.ReadMode; import org.redisson.config.SentinelServersConfig; -import org.redisson.config.SubscriptionMode; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; +import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.PlatformDependent; -import org.redisson.misc.URIBuilder; /** * @@ -68,13 +66,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { public SentinelConnectionManager(SentinelServersConfig cfg, Config config) { super(config); - - this.config = create(cfg); - initTimer(this.config); - + if (cfg.getMasterName() == null) { throw new IllegalArgumentException("masterName parameter is not defined!"); } + + this.config = create(cfg); + initTimer(this.config); for (URI addr : cfg.getSentinelAddresses()) { RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts()); @@ -127,8 +125,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { stopThreads(); throw new RedisConnectionException("Can't connect to servers!"); } - init(this.config); - + + initSingleEntry(); + List> connectionFutures = new ArrayList>(cfg.getSentinelAddresses().size()); for (URI addr : cfg.getSentinelAddresses()) { RFuture future = registerSentinel(cfg, addr, this.config); @@ -241,7 +240,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } // to avoid addition twice - if (slaves.putIfAbsent(slaveAddr, true) == null && !config.isSkipSlavesInit()) { + if (slaves.putIfAbsent(slaveAddr, true) == null && !config.checkSkipSlavesInit()) { final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); RFuture future = entry.addSlave(URIBuilder.create(slaveAddr)); future.addListener(new FutureListener() { @@ -313,7 +312,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private void slaveDown(String ip, String port) { - if (config.isSkipSlavesInit()) { + if (config.checkSkipSlavesInit()) { log.warn("slave: {}:{} has down", ip, port); } else { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); @@ -370,7 +369,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } private void slaveUp(String ip, String port) { - if (config.isSkipSlavesInit()) { + if (config.checkSkipSlavesInit()) { String slaveAddr = ip + ":" + port; log.info("slave: {} has up", slaveAddr); return; diff --git a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java index 83d7c1e5c..bc42ef691 100644 --- a/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SingleConnectionManager.java @@ -15,22 +15,11 @@ */ package org.redisson.connection; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import org.redisson.client.RedisConnectionException; import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.ReadMode; import org.redisson.config.SingleServerConfig; import org.redisson.config.SubscriptionMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.concurrent.ScheduledFuture; /** * @@ -39,24 +28,8 @@ import io.netty.util.concurrent.ScheduledFuture; */ public class SingleConnectionManager extends MasterSlaveConnectionManager { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final AtomicReference currentMaster = new AtomicReference(); - - private ScheduledFuture monitorFuture; - public SingleConnectionManager(SingleServerConfig cfg, Config config) { super(create(cfg), config); - - if (cfg.isDnsMonitoring()) { - try { - this.currentMaster.set(InetAddress.getByName(cfg.getAddress().getHost())); - } catch (UnknownHostException e) { - throw new RedisConnectionException("Unknown host: " + cfg.getAddress().getHost(), e); - } - log.debug("DNS monitoring enabled; Current master set to {}", currentMaster.get()); - monitorDnsChange(cfg); - } } private static MasterSlaveServersConfig create(SingleServerConfig cfg) { @@ -84,6 +57,11 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { newconfig.setIdleConnectionTimeout(cfg.getIdleConnectionTimeout()); newconfig.setFailedAttempts(cfg.getFailedAttempts()); newconfig.setReconnectionTimeout(cfg.getReconnectionTimeout()); + if (cfg.isDnsMonitoring()) { + newconfig.setDnsMonitoringInterval(cfg.getDnsMonitoringInterval()); + } else { + newconfig.setDnsMonitoringInterval(-1); + } newconfig.setMasterConnectionMinimumIdleSize(cfg.getConnectionMinimumIdleSize()); newconfig.setSubscriptionConnectionMinimumIdleSize(cfg.getSubscriptionConnectionMinimumIdleSize()); @@ -92,41 +70,4 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager { return newconfig; } - private void monitorDnsChange(final SingleServerConfig cfg) { - monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() { - @Override - public void run() { - // As InetAddress.getByName call is blocking. Method should be run in dedicated thread - getExecutor().execute(new Runnable() { - @Override - public void run() { - try { - InetAddress master = currentMaster.get(); - InetAddress now = InetAddress.getByName(cfg.getAddress().getHost()); - if (!now.getHostAddress().equals(master.getHostAddress())) { - log.info("Detected DNS change. {} has changed from {} to {}", cfg.getAddress().getHost(), master.getHostAddress(), now.getHostAddress()); - if (currentMaster.compareAndSet(master, now)) { - changeMaster(singleSlotRange.getStartSlot(), cfg.getAddress()); - log.info("Master has been changed"); - } - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } finally { - monitorDnsChange(cfg); - } - } - }); - } - - }, cfg.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS); - } - - @Override - public void shutdown() { - if (monitorFuture != null) { - monitorFuture.cancel(true); - } - super.shutdown(); - } } diff --git a/redisson/src/test/java/org/redisson/RedissonTest.java b/redisson/src/test/java/org/redisson/RedissonTest.java index 653a69c73..3660ef171 100644 --- a/redisson/src/test/java/org/redisson/RedissonTest.java +++ b/redisson/src/test/java/org/redisson/RedissonTest.java @@ -32,7 +32,6 @@ import org.redisson.api.NodesGroup; import org.redisson.api.RMap; import org.redisson.api.RedissonClient; import org.redisson.client.RedisConnectionException; -import org.redisson.client.RedisException; import org.redisson.client.RedisOutOfMemoryException; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ScanObjectEntry; @@ -536,7 +535,7 @@ public class RedissonTest { @Test(expected = RedisConnectionException.class) public void testSentinelConnectionFail() throws InterruptedException { Config config = new Config(); - config.useSentinelServers().addSentinelAddress("redis://127.99.0.1:1111"); + config.useSentinelServers().addSentinelAddress("redis://127.99.0.1:1111").setMasterName("test"); Redisson.create(config); Thread.sleep(1500); From d67b0a0a9f864cb067e529b277cc92acd221c898 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 28 Aug 2017 13:57:59 +0300 Subject: [PATCH 3/4] javadocs fixed --- redisson/src/main/java/org/redisson/api/RKeys.java | 2 +- .../java/org/redisson/config/BaseMasterSlaveServersConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/redisson/src/main/java/org/redisson/api/RKeys.java b/redisson/src/main/java/org/redisson/api/RKeys.java index f3445184b..56bd641b6 100644 --- a/redisson/src/main/java/org/redisson/api/RKeys.java +++ b/redisson/src/main/java/org/redisson/api/RKeys.java @@ -236,7 +236,7 @@ public interface RKeys extends RKeysAsync { *

* Requires Redis 4.0+ * - * @param keys + * @param keys of objects * @return number of removed keys */ long unlink(String ... keys); diff --git a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java index f6e52924a..de10668a5 100644 --- a/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java +++ b/redisson/src/main/java/org/redisson/config/BaseMasterSlaveServersConfig.java @@ -281,7 +281,7 @@ public class BaseMasterSlaveServersConfig + * Interval in milliseconds to check the endpoint's DNS

* Applications must ensure the JVM DNS cache TTL is low enough to support this.

* Set -1 to disable. *

From 8b0aea85c047430331be0e2542f1dbee1597f6fa Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 28 Aug 2017 14:02:34 +0300 Subject: [PATCH 4/4] license header added --- .../java/org/redisson/connection/DNSMonitor.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java index 3f5bb4a00..6e513a0ed 100644 --- a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java +++ b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java @@ -1,3 +1,18 @@ +/** + * Copyright 2016 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.redisson.connection; import java.net.InetAddress;