From 09b2724c44567035c8806bfe4a79bdf355c816b8 Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 20 Dec 2017 14:36:20 +0300 Subject: [PATCH] Fixed - IP address renew process. #1178 --- .../java/org/redisson/client/RedisClient.java | 27 ++++-- .../redisson/client/RedisClientConfig.java | 8 +- .../connection/ConnectionManager.java | 4 + .../org/redisson/connection/DNSMonitor.java | 68 +++++++------- .../MasterSlaveConnectionManager.java | 89 +++++++++--------- .../redisson/connection/MasterSlaveEntry.java | 92 +++++++++++++++---- .../org/redisson/connection/NodeSource.java | 5 +- .../balancer/LoadBalancerManager.java | 37 +++++++- .../connection/pool/ConnectionPool.java | 4 +- 9 files changed, 221 insertions(+), 113 deletions(-) diff --git a/redisson/src/main/java/org/redisson/client/RedisClient.java b/redisson/src/main/java/org/redisson/client/RedisClient.java index 1da0bd527..4fb81b885 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClient.java +++ b/redisson/src/main/java/org/redisson/client/RedisClient.java @@ -57,10 +57,10 @@ import io.netty.util.concurrent.FutureListener; */ public class RedisClient { - private final AtomicReference> resolveFuture = new AtomicReference>(); + private final AtomicReference> resolvedAddrFuture = new AtomicReference>(); private final Bootstrap bootstrap; private final Bootstrap pubSubBootstrap; - private final URI addr; + private final URI uri; private InetSocketAddress resolvedAddr; private final ChannelGroup channels; @@ -105,7 +105,12 @@ public class RedisClient { this.executor = copy.getExecutor(); this.timer = copy.getTimer(); - addr = copy.getAddress(); + uri = copy.getAddress(); + resolvedAddr = copy.getAddr(); + + if (resolvedAddr != null) { + resolvedAddrFuture.set(RedissonPromise.newSucceededFuture(resolvedAddr)); + } channels = new DefaultChannelGroup(copy.getGroup().next()); bootstrap = createBootstrap(copy, Type.PLAIN); @@ -147,18 +152,22 @@ public class RedisClient { try { return connectAsync().syncUninterruptibly().getNow(); } catch (Exception e) { - throw new RedisConnectionException("Unable to connect to: " + addr, e); + throw new RedisConnectionException("Unable to connect to: " + uri, e); } } public RFuture resolveAddr() { + if (resolvedAddrFuture.get() != null) { + return resolvedAddrFuture.get(); + } + final RPromise promise = new RedissonPromise(); - if (!resolveFuture.compareAndSet(null, promise)) { - return resolveFuture.get(); + if (!resolvedAddrFuture.compareAndSet(null, promise)) { + return resolvedAddrFuture.get(); } AddressResolver resolver = (AddressResolver) bootstrap.config().resolver().getResolver(bootstrap.config().group().next()); - Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(addr.getHost(), addr.getPort())); + Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort())); resolveFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -229,7 +238,7 @@ public class RedisClient { try { return connectPubSubAsync().syncUninterruptibly().getNow(); } catch (Exception e) { - throw new RedisConnectionException("Unable to connect to: " + addr, e); + throw new RedisConnectionException("Unable to connect to: " + uri, e); } } @@ -342,7 +351,7 @@ public class RedisClient { @Override public String toString() { - return "[addr=" + addr + "]"; + return "[addr=" + uri + "]"; } } diff --git a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java index 88987e588..9f9407fd7 100644 --- a/redisson/src/main/java/org/redisson/client/RedisClientConfig.java +++ b/redisson/src/main/java/org/redisson/client/RedisClientConfig.java @@ -15,7 +15,7 @@ */ package org.redisson.client; -import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.URI; import java.util.concurrent.ExecutorService; @@ -36,7 +36,7 @@ import io.netty.util.Timer; public class RedisClientConfig { private URI address; - private InetAddress addr; + private InetSocketAddress addr; private Timer timer; private ExecutorService executor; @@ -99,7 +99,7 @@ public class RedisClientConfig { this.address = URIBuilder.create(address); return this; } - public RedisClientConfig setAddress(InetAddress addr, URI address) { + public RedisClientConfig setAddress(InetSocketAddress addr, URI address) { this.addr = addr; this.address = address; return this; @@ -111,7 +111,7 @@ public class RedisClientConfig { public URI getAddress() { return address; } - public InetAddress getAddr() { + public InetSocketAddress getAddr() { return addr; } diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index a6cc33447..db7f8043b 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -88,6 +88,8 @@ public interface ConnectionManager { MasterSlaveEntry getEntry(int slot); + MasterSlaveEntry getEntry(InetSocketAddress address); + RPromise newPromise(); void releaseRead(NodeSource source, RedisConnection connection); @@ -100,6 +102,8 @@ public interface ConnectionManager { RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout); + RedisClient createClient(NodeType type, InetSocketAddress address, URI uri); + RedisClient createClient(NodeType type, URI address); MasterSlaveEntry getEntry(RedisClient redisClient); diff --git a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java index ef7176e28..174bd7e18 100644 --- a/redisson/src/main/java/org/redisson/connection/DNSMonitor.java +++ b/redisson/src/main/java/org/redisson/connection/DNSMonitor.java @@ -15,7 +15,6 @@ */ package org.redisson.connection; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; import java.util.Collection; @@ -26,8 +25,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.redisson.api.RFuture; +import org.redisson.client.RedisClient; import org.redisson.connection.ClientConnectionsEntry.FreezeReason; -import org.redisson.misc.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,22 +48,22 @@ public class DNSMonitor { private final AddressResolver resolver; private final ConnectionManager connectionManager; - private final Map masters = new HashMap(); - private final Map slaves = new HashMap(); + private final Map masters = new HashMap(); + private final Map slaves = new HashMap(); private ScheduledFuture dnsMonitorFuture; private long dnsMonitoringInterval; - public DNSMonitor(ConnectionManager connectionManager, InetSocketAddress masterHost, Collection slaveHosts, long dnsMonitoringInterval, DnsAddressResolverGroup resolverGroup) { + public DNSMonitor(ConnectionManager connectionManager, RedisClient masterHost, Collection slaveHosts, long dnsMonitoringInterval, DnsAddressResolverGroup resolverGroup) { this.resolver = resolverGroup.getResolver(connectionManager.getGroup().next()); - URI uri = URIBuilder.create("redis://" + masterHost.getAddress().getHostAddress() + ":" + masterHost.getPort()); - masters.put(uri, masterHost.getAddress()); + masterHost.resolveAddr().syncUninterruptibly(); + masters.put(masterHost.getConfig().getAddress(), masterHost.getAddr()); for (URI host : slaveHosts) { - Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0)); + Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), host.getPort())); resolveFuture.syncUninterruptibly(); - slaves.put(host, resolveFuture.getNow().getAddress()); + slaves.put(host, resolveFuture.getNow()); } this.connectionManager = connectionManager; this.dnsMonitoringInterval = dnsMonitoringInterval; @@ -86,8 +85,8 @@ public class DNSMonitor { @Override public void run() { final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size()); - for (final Entry entry : masters.entrySet()) { - Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0)); + for (final Entry entry : masters.entrySet()) { + Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); resolveFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -100,25 +99,25 @@ public class DNSMonitor { return; } - InetAddress master = entry.getValue(); - InetAddress now = future.get().getAddress(); - if (!now.getHostAddress().equals(master.getHostAddress())) { - log.info("Detected DNS change. Master {} has changed ip from {} to {}", entry.getKey(), master.getHostAddress(), now.getHostAddress()); - for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) { - if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost()) - && entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) { - entrySet.changeMaster(entry.getKey()); - break; - } + InetSocketAddress currentMasterAddr = entry.getValue(); + InetSocketAddress newMasterAddr = future.getNow(); + if (!newMasterAddr.getAddress().equals(currentMasterAddr.getAddress())) { + log.info("Detected DNS change. Master {} has changed ip from {} to {}", + entry.getKey(), currentMasterAddr.getAddress().getHostAddress(), newMasterAddr.getAddress().getHostAddress()); + MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr); + if (masterSlaveEntry == null) { + log.error("Unable to find master entry for {}", currentMasterAddr); + return; } - masters.put(entry.getKey(), now); + masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey()); + masters.put(entry.getKey(), newMasterAddr); } } }); } - for (final Entry entry : slaves.entrySet()) { - Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0)); + for (final Entry entry : slaves.entrySet()) { + Future resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort())); resolveFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -131,30 +130,29 @@ public class DNSMonitor { return; } - InetAddress slave = entry.getValue(); - final InetAddress updatedSlave = future.get().getAddress(); - if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) { - log.info("Detected DNS change. Slave {} has changed ip from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress()); + final InetSocketAddress currentSlaveAddr = entry.getValue(); + final InetSocketAddress newSlaveAddr = future.getNow(); + if (!newSlaveAddr.getAddress().equals(currentSlaveAddr.getAddress())) { + log.info("Detected DNS change. Slave {} has changed ip from {} to {}", + entry.getKey().getHost(), currentSlaveAddr.getAddress().getHostAddress(), newSlaveAddr.getAddress().getHostAddress()); for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { - final URI uri = URIBuilder.create(slave.getHostAddress() + ":" + entry.getKey().getPort()); - - if (masterSlaveEntry.hasSlave(uri)) { - RFuture addFuture = masterSlaveEntry.addSlave(entry.getKey()); + if (masterSlaveEntry.hasSlave(currentSlaveAddr)) { + RFuture addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey()); addFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - log.error("Can't add slave: " + updatedSlave, future.cause()); + log.error("Can't add slave: " + newSlaveAddr, future.cause()); return; } - masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER); + masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); } }); break; } } - slaves.put(entry.getKey(), updatedSlave); + slaves.put(entry.getKey(), newSlaveAddr); } } }); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 47308e9e3..594cafe18 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -286,9 +286,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { addEntry(slot, entry); } - InetSocketAddress masterHost = f.getNow().resolveAddr().syncUninterruptibly().getNow(); if (config.getDnsMonitoringInterval() != -1) { - dnsMonitor = new DNSMonitor(this, masterHost, + dnsMonitor = new DNSMonitor(this, f.getNow(), config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup); dnsMonitor.start(); } @@ -349,6 +348,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type)); return client; } + + @Override + public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri) { + RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts()); + clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type)); + return client; + } @Override public void shutdownAsync(RedisClient client) { @@ -363,6 +369,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager { RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout); return RedisClient.create(redisConfig); } + + private RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, int timeout, int commandTimeout) { + RedisClientConfig redisConfig = createRedisConfig(type, null, timeout, commandTimeout); + redisConfig.setAddress(address, uri); + return RedisClient.create(redisConfig); + } + protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout) { RedisClientConfig redisConfig = new RedisClientConfig(); @@ -677,7 +690,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return newSucceededFuture(entryCodec); } - public MasterSlaveEntry getEntry(URI addr) { + public MasterSlaveEntry getEntry(InetSocketAddress address) { + for (MasterSlaveEntry entry : client2entry.values()) { + InetSocketAddress addr = entry.getClient().getAddr(); + if (addr.getAddress().equals(address.getAddress()) && addr.getPort() == address.getPort()) { + return entry; + } + } + return null; + } + + private MasterSlaveEntry getEntry(URI addr) { for (MasterSlaveEntry entry : client2entry.values()) { if (URIBuilder.compare(entry.getClient().getAddr(), addr)) { return entry; @@ -732,28 +755,19 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public RFuture connectionWriteOp(NodeSource source, RedisCommand command) { - MasterSlaveEntry entry = source.getEntry(); - if (entry == null) { - entry = getEntry(source); - } + MasterSlaveEntry entry = getEntry(source); if (entry == null) { - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet"); + RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source + " hasn't been discovered yet"); return RedissonPromise.newFailedFuture(ex); } return entry.connectionWriteOp(command); } private MasterSlaveEntry getEntry(NodeSource source) { - // slots handling during migration state if (source.getRedirect() != null) { return getEntry(source.getAddr()); } - - return getEntry(source.getSlot()); - } - @Override - public RFuture connectionReadOp(NodeSource source, RedisCommand command) { MasterSlaveEntry entry = source.getEntry(); if (entry == null && source.getSlot() != null) { entry = getEntry(source.getSlot()); @@ -761,30 +775,21 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (source.getRedisClient() != null) { entry = getEntry(source.getRedisClient()); } - if (source.getAddr() != null) { - entry = getEntry(source.getAddr()); - if (entry == null) { - for (MasterSlaveEntry e : getEntrySet()) { - if (e.hasSlave(source.getAddr())) { - entry = e; - break; - } - } - } - - if (entry == null) { - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet"); - return RedissonPromise.newFailedFuture(ex); - } - - return entry.connectionReadOp(command, source.getAddr()); - } - + return entry; + } + + @Override + public RFuture connectionReadOp(NodeSource source, RedisCommand command) { + MasterSlaveEntry entry = getEntry(source); if (entry == null) { - RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet"); + RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source + " hasn't been discovered yet"); return RedissonPromise.newFailedFuture(ex); } + if (source.getRedirect() != null) { + return entry.connectionReadOp(command, source.getAddr()); + } + return entry.connectionReadOp(command); } @@ -800,7 +805,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) { MasterSlaveEntry entry = getEntry(slot); if (entry == null) { - log.error("Node for slot: " + slot + " hasn't been discovered yet"); + log.error("Node for slot: " + slot + " can't be found"); } else { entry.returnPubSubConnection(pubSubEntry); } @@ -808,12 +813,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void releaseWrite(NodeSource source, RedisConnection connection) { - MasterSlaveEntry entry = source.getEntry(); + MasterSlaveEntry entry = getEntry(source); if (entry == null) { - entry = getEntry(source); - } - if (entry == null) { - log.error("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet"); + log.error("Node: " + source + " can't be found"); } else { entry.releaseWrite(connection); } @@ -821,12 +823,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public void releaseRead(NodeSource source, RedisConnection connection) { - MasterSlaveEntry entry = source.getEntry(); - if (entry == null) { - entry = getEntry(source); - } + MasterSlaveEntry entry = getEntry(source); if (entry == null) { - log.error("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet"); + log.error("Node: " + source + " can't be found"); } else { entry.releaseRead(connection); } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index fd86dc1a0..bf37aebe1 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -108,11 +108,20 @@ public class MasterSlaveEntry { } return result; } + + public RFuture setupMasterEntry(InetSocketAddress address, URI uri) { + RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri); + return setupMasterEntry(client); + } + - public RPromise setupMasterEntry(URI address) { - final RPromise result = new RedissonPromise(); + public RFuture setupMasterEntry(URI address) { + RedisClient client = connectionManager.createClient(NodeType.MASTER, address); + return setupMasterEntry(client); + } - final RedisClient client = connectionManager.createClient(NodeType.MASTER, address); + private RFuture setupMasterEntry(final RedisClient client) { + final RPromise result = new RedissonPromise(); RFuture addrFuture = client.resolveAddr(); addrFuture.addListener(new FutureListener() { @@ -148,7 +157,16 @@ public class MasterSlaveEntry { return result; } - public boolean slaveDown(URI address, FreezeReason freezeReason) { + public boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReason) { + ClientConnectionsEntry e = slaveBalancer.freeze(entry, freezeReason); + if (e == null) { + return false; + } + + return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); + } + + public boolean slaveDown(InetSocketAddress address, FreezeReason freezeReason) { ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason); if (entry == null) { return false; @@ -157,8 +175,8 @@ public class MasterSlaveEntry { return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); } - public boolean slaveDown(RedisClient redisClient, FreezeReason freezeReason) { - ClientConnectionsEntry entry = slaveBalancer.freeze(redisClient, freezeReason); + public boolean slaveDown(URI address, FreezeReason freezeReason) { + ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason); if (entry == null) { return false; } @@ -169,9 +187,8 @@ public class MasterSlaveEntry { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { // add master as slave if no more slaves available if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) { - URI addr = masterEntry.getClient().getConfig().getAddress(); - if (slaveUp(addr, FreezeReason.SYSTEM)) { - log.info("master {} used as slave", addr); + if (slaveBalancer.unfreeze(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM)) { + log.info("master {} used as slave", masterEntry.getClient().getAddr()); } } @@ -344,6 +361,10 @@ public class MasterSlaveEntry { return slaveBalancer.contains(redisClient); } + public boolean hasSlave(InetSocketAddress addr) { + return slaveBalancer.contains(addr); + } + public boolean hasSlave(URI addr) { return slaveBalancer.contains(addr); } @@ -352,9 +373,11 @@ public class MasterSlaveEntry { return addSlave(address, false, NodeType.SLAVE); } - private RFuture addSlave(URI address, final boolean freezed, final NodeType nodeType) { - final RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); - + public RFuture addSlave(InetSocketAddress address, URI uri) { + return addSlave(address, uri, false, NodeType.SLAVE); + } + + private RFuture addSlave(final RedisClient client, final boolean freezed, final NodeType nodeType) { final RPromise result = new RedissonPromise(); RFuture addrFuture = client.resolveAddr(); addrFuture.addListener(new FutureListener() { @@ -364,7 +387,7 @@ public class MasterSlaveEntry { result.tryFailure(future.cause()); return; } - + ClientConnectionsEntry entry = new ClientConnectionsEntry(client, config.getSlaveConnectionMinimumIdleSize(), config.getSlaveConnectionPoolSize(), @@ -382,11 +405,36 @@ public class MasterSlaveEntry { }); return result; } + + private RFuture addSlave(InetSocketAddress address, URI uri, final boolean freezed, final NodeType nodeType) { + RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri); + return addSlave(client, freezed, nodeType); + } + + private RFuture addSlave(URI address, final boolean freezed, final NodeType nodeType) { + RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); + return addSlave(client, freezed, nodeType); + } public RedisClient getClient() { return masterEntry.getClient(); } + public boolean slaveUp(ClientConnectionsEntry entry, FreezeReason freezeReason) { + if (!slaveBalancer.unfreeze(entry, freezeReason)) { + return false; + } + + InetSocketAddress addr = masterEntry.getClient().getAddr(); + // exclude master from slaves + if (!config.checkSkipSlavesInit() + && !addr.equals(entry.getClient().getAddr())) { + slaveDown(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM); + log.info("master {} excluded from slaves", addr); + } + return true; + } + public boolean slaveUp(URI address, FreezeReason freezeReason) { if (!slaveBalancer.unfreeze(address, freezeReason)) { return false; @@ -396,7 +444,7 @@ public class MasterSlaveEntry { // exclude master from slaves if (!config.checkSkipSlavesInit() && !URIBuilder.compare(addr, address)) { - slaveDown(masterEntry.getClient().getConfig().getAddress(), FreezeReason.SYSTEM); + slaveDown(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM); log.info("master {} excluded from slaves", addr); } return true; @@ -409,9 +457,21 @@ public class MasterSlaveEntry { * * @param address of Redis */ - public void changeMaster(final URI address) { + public void changeMaster(URI address) { final ClientConnectionsEntry oldMaster = masterEntry; RFuture future = setupMasterEntry(address); + changeMaster(address, oldMaster, future); + } + + public void changeMaster(InetSocketAddress address, URI uri) { + final ClientConnectionsEntry oldMaster = masterEntry; + RFuture future = setupMasterEntry(address, uri); + changeMaster(uri, oldMaster, future); + } + + + private void changeMaster(final URI address, final ClientConnectionsEntry oldMaster, + RFuture future) { future.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { @@ -434,7 +494,7 @@ public class MasterSlaveEntry { // more than one slave available, so master can be removed from slaves if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() > 1) { - slaveDown(newMasterClient, FreezeReason.SYSTEM); + slaveDown(newMasterClient.getAddr(), FreezeReason.SYSTEM); } connectionManager.shutdownAsync(oldMaster.getClient()); log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr()); diff --git a/redisson/src/main/java/org/redisson/connection/NodeSource.java b/redisson/src/main/java/org/redisson/connection/NodeSource.java index f7953e77c..eaec15a47 100644 --- a/redisson/src/main/java/org/redisson/connection/NodeSource.java +++ b/redisson/src/main/java/org/redisson/connection/NodeSource.java @@ -80,7 +80,10 @@ public class NodeSource { @Override public String toString() { - return "NodeSource [slot=" + slot + ", addr=" + addr + ", redirect=" + redirect + "]"; + return "NodeSource [slot=" + slot + ", addr=" + addr + ", redisClient=" + redisClient + ", redirect=" + redirect + + ", entry=" + entry + "]"; } + + } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 3fafa2014..7758903be 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -121,6 +121,20 @@ public class LoadBalancerManager { throw new IllegalStateException("Can't find " + address + " in slaves!"); } + return unfreeze(entry, freezeReason); + } + + public boolean unfreeze(InetSocketAddress address, FreezeReason freezeReason) { + ClientConnectionsEntry entry = getEntry(address); + if (entry == null) { + throw new IllegalStateException("Can't find " + address + " in slaves!"); + } + + return unfreeze(entry, freezeReason); + } + + + public boolean unfreeze(ClientConnectionsEntry entry, FreezeReason freezeReason) { synchronized (entry) { if (!entry.isFreezed()) { return false; @@ -136,12 +150,18 @@ public class LoadBalancerManager { } return false; } - + public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) { ClientConnectionsEntry connectionEntry = getEntry(address); return freeze(connectionEntry, freezeReason); } + public ClientConnectionsEntry freeze(InetSocketAddress address, FreezeReason freezeReason) { + ClientConnectionsEntry connectionEntry = getEntry(address); + return freeze(connectionEntry, freezeReason); + } + + public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) { ClientConnectionsEntry connectionEntry = getEntry(redisClient); return freeze(connectionEntry, freezeReason); @@ -172,6 +192,10 @@ public class LoadBalancerManager { return pubSubConnectionPool.get(); } + public boolean contains(InetSocketAddress addr) { + return getEntry(addr) != null; + } + public boolean contains(URI addr) { return getEntry(addr) != null; } @@ -190,6 +214,17 @@ public class LoadBalancerManager { return null; } + protected ClientConnectionsEntry getEntry(InetSocketAddress address) { + for (ClientConnectionsEntry entry : client2Entry.values()) { + InetSocketAddress addr = entry.getClient().getAddr(); + if (addr.getAddress().equals(address.getAddress()) && addr.getPort() == address.getPort()) { + return entry; + } + } + return null; + } + + protected ClientConnectionsEntry getEntry(RedisClient redisClient) { return client2Entry.get(redisClient); } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 9c8d3542f..e8ffc59d0 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -325,7 +325,7 @@ abstract class ConnectionPool { private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) { if (entry.getNodeType() == NodeType.SLAVE) { - masterSlaveEntry.slaveDown(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); + masterSlaveEntry.slaveDown(entry, FreezeReason.RECONNECT); log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause); scheduleCheck(entry); } else { @@ -392,7 +392,7 @@ abstract class ConnectionPool { public void operationComplete(Future future) throws Exception { if (entry.getNodeType() == NodeType.SLAVE) { - masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); + masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT); log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); } else { synchronized (entry) {