Merge branch 'master' into 3.0.0

pull/985/head^2
Nikita 8 years ago
commit 0a01d5f864

@ -92,13 +92,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
RedisConnection connection = connectionFuture.syncUninterruptibly().getNow();
List<ClusterNodeInfo> nodes = connection.sync(RedisCommands.CLUSTER_NODES);
if (log.isDebugEnabled()) {
StringBuilder nodesValue = new StringBuilder();
for (ClusterNodeInfo clusterNodeInfo : nodes) {
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
}
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
}
log.info("Redis cluster nodes configuration got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
lastClusterNode = addr;
@ -426,7 +424,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
aliveSlaves.removeAll(newPart.getFailedSlaveAddresses());
for (URI uri : aliveSlaves) {
currentPart.removeFailedSlaveAddress(uri);
if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
if (entry.slaveUp(uri, FreezeReason.MANAGER)) {
log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
@ -435,7 +433,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
for (URI uri : failedSlaves) {
currentPart.addFailedSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
@ -448,7 +446,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (URI uri : removedSlaves) {
currentPart.removeSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
@ -466,7 +464,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
currentPart.addSlaveAddress(uri);
entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
entry.slaveUp(uri, FreezeReason.MANAGER);
log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
}
});

@ -683,10 +683,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return entries.get(slot);
}
protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) {
getEntry(slotRange.getStartSlot()).slaveDown(host, port, freezeReason);
}
protected void changeMaster(int slot, URI address) {
getEntry(slot).changeMaster(address);
}

@ -129,8 +129,8 @@ public class MasterSlaveEntry {
return slaveDown(e, freezeReason == FreezeReason.SYSTEM);
}
public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(host, port, freezeReason);
public boolean slaveDown(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason);
if (entry == null) {
return false;
}
@ -141,9 +141,9 @@ public class MasterSlaveEntry {
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available
if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr();
if (slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM)) {
log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort());
URI addr = masterEntry.getClient().getConfig().getAddress();
if (slaveUp(addr, FreezeReason.SYSTEM)) {
log.info("master {} used as slave", addr);
}
}
@ -310,6 +310,10 @@ public class MasterSlaveEntry {
return slaveBalancer.contains(addr);
}
public boolean hasSlave(String addr) {
return slaveBalancer.contains(addr);
}
public RFuture<Void> addSlave(URI address) {
return addSlave(address, true, NodeType.SLAVE);
}
@ -334,17 +338,18 @@ public class MasterSlaveEntry {
return masterEntry.getClient();
}
public boolean slaveUp(String host, int port, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(host, port, freezeReason)) {
public boolean slaveUp(URI address, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(address, freezeReason)) {
return false;
}
InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort());
InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves
if (config.getReadMode() == ReadMode.SLAVE
&& (!addr.getHostName().equals(host) || port != addr.getPort())) {
slaveDown(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort());
&& (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) {
slaveDown(address, FreezeReason.SYSTEM);
log.info("master {} excluded from slaves", addr);
}
return true;
}
@ -369,7 +374,7 @@ public class MasterSlaveEntry {
// more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE
&& slaveBalancer.getAvailableClients() > 1) {
slaveDown(address.getHost(), address.getPort(), FreezeReason.SYSTEM);
slaveDown(address, FreezeReason.SYSTEM);
}
connectionManager.shutdownAsync(oldMaster.getClient());
}

@ -171,7 +171,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
log.warn("Can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort());
log.warn("Can't connect to sentinel: {}", addr);
return;
}
@ -220,8 +220,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = parts[2];
String port = parts[3];
String addr = createAddress(ip, port);
URI uri = URIBuilder.create(addr);
URI uri = convert(ip, port);
registerSentinel(cfg, uri, c);
}
}
@ -253,11 +252,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
if (entry.slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
URI uri = convert(ip, port);
if (entry.slaveUp(uri, FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} added", slaveAddr);
}
}
});
} else {
slaveUp(ip, port);
@ -267,6 +268,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
protected URI convert(String ip, String port) {
String addr = createAddress(ip, port);
URI uri = URIBuilder.create(addr);
return uri;
}
private void onNodeDown(URI sentinelAddr, String msg) {
String[] parts = msg.split(" ");
@ -309,7 +316,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.warn("slave: {}:{} has down", ip, port);
} else {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.slaveDown(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
URI uri = convert(ip, port);
if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.warn("slave: {}:{} has down", ip, port);
}
}
@ -367,7 +375,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
URI uri = convert(ip, port);
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr);
}

