Fixed - "SlaveConnectionPool no available Redis entries" error is thrown after failover. #2932

pull/2979/head
Nikita Koksharov 5 years ago
parent a1d68b89ab
commit f220d929e0

@ -110,27 +110,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
lastClusterNode = addr;
Collection<ClusterPartition> partitions = parsePartitions(nodes);
List<RFuture<Collection<RFuture<Void>>>> futures = new ArrayList<RFuture<Collection<RFuture<Void>>>>();
List<RFuture<Void>> masterFutures = new ArrayList<>();
for (ClusterPartition partition : partitions) {
if (partition.isMasterFail()) {
failedMasters.add(partition.getMasterAddress().toString());
continue;
}
RFuture<Collection<RFuture<Void>>> masterFuture = addMasterEntry(partition, cfg);
futures.add(masterFuture);
RFuture<Void> masterFuture = addMasterEntry(partition, cfg);
masterFutures.add(masterFuture);
}
for (RFuture<Collection<RFuture<Void>>> masterFuture : futures) {
for (RFuture<Void> masterFuture : masterFutures) {
masterFuture.awaitUninterruptibly();
if (!masterFuture.isSuccess()) {
lastException = masterFuture.cause();
continue;
}
for (RFuture<Void> future : masterFuture.getNow()) {
future.awaitUninterruptibly();
if (!future.isSuccess()) {
lastException = future.cause();
}
}
}
break;
@ -168,7 +161,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return result;
}
private RFuture<Collection<RFuture<Void>>> addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
private RFuture<Void> addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
if (partition.isMasterFail()) {
RedisException e = new RedisException("Failed to add master: " +
partition.getMasterAddress() + " for slot ranges: " +
@ -181,7 +174,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return RedissonPromise.newFailedFuture(e);
}
RPromise<Collection<RFuture<Void>>> result = new RedissonPromise<Collection<RFuture<Void>>>();
RPromise<Void> result = new RedissonPromise<>();
RFuture<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), null, configEndpointHostName);
connectionFuture.onComplete((connection, ex1) -> {
if (ex1 != null) {
@ -193,49 +186,64 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
MasterSlaveServersConfig config = create(cfg);
config.setMasterAddress(partition.getMasterAddress().toString());
MasterSlaveEntry e;
List<RFuture<Void>> futures = new ArrayList<RFuture<Void>>();
MasterSlaveEntry entry;
if (config.checkSkipSlavesInit()) {
e = new SingleEntry(ClusterConnectionManager.this, config);
entry = new SingleEntry(ClusterConnectionManager.this, config);
} else {
Set<String> slaveAddresses = partition.getSlaveAddresses().stream().map(r -> r.toString()).collect(Collectors.toSet());
config.setSlaveAddresses(slaveAddresses);
e = new MasterSlaveEntry(ClusterConnectionManager.this, config);
List<RFuture<Void>> fs = e.initSlaveBalancer(partition.getFailedSlaveAddresses());
futures.addAll(fs);
if (!partition.getSlaveAddresses().isEmpty()) {
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
if (!partition.getFailedSlaveAddresses().isEmpty()) {
log.warn("slaves: {} is down for slot ranges: {}", partition.getFailedSlaveAddresses(), partition.getSlotRanges());
}
}
entry = new MasterSlaveEntry(ClusterConnectionManager.this, config);
}
RFuture<RedisClient> f = e.setupMasterEntry(new RedisURI(config.getMasterAddress()));
RPromise<Void> initFuture = new RedissonPromise<Void>();
futures.add(initFuture);
f.onComplete((res, ex3) -> {
RFuture<RedisClient> f = entry.setupMasterEntry(new RedisURI(config.getMasterAddress()));
f.onComplete((masterClient, ex3) -> {
if (ex3 != null) {
log.error("Can't add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges(), ex3);
initFuture.tryFailure(ex3);
result.tryFailure(ex3);
return;
}
for (Integer slot : partition.getSlots()) {
addEntry(slot, e);
addEntry(slot, entry);
lastPartitions.put(slot, partition);
}
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
if (!initFuture.trySuccess(null)) {
throw new IllegalStateException();
if (!config.checkSkipSlavesInit()) {
List<RFuture<Void>> fs = entry.initSlaveBalancer(partition.getFailedSlaveAddresses(), masterClient);
AtomicInteger counter = new AtomicInteger(fs.size());
for (RFuture<Void> future : fs) {
future.onComplete((r, ex) -> {
if (ex != null) {
log.error("unable to add slave for: " + partition.getMasterAddress()
+ " slot ranges: " + partition.getSlotRanges(), ex);
}
if (counter.decrementAndGet() == 0) {
if (!partition.getSlaveAddresses().isEmpty()) {
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
if (!partition.getFailedSlaveAddresses().isEmpty()) {
log.warn("slaves: {} are down for slot ranges: {}", partition.getFailedSlaveAddresses(), partition.getSlotRanges());
}
}
if (result.trySuccess(null)) {
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
} else {
log.error("unable to add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
}
}
});
}
} else {
if (result.trySuccess(null)) {
log.info("master: {} added for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
} else {
log.error("unable to add master: {} for slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
}
}
});
if (!result.trySuccess(futures)) {
throw new IllegalStateException();
}
});
return result;
@ -385,7 +393,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (RedisURI uri : aliveSlaves) {
currentPart.removeFailedSlaveAddress(uri);
if (entry.hasSlave(uri) && entry.slaveUp(uri, FreezeReason.MANAGER)) {
log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges());
log.info("slave: {} is up for slot ranges: {}", uri, currentPart.getSlotRanges());
}
}
@ -495,30 +503,13 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return RedissonPromise.newSucceededFuture(null);
}
RPromise<Void> result = new RedissonPromise<Void>();
RPromise<Void> result = new RedissonPromise<>();
AtomicInteger masters = new AtomicInteger(addedPartitions.size());
Queue<RFuture<Void>> futures = new ConcurrentLinkedQueue<RFuture<Void>>();
for (ClusterPartition newPart : addedPartitions.values()) {
RFuture<Collection<RFuture<Void>>> future = addMasterEntry(newPart, cfg);
RFuture<Void> future = addMasterEntry(newPart, cfg);
future.onComplete((res, e) -> {
if (e == null) {
futures.addAll(res);
}
if (masters.decrementAndGet() == 0) {
if (futures.isEmpty()) {
result.trySuccess(null);
return;
}
AtomicInteger nodes = new AtomicInteger(futures.size());
for (RFuture<Void> nodeFuture : futures) {
nodeFuture.onComplete((r, ex) -> {
if (nodes.decrementAndGet() == 0) {
result.trySuccess(null);
}
});
}
result.trySuccess(null);
}
});
}

@ -16,13 +16,7 @@
package org.redisson.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -363,16 +357,23 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if (config.checkSkipSlavesInit()) {
entry = new SingleEntry(this, config);
} else {
entry = createMasterSlaveEntry(config);
entry = new MasterSlaveEntry(this, config);
}
RFuture<RedisClient> f = entry.setupMasterEntry(new RedisURI(config.getMasterAddress()));
f.syncUninterruptibly();
RFuture<RedisClient> masterFuture = entry.setupMasterEntry(new RedisURI(config.getMasterAddress()));
masterFuture.syncUninterruptibly();
if (!config.checkSkipSlavesInit()) {
List<RFuture<Void>> fs = entry.initSlaveBalancer(getDisconnectedNodes(), masterFuture.getNow());
for (RFuture<Void> future : fs) {
future.syncUninterruptibly();
}
}
for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
addEntry(slot, entry);
}
startDNSMonitoring(f.getNow());
startDNSMonitoring(masterFuture.getNow());
} catch (Exception e) {
stopThreads();
throw e;
@ -387,16 +388,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
dnsMonitor.start();
}
}
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config) {
MasterSlaveEntry entry = new MasterSlaveEntry(this, config);
List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<RedisURI>emptySet());
for (RFuture<Void> future : fs) {
future.syncUninterruptibly();
}
return entry;
}
protected Collection<RedisURI> getDisconnectedNodes() {
return Collections.emptySet();
}
protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig<?> cfg) {
MasterSlaveServersConfig c = new MasterSlaveServersConfig();

@ -85,13 +85,13 @@ public class MasterSlaveEntry {
return config;
}
public List<RFuture<Void>> initSlaveBalancer(Collection<RedisURI> disconnectedNodes) {
public List<RFuture<Void>> initSlaveBalancer(Collection<RedisURI> disconnectedNodes, RedisClient master) {
boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty()
&& !config.checkSkipSlavesInit()
&& disconnectedNodes.size() < config.getSlaveAddresses().size();
List<RFuture<Void>> result = new LinkedList<RFuture<Void>>();
RFuture<Void> f = addSlave(new RedisURI(config.getMasterAddress()), freezeMasterAsSlave, NodeType.MASTER);
RFuture<Void> f = addSlave(master.getAddr(), master.getConfig().getAddress(), freezeMasterAsSlave, NodeType.MASTER);
result.add(f);
for (String address : config.getSlaveAddresses()) {
RedisURI uri = new RedisURI(address);
@ -456,7 +456,7 @@ public class MasterSlaveEntry {
synchronized (oldMaster) {
oldMaster.setFreezeReason(FreezeReason.MANAGER);
}
slaveDown(oldMaster);
nodeDown(oldMaster);
slaveBalancer.changeType(oldMaster.getClient().getAddr(), NodeType.SLAVE);
slaveBalancer.changeType(newMasterClient.getAddr(), NodeType.MASTER);

@ -498,7 +498,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
if (sentinel != null) {
disconnectNode(sentinel);
sentinel.shutdownAsync();
log.warn("sentinel: {} was down", uri);
log.warn("sentinel: {} is down", uri);
}
}
}
@ -509,13 +509,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
@Override
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config) {
MasterSlaveEntry entry = new MasterSlaveEntry(this, config);
List<RFuture<Void>> fs = entry.initSlaveBalancer(disconnectedSlaves);
for (RFuture<Void> future : fs) {
future.syncUninterruptibly();
}
return entry;
protected Collection<RedisURI> getDisconnectedNodes() {
return disconnectedSlaves;
}
private RFuture<Void> registerSentinel(RedisURI addr, MasterSlaveServersConfig c, String sslHostname) {
@ -599,11 +594,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private void slaveDown(RedisURI uri) {
if (config.checkSkipSlavesInit()) {
log.warn("slave: {} was down", uri);
log.warn("slave: {} is down", uri);
} else {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.warn("slave: {} was down", uri);
log.warn("slave: {} is down", uri);
}
}
}
@ -620,12 +615,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private void slaveUp(RedisURI uri) {
if (config.checkSkipSlavesInit()) {
log.info("slave: {} has up", uri);
log.info("slave: {} is up", uri);
return;
}
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) {
log.info("slave: {} has up", uri);
log.info("slave: {} is up", uri);
}
}

@ -115,16 +115,16 @@ public class LoadBalancerManager {
public boolean unfreeze(RedisURI address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = getEntry(address);
if (entry == null) {
throw new IllegalStateException("Can't find " + address + " in slaves!");
throw new IllegalStateException("Can't find " + address + " in slaves! Available slaves: " + client2Entry.keySet());
}
return unfreeze(entry, freezeReason);
}
public boolean unfreeze(InetSocketAddress address, FreezeReason freezeReason) {
public boolean unfreeze(InetSocketAddress address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = getEntry(address);
if (entry == null) {
throw new IllegalStateException("Can't find " + address + " in slaves!");
throw new IllegalStateException("Can't find " + address + " in slaves! Available slaves: " + client2Entry.keySet());
}
return unfreeze(entry, freezeReason);

Loading…
Cancel
Save