From 61e561c6c054f27976a03162fe920c2a375361f1 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Tue, 2 Nov 2021 11:24:02 +0300 Subject: [PATCH] Fixed - DNS monitor makes a new attempt to change master while current attempt wasn't finished. #3924 --- .../org/redisson/connection/DNSMonitor.java | 170 +++++++++--------- 1 file changed, 86 insertions(+), 84 deletions(-) diff --git a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java index 87dbc7fe6..ccc7560f1 100644 --- a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java +++ b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java @@ -15,26 +15,26 @@ */ package org.redisson.connection; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - +import io.netty.resolver.AddressResolver; +import io.netty.resolver.AddressResolverGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ScheduledFuture; import org.redisson.api.RFuture; import org.redisson.client.RedisClient; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; +import org.redisson.misc.AsyncCountDownLatch; import org.redisson.misc.RedisURI; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.resolver.AddressResolver; -import io.netty.resolver.AddressResolverGroup; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.ScheduledFuture; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * DNS changes monitor. @@ -87,106 +87,108 @@ public class DNSMonitor { if (connectionManager.isShuttingDown()) { return; } - - AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size()); - monitorMasters(counter); - monitorSlaves(counter); + + AsyncCountDownLatch latch = new AsyncCountDownLatch(); + latch.latch(() -> { + monitorDnsChange(); + }, masters.size() + slaves.size()); + monitorMasters(latch); + monitorSlaves(latch); } }, dnsMonitoringInterval, TimeUnit.MILLISECONDS); } - private void monitorMasters(AtomicInteger counter) { + private void monitorMasters(AsyncCountDownLatch latch) { for (Entry entry : masters.entrySet()) { log.debug("Request sent to resolve ip address for master host: {}", entry.getKey().getHost()); Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); - resolveFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (counter.decrementAndGet() == 0) { - monitorDnsChange(); - } + resolveFuture.addListener((FutureListener) future -> { + if (!future.isSuccess()) { + log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); + latch.countDown(); + return; + } + + log.debug("Resolved ip: {} for master host: {}", future.getNow().getAddress(), entry.getKey().getHost()); - if (!future.isSuccess()) { - log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); + InetSocketAddress currentMasterAddr = entry.getValue(); + InetSocketAddress newMasterAddr = future.getNow(); + if (!newMasterAddr.getAddress().equals(currentMasterAddr.getAddress())) { + log.info("Detected DNS change. Master {} has changed ip from {} to {}", + entry.getKey(), currentMasterAddr.getAddress().getHostAddress(), + newMasterAddr.getAddress().getHostAddress()); + + MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr); + if (masterSlaveEntry == null) { + log.error("Unable to find entry for current master {}", currentMasterAddr); + latch.countDown(); return; } - log.debug("Resolved ip: {} for master host: {}", future.getNow().getAddress(), entry.getKey().getHost()); - - InetSocketAddress currentMasterAddr = entry.getValue(); - InetSocketAddress newMasterAddr = future.getNow(); - if (!newMasterAddr.getAddress().equals(currentMasterAddr.getAddress())) { - log.info("Detected DNS change. Master {} has changed ip from {} to {}", - entry.getKey(), currentMasterAddr.getAddress().getHostAddress(), - newMasterAddr.getAddress().getHostAddress()); + RFuture changeFuture = masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey()); + changeFuture.onComplete((r, e) -> { + latch.countDown(); - MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr); - if (masterSlaveEntry == null) { - log.error("Unable to find entry for current master {}", currentMasterAddr); - return; + if (e == null) { + masters.put(entry.getKey(), newMasterAddr); } - - RFuture changeFuture = masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey()); - changeFuture.onComplete((r, e) -> { - if (e == null) { - masters.put(entry.getKey(), newMasterAddr); - } - }); - } + }); + } else { + latch.countDown(); } }); } } - private void monitorSlaves(AtomicInteger counter) { + private void monitorSlaves(AsyncCountDownLatch latch) { for (Entry entry : slaves.entrySet()) { log.debug("Request sent to resolve ip address for slave host: {}", entry.getKey().getHost()); Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); - resolveFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - if (counter.decrementAndGet() == 0) { - monitorDnsChange(); - } + resolveFuture.addListener((FutureListener) future -> { + if (!future.isSuccess()) { + log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); + latch.countDown(); + return; + } - if (!future.isSuccess()) { - log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); - return; - } + log.debug("Resolved ip: {} for slave host: {}", future.getNow().getAddress(), entry.getKey().getHost()); + + InetSocketAddress currentSlaveAddr = entry.getValue(); + InetSocketAddress newSlaveAddr = future.getNow(); + if (!newSlaveAddr.getAddress().equals(currentSlaveAddr.getAddress())) { + log.info("Detected DNS change. Slave {} has changed ip from {} to {}", + entry.getKey().getHost(), currentSlaveAddr.getAddress().getHostAddress(), newSlaveAddr.getAddress().getHostAddress()); + for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { + if (!masterSlaveEntry.hasSlave(currentSlaveAddr)) { + continue; + } + + if (masterSlaveEntry.hasSlave(newSlaveAddr)) { + masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER); + masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); + slaves.put(entry.getKey(), newSlaveAddr); + latch.countDown(); + } else { + RFuture addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey()); + addFuture.onComplete((res, e) -> { + latch.countDown(); + + if (e != null) { + log.error("Can't add slave: " + newSlaveAddr, e); + return; + } - log.debug("Resolved ip: {} for slave host: {}", future.getNow().getAddress(), entry.getKey().getHost()); - - InetSocketAddress currentSlaveAddr = entry.getValue(); - InetSocketAddress newSlaveAddr = future.getNow(); - if (!newSlaveAddr.getAddress().equals(currentSlaveAddr.getAddress())) { - log.info("Detected DNS change. Slave {} has changed ip from {} to {}", - entry.getKey().getHost(), currentSlaveAddr.getAddress().getHostAddress(), newSlaveAddr.getAddress().getHostAddress()); - for (MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { - if (!masterSlaveEntry.hasSlave(currentSlaveAddr)) { - continue; - } - - if (masterSlaveEntry.hasSlave(newSlaveAddr)) { - masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER); masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); - } else { - RFuture addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey()); - addFuture.onComplete((res, e) -> { - if (e != null) { - log.error("Can't add slave: " + newSlaveAddr, e); - return; - } - - masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); - }); - } - break; + slaves.put(entry.getKey(), newSlaveAddr); + }); } - slaves.put(entry.getKey(), newSlaveAddr); + break; } + } else { + latch.countDown(); } }); }