Merge branch 'master' into 3.0.0

pull/1303/head
Nikita 7 years ago
commit 70af89bff3

@ -57,10 +57,10 @@ import io.netty.util.concurrent.FutureListener;
*/ */
public class RedisClient { public class RedisClient {
private final AtomicReference<RFuture<InetSocketAddress>> resolveFuture = new AtomicReference<RFuture<InetSocketAddress>>(); private final AtomicReference<RFuture<InetSocketAddress>> resolvedAddrFuture = new AtomicReference<RFuture<InetSocketAddress>>();
private final Bootstrap bootstrap; private final Bootstrap bootstrap;
private final Bootstrap pubSubBootstrap; private final Bootstrap pubSubBootstrap;
private final URI addr; private final URI uri;
private InetSocketAddress resolvedAddr; private InetSocketAddress resolvedAddr;
private final ChannelGroup channels; private final ChannelGroup channels;
@ -105,7 +105,12 @@ public class RedisClient {
this.executor = copy.getExecutor(); this.executor = copy.getExecutor();
this.timer = copy.getTimer(); this.timer = copy.getTimer();
addr = copy.getAddress(); uri = copy.getAddress();
resolvedAddr = copy.getAddr();
if (resolvedAddr != null) {
resolvedAddrFuture.set(RedissonPromise.newSucceededFuture(resolvedAddr));
}
channels = new DefaultChannelGroup(copy.getGroup().next()); channels = new DefaultChannelGroup(copy.getGroup().next());
bootstrap = createBootstrap(copy, Type.PLAIN); bootstrap = createBootstrap(copy, Type.PLAIN);
@ -147,18 +152,22 @@ public class RedisClient {
try { try {
return connectAsync().syncUninterruptibly().getNow(); return connectAsync().syncUninterruptibly().getNow();
} catch (Exception e) { } catch (Exception e) {
throw new RedisConnectionException("Unable to connect to: " + addr, e); throw new RedisConnectionException("Unable to connect to: " + uri, e);
} }
} }
public RFuture<InetSocketAddress> resolveAddr() { public RFuture<InetSocketAddress> resolveAddr() {
if (resolvedAddrFuture.get() != null) {
return resolvedAddrFuture.get();
}
final RPromise<InetSocketAddress> promise = new RedissonPromise<InetSocketAddress>(); final RPromise<InetSocketAddress> promise = new RedissonPromise<InetSocketAddress>();
if (!resolveFuture.compareAndSet(null, promise)) { if (!resolvedAddrFuture.compareAndSet(null, promise)) {
return resolveFuture.get(); return resolvedAddrFuture.get();
} }
AddressResolver<InetSocketAddress> resolver = (AddressResolver<InetSocketAddress>) bootstrap.config().resolver().getResolver(bootstrap.config().group().next()); AddressResolver<InetSocketAddress> resolver = (AddressResolver<InetSocketAddress>) bootstrap.config().resolver().getResolver(bootstrap.config().group().next());
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(addr.getHost(), addr.getPort())); Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() { resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override @Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception { public void operationComplete(Future<InetSocketAddress> future) throws Exception {
@ -229,7 +238,7 @@ public class RedisClient {
try { try {
return connectPubSubAsync().syncUninterruptibly().getNow(); return connectPubSubAsync().syncUninterruptibly().getNow();
} catch (Exception e) { } catch (Exception e) {
throw new RedisConnectionException("Unable to connect to: " + addr, e); throw new RedisConnectionException("Unable to connect to: " + uri, e);
} }
} }
@ -342,7 +351,7 @@ public class RedisClient {
@Override @Override
public String toString() { public String toString() {
return "[addr=" + addr + "]"; return "[addr=" + uri + "]";
} }
} }

@ -15,7 +15,7 @@
*/ */
package org.redisson.client; package org.redisson.client;
import java.net.InetAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -36,7 +36,7 @@ import io.netty.util.Timer;
public class RedisClientConfig { public class RedisClientConfig {
private URI address; private URI address;
private InetAddress addr; private InetSocketAddress addr;
private Timer timer; private Timer timer;
private ExecutorService executor; private ExecutorService executor;
@ -99,7 +99,7 @@ public class RedisClientConfig {
this.address = URIBuilder.create(address); this.address = URIBuilder.create(address);
return this; return this;
} }
public RedisClientConfig setAddress(InetAddress addr, URI address) { public RedisClientConfig setAddress(InetSocketAddress addr, URI address) {
this.addr = addr; this.addr = addr;
this.address = address; this.address = address;
return this; return this;
@ -111,7 +111,7 @@ public class RedisClientConfig {
public URI getAddress() { public URI getAddress() {
return address; return address;
} }
public InetAddress getAddr() { public InetSocketAddress getAddr() {
return addr; return addr;
} }

@ -88,6 +88,8 @@ public interface ConnectionManager {
MasterSlaveEntry getEntry(int slot); MasterSlaveEntry getEntry(int slot);
MasterSlaveEntry getEntry(InetSocketAddress address);
<R> RPromise<R> newPromise(); <R> RPromise<R> newPromise();
void releaseRead(NodeSource source, RedisConnection connection); void releaseRead(NodeSource source, RedisConnection connection);
@ -100,6 +102,8 @@ public interface ConnectionManager {
RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout); RedisClient createClient(NodeType type, URI address, int timeout, int commandTimeout);
RedisClient createClient(NodeType type, InetSocketAddress address, URI uri);
RedisClient createClient(NodeType type, URI address); RedisClient createClient(NodeType type, URI address);
MasterSlaveEntry getEntry(RedisClient redisClient); MasterSlaveEntry getEntry(RedisClient redisClient);

@ -15,7 +15,6 @@
*/ */
package org.redisson.connection; package org.redisson.connection;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
@ -26,8 +25,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -49,22 +48,22 @@ public class DNSMonitor {
private final AddressResolver<InetSocketAddress> resolver; private final AddressResolver<InetSocketAddress> resolver;
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final Map<URI, InetAddress> masters = new HashMap<URI, InetAddress>(); private final Map<URI, InetSocketAddress> masters = new HashMap<URI, InetSocketAddress>();
private final Map<URI, InetAddress> slaves = new HashMap<URI, InetAddress>(); private final Map<URI, InetSocketAddress> slaves = new HashMap<URI, InetSocketAddress>();
private ScheduledFuture<?> dnsMonitorFuture; private ScheduledFuture<?> dnsMonitorFuture;
private long dnsMonitoringInterval; private long dnsMonitoringInterval;
public DNSMonitor(ConnectionManager connectionManager, InetSocketAddress masterHost, Collection<URI> slaveHosts, long dnsMonitoringInterval, DnsAddressResolverGroup resolverGroup) { public DNSMonitor(ConnectionManager connectionManager, RedisClient masterHost, Collection<URI> slaveHosts, long dnsMonitoringInterval, DnsAddressResolverGroup resolverGroup) {
this.resolver = resolverGroup.getResolver(connectionManager.getGroup().next()); this.resolver = resolverGroup.getResolver(connectionManager.getGroup().next());
URI uri = URIBuilder.create("redis://" + masterHost.getAddress().getHostAddress() + ":" + masterHost.getPort()); masterHost.resolveAddr().syncUninterruptibly();
masters.put(uri, masterHost.getAddress()); masters.put(masterHost.getConfig().getAddress(), masterHost.getAddr());
for (URI host : slaveHosts) { for (URI host : slaveHosts) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), 0)); Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
resolveFuture.syncUninterruptibly(); resolveFuture.syncUninterruptibly();
slaves.put(host, resolveFuture.getNow().getAddress()); slaves.put(host, resolveFuture.getNow());
} }
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.dnsMonitoringInterval = dnsMonitoringInterval; this.dnsMonitoringInterval = dnsMonitoringInterval;
@ -86,8 +85,8 @@ public class DNSMonitor {
@Override @Override
public void run() { public void run() {
final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size()); final AtomicInteger counter = new AtomicInteger(masters.size() + slaves.size());
for (final Entry<URI, InetAddress> entry : masters.entrySet()) { for (final Entry<URI, InetSocketAddress> entry : masters.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0)); Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() { resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override @Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception { public void operationComplete(Future<InetSocketAddress> future) throws Exception {
@ -100,25 +99,25 @@ public class DNSMonitor {
return; return;
} }
InetAddress master = entry.getValue(); InetSocketAddress currentMasterAddr = entry.getValue();
InetAddress now = future.get().getAddress(); InetSocketAddress newMasterAddr = future.getNow();
if (!now.getHostAddress().equals(master.getHostAddress())) { if (!newMasterAddr.getAddress().equals(currentMasterAddr.getAddress())) {
log.info("Detected DNS change. Master {} has changed ip from {} to {}", entry.getKey(), master.getHostAddress(), now.getHostAddress()); log.info("Detected DNS change. Master {} has changed ip from {} to {}",
for (MasterSlaveEntry entrySet : connectionManager.getEntrySet()) { entry.getKey(), currentMasterAddr.getAddress().getHostAddress(), newMasterAddr.getAddress().getHostAddress());
if (entrySet.getClient().getAddr().getHostName().equals(entry.getKey().getHost()) MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr);
&& entrySet.getClient().getAddr().getPort() == entry.getKey().getPort()) { if (masterSlaveEntry == null) {
entrySet.changeMaster(entry.getKey()); log.error("Unable to find master entry for {}", currentMasterAddr);
break; return;
}
} }
masters.put(entry.getKey(), now); masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey());
masters.put(entry.getKey(), newMasterAddr);
} }
} }
}); });
} }
for (final Entry<URI, InetAddress> entry : slaves.entrySet()) { for (final Entry<URI, InetSocketAddress> entry : slaves.entrySet()) {
Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), 0)); Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort()));
resolveFuture.addListener(new FutureListener<InetSocketAddress>() { resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
@Override @Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception { public void operationComplete(Future<InetSocketAddress> future) throws Exception {
@ -131,30 +130,29 @@ public class DNSMonitor {
return; return;
} }
InetAddress slave = entry.getValue(); final InetSocketAddress currentSlaveAddr = entry.getValue();
final InetAddress updatedSlave = future.get().getAddress(); final InetSocketAddress newSlaveAddr = future.getNow();
if (!updatedSlave.getHostAddress().equals(slave.getHostAddress())) { if (!newSlaveAddr.getAddress().equals(currentSlaveAddr.getAddress())) {
log.info("Detected DNS change. Slave {} has changed ip from {} to {}", entry.getKey().getHost(), slave.getHostAddress(), updatedSlave.getHostAddress()); log.info("Detected DNS change. Slave {} has changed ip from {} to {}",
entry.getKey().getHost(), currentSlaveAddr.getAddress().getHostAddress(), newSlaveAddr.getAddress().getHostAddress());
for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) { for (final MasterSlaveEntry masterSlaveEntry : connectionManager.getEntrySet()) {
final URI uri = URIBuilder.create(slave.getHostAddress() + ":" + entry.getKey().getPort()); if (masterSlaveEntry.hasSlave(currentSlaveAddr)) {
RFuture<Void> addFuture = masterSlaveEntry.addSlave(newSlaveAddr, entry.getKey());
if (masterSlaveEntry.hasSlave(uri)) {
RFuture<Void> addFuture = masterSlaveEntry.addSlave(entry.getKey());
addFuture.addListener(new FutureListener<Void>() { addFuture.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error("Can't add slave: " + updatedSlave, future.cause()); log.error("Can't add slave: " + newSlaveAddr, future.cause());
return; return;
} }
masterSlaveEntry.slaveDown(uri, FreezeReason.MANAGER); masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER);
} }
}); });
break; break;
} }
} }
slaves.put(entry.getKey(), updatedSlave); slaves.put(entry.getKey(), newSlaveAddr);
} }
} }
}); });

