Merge branch 'master' into 3.0.0

pull/1833/head
Nikita Koksharov 6 years ago
commit e4bbce37e4

@ -76,7 +76,7 @@ public class ClientConnectionsEntry {
}
public boolean isMasterForRead() {
return getFreezeReason() == FreezeReason.SYSTEM && getConfig().getReadMode() == ReadMode.MASTER_SLAVE;
return getFreezeReason() == FreezeReason.SYSTEM && getConfig().getReadMode() == ReadMode.MASTER_SLAVE && getNodeType() == NodeType.MASTER;
}
public void setNodeType(NodeType nodeType) {

@ -188,6 +188,10 @@ public class MasterSlaveEntry {
}
private boolean slaveDown(ClientConnectionsEntry entry) {
if (entry.isMasterForRead()) {
return false;
}
// add master as slave if no more slaves available
if (!config.checkSkipSlavesInit() && slaveBalancer.getAvailableClients() == 0) {
if (slaveBalancer.unfreeze(masterEntry.getClient().getAddr(), FreezeReason.SYSTEM)) {
@ -201,12 +205,24 @@ public class MasterSlaveEntry {
connection.closeAsync();
reattachBlockingQueue(connection);
}
while (true) {
RedisConnection connection = entry.pollConnection();
if (connection == null) {
break;
}
}
entry.getAllConnections().clear();
for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
connection.closeAsync();
connectionManager.getSubscribeService().reattachPubSub(connection);
}
while (true) {
RedisConnection connection = entry.pollSubscribeConnection();
if (connection == null) {
break;
}
}
entry.getAllSubscribeConnections().clear();
return true;
@ -436,6 +452,8 @@ public class MasterSlaveEntry {
slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE);
slaveBalancer.changeType(newMasterClient.getAddr(), NodeType.MASTER);
// freeze in slaveBalancer
slaveDown(oldMaster.getClient().getAddr(), FreezeReason.MANAGER);
// more than one slave available, so master can be removed from slaves
if (!config.checkSkipSlavesInit()

@ -19,7 +19,6 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -39,7 +38,6 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.BaseMasterSlaveServersConfig;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
@ -69,7 +67,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
private final AtomicReference<String> currentMaster = new AtomicReference<String>();
private final Set<String> slaves = Collections.newSetFromMap(PlatformDependent.<String, Boolean>newConcurrentHashMap());
private final Set<URI> disconnectedSlaves = new HashSet<URI>();
private ScheduledFuture<?> monitorFuture;
@ -100,7 +97,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
this.config.setMasterAddress(masterHost);
currentMaster.set(masterHost);
log.info("master: {} added", masterHost);
slaves.add(masterHost);
List<Map<String, String>> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
for (Map<String, String> map : sentinelSlaves) {
@ -115,7 +111,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String host = createAddress(ip, port);
this.config.addSlaveAddress(host);
slaves.add(host);
log.debug("slave {} state: {}", host, map);
log.info("slave: {} added", host);
@ -368,9 +363,19 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
CountableListener<Void> listener = new CountableListener<Void>() {
@Override
protected void onSuccess(Void value) {
Set<String> removedSlaves = new HashSet<String>(slaves);
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
Set<String> removedSlaves = new HashSet<String>();
for (ClientConnectionsEntry e : entry.getAllEntries()) {
InetSocketAddress addr = e.getClient().getAddr();
String slaveAddr = createAddress(addr.getAddress().getHostAddress(), addr.getPort());
removedSlaves.add(slaveAddr);
}
removedSlaves.removeAll(currentSlaves);
for (String slave : removedSlaves) {
if (slave.equals(currentMaster.get())) {
continue;
}
String hostPort = slave.replace("redis://", "");
int lastColonIdx = hostPort.lastIndexOf(":");
String host = hostPort.substring(0, lastColonIdx);
@ -462,13 +467,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
// to avoid addition twice
final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
final URI uri = convert(ip, port);
if (slaves.add(slaveAddr) && !config.checkSkipSlavesInit()) {
if (!entry.hasSlave(uri) && !config.checkSkipSlavesInit()) {
RFuture<Void> future = entry.addSlave(URIBuilder.create(slaveAddr));
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
slaves.remove(slaveAddr);
result.tryFailure(future.cause());
log.error("Can't add slave: " + slaveAddr, future.cause());
return;

@ -164,7 +164,11 @@ public class LoadBalancerManager {
synchronized (connectionEntry) {
// only RECONNECT freeze reason could be replaced
if (connectionEntry.getFreezeReason() == null
|| connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) {
|| connectionEntry.getFreezeReason() == FreezeReason.RECONNECT
|| (freezeReason == FreezeReason.MANAGER
&& connectionEntry.getFreezeReason() != FreezeReason.MANAGER
&& connectionEntry.getNodeType() == NodeType.SLAVE
)) {
connectionEntry.setFreezed(true);
connectionEntry.setFreezeReason(freezeReason);
return connectionEntry;

@ -16,6 +16,7 @@
package org.redisson.connection.pool;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@ -187,13 +188,16 @@ abstract class ConnectionPool<T extends RedisConnection> {
public RFuture<T> get(RedisCommand<?> command) {
List<ClientConnectionsEntry> entriesCopy = new LinkedList<ClientConnectionsEntry>(entries);
for (Iterator<ClientConnectionsEntry> iterator = entriesCopy.iterator(); iterator.hasNext();) {
ClientConnectionsEntry entry = iterator.next();
if (!((!entry.isFreezed() || entry.isMasterForRead()) &&
tryAcquireConnection(entry))) {
iterator.remove();
}
}
while (!entriesCopy.isEmpty()) {
ClientConnectionsEntry entry = config.getLoadBalancer().getEntry(entriesCopy);
if ((!entry.isFreezed() || entry.isMasterForRead()) &&
tryAcquireConnection(entry)) {
return acquireConnection(command, entry);
}
entriesCopy.remove(entry);
return acquireConnection(command, entry);
}
List<InetSocketAddress> failed = new LinkedList<InetSocketAddress>();
@ -252,7 +256,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
if (entry.getNodeType() == NodeType.SLAVE && entry.isFailed()) {
checkForReconnect(entry, null);
return false;
}
}
return true;
}

Loading…
Cancel
Save