Merge pull request #1 from redisson/master

merge redisson master
pull/3930/head
deerRule 3 years ago committed by GitHub
commit 478371b854
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -15,26 +15,26 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetSocketAddress; import io.netty.resolver.AddressResolver;
import java.util.Collection; import io.netty.resolver.AddressResolverGroup;
import java.util.HashMap; import io.netty.util.concurrent.Future;
import java.util.Map; import io.netty.util.concurrent.FutureListener;
import java.util.Map.Entry; import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.AsyncCountDownLatch;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.resolver.AddressResolver; import java.net.InetSocketAddress;
import io.netty.resolver.AddressResolverGroup; import java.util.Collection;
import io.netty.util.concurrent.Future; import java.util.HashMap;
import io.netty.util.concurrent.FutureListener; import java.util.Map;
import io.netty.util.concurrent.ScheduledFuture; import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* DNS changes monitor. * DNS changes monitor.
@ -88,28 +88,26 @@ public class DNSMonitor {
return; return;
} }
AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size()); AsyncCountDownLatch latch = new AsyncCountDownLatch();
monitorMasters(counter); latch.latch(() -> {
monitorSlaves(counter); monitorDnsChange();
}, masters.size() + slaves.size());
monitorMasters(latch);
monitorSlaves(latch);
} }
}, dnsMonitoringInterval, TimeUnit.MILLISECONDS); }, dnsMonitoringInterval, TimeUnit.MILLISECONDS);
} }
private void monitorMasters(AtomicInteger counter) { private void monitorMasters(AsyncCountDownLatch latch) {
for (Entry<RedisURI, InetSocketAddress> entry : masters.entrySet()) { for (Entry<RedisURI, InetSocketAddress> entry : masters.entrySet()) {
log.debug("Request sent to resolve ip address for master host: {}", entry.getKey().getHost()); log.debug("Request sent to resolve ip address for master host: {}", entry.getKey().getHost());
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() { resolveFuture.addListener((FutureListener<InetSocketAddress>) future -> {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (counter.decrementAndGet() == 0) {
monitorDnsChange();
}
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); log.error("Unable to resolve " + entry.getKey().getHost(), future.cause());
latch.countDown();
return; return;
} }
@ -125,35 +123,34 @@ public class DNSMonitor {
MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr); MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr);
if (masterSlaveEntry == null) { if (masterSlaveEntry == null) {
log.error("Unable to find entry for current master {}", currentMasterAddr); log.error("Unable to find entry for current master {}", currentMasterAddr);
latch.countDown();
return; return;
} }
RFuture<RedisClient> changeFuture = masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey()); RFuture<RedisClient> changeFuture = masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey());
changeFuture.onComplete((r, e) -> { changeFuture.onComplete((r, e) -> {
latch.countDown();
if (e == null) { if (e == null) {
masters.put(entry.getKey(), newMasterAddr); masters.put(entry.getKey(), newMasterAddr);
} }
}); });
} } else {
latch.countDown();
} }
}); });
} }
} }
private void monitorSlaves(AtomicInteger counter) { private void monitorSlaves(AsyncCountDownLatch latch) {
for (Entry<RedisURI, InetSocketAddress> entry : slaves.entrySet()) { for (Entry<RedisURI, InetSocketAddress> entry : slaves.entrySet()) {
log.debug("Request sent to resolve ip address for slave host: {}", entry.getKey().getHost()); log.debug("Request sent to resolve ip address for slave host: {}", entry.getKey().getHost());
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() { resolveFuture.addListener((FutureListener<InetSocketAddress>) future -> {
@Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (counter.decrementAndGet() == 0) {
monitorDnsChange();
}
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error("Unable to resolve " + entry.getKey().getHost(), future.cause()); log.error("Unable to resolve " + entry.getKey().getHost(), future.cause());
latch.countDown();
return; return;
} }
@ -172,21 +169,26 @@ public class DNSMonitor {
if (masterSlaveEntry.hasSlave(newSlaveAddr)) { if (masterSlaveEntry.hasSlave(newSlaveAddr)) {
masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER); masterSlaveEntry.slaveUp(newSlaveAddr, FreezeReason.MANAGER);
masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER);
slaves.put(entry.getKey(), newSlaveAddr);
latch.countDown();
} else { } else {
RFuture<Void> addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey()); RFuture<Void> addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey());
addFuture.onComplete((res, e) -> { addFuture.onComplete((res, e) -> {
latch.countDown();
if (e != null) { if (e != null) {
log.error("Can't add slave: " + newSlaveAddr, e); log.error("Can't add slave: " + newSlaveAddr, e);
return; return;
} }
masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER);
slaves.put(entry.getKey(), newSlaveAddr);
}); });
} }
break; break;
} }
slaves.put(entry.getKey(), newSlaveAddr); } else {
} latch.countDown();
} }
}); });
} }

Loading…
Cancel
Save