@ -286,9 +286,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
addEntry(slot, entry); addEntry(slot, entry);
} }
InetSocketAddress masterHost = f.getNow().resolveAddr().syncUninterruptibly().getNow();
if (config.getDnsMonitoringInterval() != -1) { if (config.getDnsMonitoringInterval() != -1) {
dnsMonitor = new DNSMonitor(this, masterHost, dnsMonitor = new DNSMonitor(this, f.getNow(),
config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup); config.getSlaveAddresses(), config.getDnsMonitoringInterval(), resolverGroup);
dnsMonitor.start(); dnsMonitor.start();
} }
@ -349,6 +348,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type)); clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type));
return client; return client;
} }
@Override
public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri) {
RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts());
clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type));
return client;
}
@Override @Override
public void shutdownAsync(RedisClient client) { public void shutdownAsync(RedisClient client) {
@ -363,6 +369,13 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout); RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout);
return RedisClient.create(redisConfig); return RedisClient.create(redisConfig);
} }
private RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, int timeout, int commandTimeout) {
RedisClientConfig redisConfig = createRedisConfig(type, null, timeout, commandTimeout);
redisConfig.setAddress(address, uri);
return RedisClient.create(redisConfig);
}
protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout) { protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout) {
RedisClientConfig redisConfig = new RedisClientConfig(); RedisClientConfig redisConfig = new RedisClientConfig();
@ -677,7 +690,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return newSucceededFuture(entryCodec); return newSucceededFuture(entryCodec);
} }
public MasterSlaveEntry getEntry(URI addr) { public MasterSlaveEntry getEntry(InetSocketAddress address) {
for (MasterSlaveEntry entry : client2entry.values()) {
InetSocketAddress addr = entry.getClient().getAddr();
if (addr.getAddress().equals(address.getAddress()) && addr.getPort() == address.getPort()) {
return entry;
}
}
return null;
}
private MasterSlaveEntry getEntry(URI addr) {
for (MasterSlaveEntry entry : client2entry.values()) { for (MasterSlaveEntry entry : client2entry.values()) {
if (URIBuilder.compare(entry.getClient().getAddr(), addr)) { if (URIBuilder.compare(entry.getClient().getAddr(), addr)) {
return entry; return entry;
@ -732,28 +755,19 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) { public RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry entry = source.getEntry(); MasterSlaveEntry entry = getEntry(source);
if (entry == null) {
entry = getEntry(source);
}
if (entry == null) { if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet"); RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source + " hasn't been discovered yet");
return RedissonPromise.newFailedFuture(ex); return RedissonPromise.newFailedFuture(ex);
} }
return entry.connectionWriteOp(command); return entry.connectionWriteOp(command);
} }
private MasterSlaveEntry getEntry(NodeSource source) { private MasterSlaveEntry getEntry(NodeSource source) {
// slots handling during migration state
if (source.getRedirect() != null) { if (source.getRedirect() != null) {
return getEntry(source.getAddr()); return getEntry(source.getAddr());
} }
return getEntry(source.getSlot());
}
@Override
public RFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry entry = source.getEntry(); MasterSlaveEntry entry = source.getEntry();
if (entry == null && source.getSlot() != null) { if (entry == null && source.getSlot() != null) {
entry = getEntry(source.getSlot()); entry = getEntry(source.getSlot());
@ -761,30 +775,21 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (source.getRedisClient() != null) { if (source.getRedisClient() != null) {
entry = getEntry(source.getRedisClient()); entry = getEntry(source.getRedisClient());
} }
if (source.getAddr() != null) { return entry;
entry = getEntry(source.getAddr()); }
if (entry == null) {
for (MasterSlaveEntry e : getEntrySet()) { @Override
if (e.hasSlave(source.getAddr())) { public RFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
entry = e; MasterSlaveEntry entry = getEntry(source);
break;
}
}
}
if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
return RedissonPromise.newFailedFuture(ex);
}
return entry.connectionReadOp(command, source.getAddr());
}
if (entry == null) { if (entry == null) {
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet"); RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source + " hasn't been discovered yet");
return RedissonPromise.newFailedFuture(ex); return RedissonPromise.newFailedFuture(ex);
} }
if (source.getRedirect() != null) {
return entry.connectionReadOp(command, source.getAddr());
}
return entry.connectionReadOp(command); return entry.connectionReadOp(command);
} }
@ -800,7 +805,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) { protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry pubSubEntry) {
MasterSlaveEntry entry = getEntry(slot); MasterSlaveEntry entry = getEntry(slot);
if (entry == null) { if (entry == null) {
log.error("Node for slot: " + slot + " hasn't been discovered yet"); log.error("Node for slot: " + slot + " can't be found");
} else { } else {
entry.returnPubSubConnection(pubSubEntry); entry.returnPubSubConnection(pubSubEntry);
} }
@ -808,12 +813,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public void releaseWrite(NodeSource source, RedisConnection connection) { public void releaseWrite(NodeSource source, RedisConnection connection) {
MasterSlaveEntry entry = source.getEntry(); MasterSlaveEntry entry = getEntry(source);
if (entry == null) { if (entry == null) {
entry = getEntry(source); log.error("Node: " + source + " can't be found");
}
if (entry == null) {
log.error("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
} else { } else {
entry.releaseWrite(connection); entry.releaseWrite(connection);
} }
@ -821,12 +823,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public void releaseRead(NodeSource source, RedisConnection connection) { public void releaseRead(NodeSource source, RedisConnection connection) {
MasterSlaveEntry entry = source.getEntry(); MasterSlaveEntry entry = getEntry(source);
if (entry == null) {
entry = getEntry(source);
}
if (entry == null) { if (entry == null) {
log.error("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet"); log.error("Node: " + source + " can't be found");
} else { } else {
entry.releaseRead(connection); entry.releaseRead(connection);
} }

@ -108,11 +108,20 @@ public class MasterSlaveEntry {
} }
return result; return result;
} }
public RFuture<RedisClient> setupMasterEntry(InetSocketAddress address, URI uri) {
RedisClient client = connectionManager.createClient(NodeType.MASTER, address, uri);
return setupMasterEntry(client);
}
public RPromise<RedisClient> setupMasterEntry(URI address) { public RFuture<RedisClient> setupMasterEntry(URI address) {
final RPromise<RedisClient> result = new RedissonPromise<RedisClient>(); RedisClient client = connectionManager.createClient(NodeType.MASTER, address);
return setupMasterEntry(client);
}
final RedisClient client = connectionManager.createClient(NodeType.MASTER, address); private RFuture<RedisClient> setupMasterEntry(final RedisClient client) {
final RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
RFuture<InetSocketAddress> addrFuture = client.resolveAddr(); RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
addrFuture.addListener(new FutureListener<InetSocketAddress>() { addrFuture.addListener(new FutureListener<InetSocketAddress>() {
@ -148,7 +157,16 @@ public class MasterSlaveEntry {
return result; return result;
} }
public boolean slaveDown(URI address, FreezeReason freezeReason) { public boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReason) {
ClientConnectionsEntry e = slaveBalancer.freeze(entry, freezeReason);
if (e == null) {
return false;
}
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
}
public boolean slaveDown(InetSocketAddress address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason); ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason);
if (entry == null) { if (entry == null) {
return false; return false;
@ -157,8 +175,8 @@ public class MasterSlaveEntry {
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM); return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
} }
public boolean slaveDown(RedisClient redisClient, FreezeReason freezeReason) { public boolean slaveDown(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(redisClient, freezeReason); ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason);
if (entry == null) { if (entry == null) {
return false; return false;
} }
@ -169,9 +187,8 @@ public class MasterSlaveEntry {
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available // add master as slave if no more slaves available
if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) { if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
URI addr = masterEntry.getClient().getConfig().getAddress(); if (slaveBalancer.unfreeze(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM)) {
if (slaveUp(addr, FreezeReason.SYSTEM)) { log.info("master {} used as slave", masterEntry.getClient().getAddr());
log.info("master {} used as slave", addr);
} }
} }
@ -344,6 +361,10 @@ public class MasterSlaveEntry {
return slaveBalancer.contains(redisClient); return slaveBalancer.contains(redisClient);
} }
public boolean hasSlave(InetSocketAddress addr) {
return slaveBalancer.contains(addr);
}
public boolean hasSlave(URI addr) { public boolean hasSlave(URI addr) {
return slaveBalancer.contains(addr); return slaveBalancer.contains(addr);
} }
@ -352,9 +373,11 @@ public class MasterSlaveEntry {
return addSlave(address, false, NodeType.SLAVE); return addSlave(address, false, NodeType.SLAVE);
} }
private RFuture<Void> addSlave(URI address, final boolean freezed, final NodeType nodeType) { public RFuture<Void> addSlave(InetSocketAddress address, URI uri) {
final RedisClient client = connectionManager.createClient(NodeType.SLAVE, address); return addSlave(address, uri, false, NodeType.SLAVE);
}
private RFuture<Void> addSlave(final RedisClient client, final boolean freezed, final NodeType nodeType) {
final RPromise<Void> result = new RedissonPromise<Void>(); final RPromise<Void> result = new RedissonPromise<Void>();
RFuture<InetSocketAddress> addrFuture = client.resolveAddr(); RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
addrFuture.addListener(new FutureListener<InetSocketAddress>() { addrFuture.addListener(new FutureListener<InetSocketAddress>() {
@ -364,7 +387,7 @@ public class MasterSlaveEntry {
result.tryFailure(future.cause()); result.tryFailure(future.cause());
return; return;
} }
ClientConnectionsEntry entry = new ClientConnectionsEntry(client, ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
config.getSlaveConnectionMinimumIdleSize(), config.getSlaveConnectionMinimumIdleSize(),
config.getSlaveConnectionPoolSize(), config.getSlaveConnectionPoolSize(),
@ -382,11 +405,36 @@ public class MasterSlaveEntry {
}); });
return result; return result;
} }
private RFuture<Void> addSlave(InetSocketAddress address, URI uri, final boolean freezed, final NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address, uri);
return addSlave(client, freezed, nodeType);
}
private RFuture<Void> addSlave(URI address, final boolean freezed, final NodeType nodeType) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);
return addSlave(client, freezed, nodeType);
}
public RedisClient getClient() { public RedisClient getClient() {
return masterEntry.getClient(); return masterEntry.getClient();
} }
public boolean slaveUp(ClientConnectionsEntry entry, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(entry, freezeReason)) {
return false;
}
InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves
if (!config.checkSkipSlavesInit()
&& !addr.equals(entry.getClient().getAddr())) {
slaveDown(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr);
}
return true;
}
public boolean slaveUp(URI address, FreezeReason freezeReason) { public boolean slaveUp(URI address, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(address, freezeReason)) { if (!slaveBalancer.unfreeze(address, freezeReason)) {
return false; return false;
@ -396,7 +444,7 @@ public class MasterSlaveEntry {
// exclude master from slaves // exclude master from slaves
if (!config.checkSkipSlavesInit() if (!config.checkSkipSlavesInit()
&& !URIBuilder.compare(addr, address)) { && !URIBuilder.compare(addr, address)) {
slaveDown(masterEntry.getClient().getConfig().getAddress(), FreezeReason.SYSTEM); slaveDown(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr); log.info("master {} excluded from slaves", addr);
} }
return true; return true;
@ -409,9 +457,21 @@ public class MasterSlaveEntry {
* *
* @param address of Redis * @param address of Redis
*/ */
public void changeMaster(final URI address) { public void changeMaster(URI address) {
final ClientConnectionsEntry oldMaster = masterEntry; final ClientConnectionsEntry oldMaster = masterEntry;
RFuture<RedisClient> future = setupMasterEntry(address); RFuture<RedisClient> future = setupMasterEntry(address);
changeMaster(address, oldMaster, future);
}
public void changeMaster(InetSocketAddress address, URI uri) {
final ClientConnectionsEntry oldMaster = masterEntry;
RFuture<RedisClient> future = setupMasterEntry(address, uri);
changeMaster(uri, oldMaster, future);
}
private void changeMaster(final URI address, final ClientConnectionsEntry oldMaster,
RFuture<RedisClient> future) {
future.addListener(new FutureListener<RedisClient>() { future.addListener(new FutureListener<RedisClient>() {
@Override @Override
public void operationComplete(Future<RedisClient> future) throws Exception { public void operationComplete(Future<RedisClient> future) throws Exception {
@ -434,7 +494,7 @@ public class MasterSlaveEntry {
// more than one slave available, so master can be removed from slaves // more than one slave available, so master can be removed from slaves
if (!config.checkSkipSlavesInit() if (!config.checkSkipSlavesInit()
&& slaveBalancer.getAvailableClients() > 1) { && slaveBalancer.getAvailableClients() > 1) {
slaveDown(newMasterClient, FreezeReason.SYSTEM); slaveDown(newMasterClient.getAddr(), FreezeReason.SYSTEM);
} }
connectionManager.shutdownAsync(oldMaster.getClient()); connectionManager.shutdownAsync(oldMaster.getClient());
log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr()); log.info("master {} has changed to {}", oldMaster.getClient().getAddr(), masterEntry.getClient().getAddr());

@ -80,7 +80,10 @@ public class NodeSource {
@Override @Override
public String toString() { public String toString() {
return "NodeSource [slot=" + slot + ", addr=" + addr + ", redirect=" + redirect + "]"; return "NodeSource [slot=" + slot + ", addr=" + addr + ", redisClient=" + redisClient + ", redirect=" + redirect
+ ", entry=" + entry + "]";
} }
} }

@ -121,6 +121,20 @@ public class LoadBalancerManager {
throw new IllegalStateException("Can't find " + address + " in slaves!"); throw new IllegalStateException("Can't find " + address + " in slaves!");
} }
return unfreeze(entry, freezeReason);
}
public boolean unfreeze(InetSocketAddress address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = getEntry(address);
if (entry == null) {
throw new IllegalStateException("Can't find " + address + " in slaves!");
}
return unfreeze(entry, freezeReason);
}
public boolean unfreeze(ClientConnectionsEntry entry, FreezeReason freezeReason) {
synchronized (entry) { synchronized (entry) {
if (!entry.isFreezed()) { if (!entry.isFreezed()) {
return false; return false;
@ -136,12 +150,18 @@ public class LoadBalancerManager {
} }
return false; return false;
} }
public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) { public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(address); ClientConnectionsEntry connectionEntry = getEntry(address);
return freeze(connectionEntry, freezeReason); return freeze(connectionEntry, freezeReason);
} }
public ClientConnectionsEntry freeze(InetSocketAddress address, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(address);
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) { public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(redisClient); ClientConnectionsEntry connectionEntry = getEntry(redisClient);
return freeze(connectionEntry, freezeReason); return freeze(connectionEntry, freezeReason);
@ -172,6 +192,10 @@ public class LoadBalancerManager {
return pubSubConnectionPool.get(); return pubSubConnectionPool.get();
} }
public boolean contains(InetSocketAddress addr) {
return getEntry(addr) != null;
}
public boolean contains(URI addr) { public boolean contains(URI addr) {
return getEntry(addr) != null; return getEntry(addr) != null;
} }
@ -190,6 +214,17 @@ public class LoadBalancerManager {
return null; return null;
} }
protected ClientConnectionsEntry getEntry(InetSocketAddress address) {
for (ClientConnectionsEntry entry : client2Entry.values()) {
InetSocketAddress addr = entry.getClient().getAddr();
if (addr.getAddress().equals(address.getAddress()) && addr.getPort() == address.getPort()) {
return entry;
}
}
return null;
}
protected ClientConnectionsEntry getEntry(RedisClient redisClient) { protected ClientConnectionsEntry getEntry(RedisClient redisClient) {
return client2Entry.get(redisClient); return client2Entry.get(redisClient);
} }

@ -325,7 +325,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) { private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveDown(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); masterSlaveEntry.slaveDown(entry, FreezeReason.RECONNECT);
log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause); log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause);
scheduleCheck(entry); scheduleCheck(entry);
} else { } else {
@ -392,7 +392,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
public void operationComplete(Future<Void> future) public void operationComplete(Future<Void> future)
throws Exception { throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT);
log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
} else { } else {
synchronized (entry) { synchronized (entry) {

Loading…
Cancel
Save