@ -16,6 +16,7 @@
package org.redisson.connection.balancer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@ -44,10 +45,11 @@ public class LoadBalancerManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool slaveConnectionPool;
private final Map<String, ClientConnectionsEntry> ip2Entry = PlatformDependent.newConcurrentHashMap();
public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager;
slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry);
@ -65,7 +67,8 @@ public class LoadBalancerManager {
return;
}
if (counter.decrementAndGet() == 0) {
addr2Entry.put(entry.getClient().getAddr(), entry);
String addr = convert(entry.getClient().getConfig().getAddress());
ip2Entry.put(addr, entry);
result.trySuccess(null);
}
}
@ -80,7 +83,7 @@ public class LoadBalancerManager {
public int getAvailableClients() {
int count = 0;
for (ClientConnectionsEntry connectionEntry : addr2Entry.values()) {
for (ClientConnectionsEntry connectionEntry : ip2Entry.values()) {
if (!connectionEntry.isFreezed()) {
count++;
}
@ -88,11 +91,10 @@ public class LoadBalancerManager {
return count;
}
public boolean unfreeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry entry = addr2Entry.get(addr);
public boolean unfreeze(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = getEntry(address);
if (entry == null) {
throw new IllegalStateException("Can't find " + addr + " in slaves!");
throw new IllegalStateException("Can't find " + address + " in slaves!");
}
synchronized (entry) {
@ -111,12 +113,21 @@ public class LoadBalancerManager {
return false;
}
public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry connectionEntry = addr2Entry.get(addr);
private String convert(URI address) {
InetSocketAddress addr = new InetSocketAddress(address.getHost(), address.getPort());
return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}
public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(address);
return freeze(connectionEntry, freezeReason);
}
protected ClientConnectionsEntry getEntry(URI address) {
String addr = convert(address);
return ip2Entry.get(addr);
}
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) {
return null;
@ -143,11 +154,15 @@ public class LoadBalancerManager {
}
public boolean contains(InetSocketAddress addr) {
return addr2Entry.containsKey(addr);
return ip2Entry.containsKey(addr.getAddress().getHostAddress() + ":" + addr.getPort());
}
public boolean contains(String addr) {
return ip2Entry.containsKey(addr);
}
public RFuture<RedisConnection> getConnection(RedisCommand<?> command, InetSocketAddress addr) {
ClientConnectionsEntry entry = addr2Entry.get(addr);
ClientConnectionsEntry entry = ip2Entry.get(addr.getAddress().getHostAddress());
if (entry != null) {
return slaveConnectionPool.get(command, entry);
}
@ -160,23 +175,23 @@ public class LoadBalancerManager {
}
public void returnPubSubConnection(RedisPubSubConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress());
pubSubConnectionPool.returnConnection(entry, connection);
}
public void returnConnection(RedisConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress());
slaveConnectionPool.returnConnection(entry, connection);
}
public void shutdown() {
for (ClientConnectionsEntry entry : addr2Entry.values()) {
for (ClientConnectionsEntry entry : ip2Entry.values()) {
entry.getClient().shutdown();
}
}
public void shutdownAsync() {
for (ClientConnectionsEntry entry : addr2Entry.values()) {
for (ClientConnectionsEntry entry : ip2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient());
}
}

@ -296,7 +296,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getFailedAttempts()) {
checkForReconnect(entry);
checkForReconnect(entry, cause);
}
releaseConnection(entry);
@ -308,7 +308,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
conn.closeAsync();
checkForReconnect(entry);
checkForReconnect(entry, null);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
} else {
@ -321,15 +321,14 @@ abstract class ConnectionPool<T extends RedisConnection> {
promise.tryFailure(cause);
}
private void checkForReconnect(ClientConnectionsEntry entry) {
private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) {
if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(),
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
masterSlaveEntry.slaveDown(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT);
log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause);
scheduleCheck(entry);
} else {
if (entry.freezeMaster(FreezeReason.RECONNECT)) {
log.warn("host {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
log.error("host " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause);
scheduleCheck(entry);
}
}
@ -385,7 +384,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
public void operationComplete(Future<Void> future)
throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT);
log.info("slave {} successfully reconnected", entry.getClient().getAddr());
} else {
synchronized (entry) {

Loading…
Cancel
Save