|
|
|
@ -242,6 +242,20 @@ public class MasterSlaveEntry {
|
|
|
|
|
nodeDown(masterEntry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void reattachPubSub() {
|
|
|
|
|
for (RedisPubSubConnection connection : masterEntry.getAllSubscribeConnections()) {
|
|
|
|
|
connection.closeAsync();
|
|
|
|
|
connectionManager.getSubscribeService().reattachPubSub(connection);
|
|
|
|
|
}
|
|
|
|
|
while (true) {
|
|
|
|
|
RedisConnection connection = masterEntry.pollSubscribeConnection();
|
|
|
|
|
if (connection == null) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
masterEntry.getAllSubscribeConnections().clear();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public boolean nodeDown(ClientConnectionsEntry entry) {
|
|
|
|
|
entry.reset();
|
|
|
|
|
|
|
|
|
@ -445,6 +459,9 @@ public class MasterSlaveEntry {
|
|
|
|
|
CompletableFuture<Boolean> downFuture = slaveDownAsync(addr, FreezeReason.SYSTEM);
|
|
|
|
|
return downFuture.thenApply(r -> {
|
|
|
|
|
if (r) {
|
|
|
|
|
if (config.getSubscriptionMode() == SubscriptionMode.SLAVE) {
|
|
|
|
|
reattachPubSub();
|
|
|
|
|
}
|
|
|
|
|
log.info("master {} excluded from slaves", addr);
|
|
|
|
|
}
|
|
|
|
|
return r;
|
|
|
|
@ -459,6 +476,9 @@ public class MasterSlaveEntry {
|
|
|
|
|
CompletableFuture<Boolean> downFuture = slaveDownAsync(addr, FreezeReason.SYSTEM);
|
|
|
|
|
return downFuture.thenApply(r -> {
|
|
|
|
|
if (r) {
|
|
|
|
|
if (config.getSubscriptionMode() == SubscriptionMode.SLAVE) {
|
|
|
|
|
reattachPubSub();
|
|
|
|
|
}
|
|
|
|
|
log.info("master {} excluded from slaves", addr);
|
|
|
|
|
}
|
|
|
|
|
return r;
|
|
|
|
|