Fixed - Slave nodes failed to pass complete initialization shouldn't be added as nodes. #4494

pull/4597/head
Nikita Koksharov 2 years ago
parent fcefed302c
commit 065313beb8

@ -148,7 +148,13 @@ public class MasterSlaveEntry {
if (e != null) {
client.shutdownAsync();
}
}).thenApply(r -> client);
}).thenApply(r -> {
writeConnectionPool.addEntry(masterEntry);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
pubSubConnectionPool.addEntry(masterEntry);
}
return client;
});
}
public boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReason) {

@ -76,6 +76,8 @@ public class LoadBalancerManager {
CompletableFuture<Void> future = CompletableFuture.allOf(slaveFuture, pubSubFuture);
return future.thenAccept(r -> {
slaveConnectionPool.addEntry(entry);
pubSubConnectionPool.addEntry(entry);
client2Entry.put(entry.getClient(), entry);
});
}
@ -151,8 +153,6 @@ public class LoadBalancerManager {
if (!entry.isInitialized()) {
entry.setInitialized(true);
entry.resetFirstFail();
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
futures.add(slaveConnectionPool.initConnections(entry));
futures.add(pubSubConnectionPool.initConnections(entry));
@ -168,8 +168,9 @@ public class LoadBalancerManager {
return;
}
log.debug("Unfreezed entry: {}", entry);
entry.resetFirstFail();
entry.setFreezeReason(null);
log.debug("Unfreezed entry: {}", entry);
});
return true;
}
@ -189,8 +190,6 @@ public class LoadBalancerManager {
if (!entry.isInitialized()) {
entry.setInitialized(true);
entry.resetFirstFail();
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
futures.add(slaveConnectionPool.initConnections(entry));
futures.add(pubSubConnectionPool.initConnections(entry));
@ -203,8 +202,9 @@ public class LoadBalancerManager {
return;
}
log.debug("Unfreezed entry: {}", entry);
entry.resetFirstFail();
entry.setFreezeReason(null);
log.debug("Unfreezed entry: {}", entry);
}).thenApply(e -> true);
}
}

@ -66,10 +66,11 @@ abstract class ConnectionPool<T extends RedisConnection> {
}
public CompletableFuture<Void> add(ClientConnectionsEntry entry) {
CompletableFuture<Void> promise = initConnections(entry, true);
return promise.thenAccept(r -> {
entries.add(entry);
});
return initConnections(entry, true);
}
public void addEntry(ClientConnectionsEntry entry) {
entries.add(entry);
}
public CompletableFuture<Void> initConnections(ClientConnectionsEntry entry) {

Loading…
Cancel
Save