refactoring

pull/6077/head
Nikita Koksharov 8 months ago
parent fd2b84206e
commit bdd71d3d5e

@ -26,7 +26,6 @@ import org.redisson.cluster.ClusterNodeInfo.Flag;
import org.redisson.cluster.ClusterPartition.Type; import org.redisson.cluster.ClusterPartition.Type;
import org.redisson.config.*; import org.redisson.config.*;
import org.redisson.connection.*; import org.redisson.connection.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -555,7 +554,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
.collect(Collectors.toList()); .collect(Collectors.toList());
nonFailedSlaves.forEach(uri -> { nonFailedSlaves.forEach(uri -> {
if (entry.hasSlave(uri)) { if (entry.hasSlave(uri)) {
CompletableFuture<Boolean> f = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER); CompletableFuture<Boolean> f = entry.slaveUpNoMasterExclusionAsync(uri);
f = f.thenApply(v -> { f = f.thenApply(v -> {
if (v) { if (v) {
log.info("slave: {} is up for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave: {} is up for slot ranges: {}", uri, currentPart.getSlotRanges());
@ -572,7 +571,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
.filter(uri -> !currentPart.getFailedSlaveAddresses().contains(uri)) .filter(uri -> !currentPart.getFailedSlaveAddresses().contains(uri))
.forEach(uri -> { .forEach(uri -> {
currentPart.addFailedSlaveAddress(uri); currentPart.addFailedSlaveAddress(uri);
boolean slaveDown = entry.slaveDown(uri, FreezeReason.MANAGER); boolean slaveDown = entry.slaveDown(uri);
if (config.isSlaveNotUsed() || slaveDown) { if (config.isSlaveNotUsed() || slaveDown) {
disconnectNode(uri); disconnectNode(uri);
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
@ -593,7 +592,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (RedisURI uri : removedSlaves) { for (RedisURI uri : removedSlaves) {
currentPart.removeSlaveAddress(uri); currentPart.removeSlaveAddress(uri);
boolean slaveDown = entry.slaveDown(uri, FreezeReason.MANAGER); boolean slaveDown = entry.slaveDown(uri);
if (config.isSlaveNotUsed() || slaveDown) { if (config.isSlaveNotUsed() || slaveDown) {
disconnectNode(uri); disconnectNode(uri);
log.info("slave {} removed for master {} and slot ranges: {}", log.info("slave {} removed for master {} and slot ranges: {}",
@ -616,7 +615,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (RedisURI uri : addedSlaves) { for (RedisURI uri : addedSlaves) {
ClientConnectionsEntry slaveEntry = entry.getEntry(uri); ClientConnectionsEntry slaveEntry = entry.getEntry(uri);
if (slaveEntry != null) { if (slaveEntry != null) {
CompletableFuture<Boolean> slaveUpFuture = entry.slaveUpNoMasterExclusionAsync(uri, FreezeReason.MANAGER); CompletableFuture<Boolean> slaveUpFuture = entry.slaveUpNoMasterExclusionAsync(uri);
slaveUpFuture = slaveUpFuture.thenApply(v -> { slaveUpFuture = slaveUpFuture.thenApply(v -> {
if (v) { if (v) {
currentPart.addSlaveAddress(uri); currentPart.addSlaveAddress(uri);

@ -22,7 +22,6 @@ import io.netty.util.Timeout;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -200,7 +199,7 @@ public class DNSMonitor {
slaveFound = true; slaveFound = true;
if (masterSlaveEntry.hasSlave(newSlaveAddr)) { if (masterSlaveEntry.hasSlave(newSlaveAddr)) {
CompletableFuture<Boolean> slaveUpFuture = masterSlaveEntry.slaveUpAsync(newSlaveAddr, FreezeReason.MANAGER); CompletableFuture<Boolean> slaveUpFuture = masterSlaveEntry.slaveUpAsync(newSlaveAddr);
slaveUpFuture.whenComplete((r, e) -> { slaveUpFuture.whenComplete((r, e) -> {
if (e != null) { if (e != null) {
promise.complete(null); promise.complete(null);
@ -208,7 +207,7 @@ public class DNSMonitor {
} }
if (r) { if (r) {
slaves.put(entry.getKey(), newSlaveAddr); slaves.put(entry.getKey(), newSlaveAddr);
masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); masterSlaveEntry.slaveDown(currentSlaveAddr);
} }
promise.complete(null); promise.complete(null);
}); });
@ -222,7 +221,7 @@ public class DNSMonitor {
} }
slaves.put(entry.getKey(), newSlaveAddr); slaves.put(entry.getKey(), newSlaveAddr);
masterSlaveEntry.slaveDown(currentSlaveAddr, FreezeReason.MANAGER); masterSlaveEntry.slaveDown(currentSlaveAddr);
promise.complete(null); promise.complete(null);
}); });
} }

@ -192,9 +192,9 @@ public class MasterSlaveEntry {
}); });
} }
public boolean slaveDown(InetSocketAddress address, FreezeReason freezeReason) { public boolean slaveDown(InetSocketAddress address) {
ClientConnectionsEntry connectionEntry = getEntry(address); ClientConnectionsEntry connectionEntry = getEntry(address);
ClientConnectionsEntry entry = freeze(connectionEntry, freezeReason); ClientConnectionsEntry entry = freeze(connectionEntry, FreezeReason.MANAGER);
if (entry == null) { if (entry == null) {
return false; return false;
} }
@ -202,9 +202,9 @@ public class MasterSlaveEntry {
return slaveDown(entry); return slaveDown(entry);
} }
public boolean slaveDown(RedisURI address, FreezeReason freezeReason) { public boolean slaveDown(RedisURI address) {
ClientConnectionsEntry connectionEntry = getEntry(address); ClientConnectionsEntry connectionEntry = getEntry(address);
ClientConnectionsEntry entry = freeze(connectionEntry, freezeReason); ClientConnectionsEntry entry = freeze(connectionEntry, FreezeReason.MANAGER);
if (entry == null) { if (entry == null) {
return false; return false;
} }
@ -282,7 +282,7 @@ public class MasterSlaveEntry {
} }
if ("PONG".equals(t)) { if ("PONG".equals(t)) {
CompletableFuture<Boolean> ff = slaveUpAsync(entry, FreezeReason.RECONNECT); CompletableFuture<Boolean> ff = slaveUpAsync(entry);
ff.thenAccept(r -> { ff.thenAccept(r -> {
if (r) { if (r) {
log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
@ -411,9 +411,9 @@ public class MasterSlaveEntry {
return masterEntry.getClient(); return masterEntry.getClient();
} }
public CompletableFuture<Boolean> slaveUpAsync(ClientConnectionsEntry entry, FreezeReason freezeReason) { private CompletableFuture<Boolean> slaveUpAsync(ClientConnectionsEntry entry) {
noPubSubSlaves.set(false); noPubSubSlaves.set(false);
CompletableFuture<Boolean> f = unfreezeAsync(entry, freezeReason); CompletableFuture<Boolean> f = unfreezeAsync(entry, FreezeReason.RECONNECT);
return f.thenApply(r -> { return f.thenApply(r -> {
if (r) { if (r) {
excludeMasterFromSlaves(entry.getClient().getAddr()); excludeMasterFromSlaves(entry.getClient().getAddr());
@ -423,9 +423,9 @@ public class MasterSlaveEntry {
}); });
} }
public CompletableFuture<Boolean> slaveUpAsync(RedisURI address, FreezeReason freezeReason) { public CompletableFuture<Boolean> slaveUpAsync(RedisURI address) {
noPubSubSlaves.set(false); noPubSubSlaves.set(false);
CompletableFuture<Boolean> f = unfreezeAsync(address, freezeReason); CompletableFuture<Boolean> f = unfreezeAsync(address);
return f.thenApply(r -> { return f.thenApply(r -> {
if (r) { if (r) {
excludeMasterFromSlaves(address); excludeMasterFromSlaves(address);
@ -459,19 +459,19 @@ public class MasterSlaveEntry {
return true; return true;
} }
public CompletableFuture<Boolean> slaveUpNoMasterExclusionAsync(RedisURI address, FreezeReason freezeReason) { public CompletableFuture<Boolean> slaveUpNoMasterExclusionAsync(RedisURI address) {
noPubSubSlaves.set(false); noPubSubSlaves.set(false);
return unfreezeAsync(address, freezeReason); return unfreezeAsync(address);
} }
public CompletableFuture<Boolean> slaveUpNoMasterExclusionAsync(InetSocketAddress address, FreezeReason freezeReason) { public CompletableFuture<Boolean> slaveUpNoMasterExclusionAsync(InetSocketAddress address) {
noPubSubSlaves.set(false); noPubSubSlaves.set(false);
return unfreezeAsync(address, freezeReason); return unfreezeAsync(address);
} }
public CompletableFuture<Boolean> slaveUpAsync(InetSocketAddress address, FreezeReason freezeReason) { public CompletableFuture<Boolean> slaveUpAsync(InetSocketAddress address) {
noPubSubSlaves.set(false); noPubSubSlaves.set(false);
CompletableFuture<Boolean> f = unfreezeAsync(address, freezeReason); CompletableFuture<Boolean> f = unfreezeAsync(address);
return f.thenApply(r -> { return f.thenApply(r -> {
if (r) { if (r) {
excludeMasterFromSlaves(address); excludeMasterFromSlaves(address);
@ -685,8 +685,8 @@ public class MasterSlaveEntry {
@SuppressWarnings("BooleanExpressionComplexity") @SuppressWarnings("BooleanExpressionComplexity")
private ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { private ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null || (connectionEntry.getClient().getConfig().getFailedNodeDetector().isNodeFailed() if (connectionEntry == null || (connectionEntry.getClient().getConfig().getFailedNodeDetector().isNodeFailed()
&& connectionEntry.getFreezeReason() == FreezeReason.RECONNECT && connectionEntry.getFreezeReason() == FreezeReason.RECONNECT
&& freezeReason == FreezeReason.RECONNECT)) { && freezeReason == FreezeReason.RECONNECT)) {
return null; return null;
} }
@ -708,24 +708,24 @@ public class MasterSlaveEntry {
}); });
} }
private CompletableFuture<Boolean> unfreezeAsync(RedisURI address, FreezeReason freezeReason) { private CompletableFuture<Boolean> unfreezeAsync(RedisURI address) {
ClientConnectionsEntry entry = getEntry(address); ClientConnectionsEntry entry = getEntry(address);
if (entry == null) { if (entry == null) {
log.error("Can't find {} in slaves! Available slaves: {}", address, client2Entry.keySet()); log.error("Can't find {} in slaves! Available slaves: {}", address, client2Entry.keySet());
return CompletableFuture.completedFuture(false); return CompletableFuture.completedFuture(false);
} }
return unfreezeAsync(entry, freezeReason); return unfreezeAsync(entry, FreezeReason.MANAGER);
} }
private CompletableFuture<Boolean> unfreezeAsync(InetSocketAddress address, FreezeReason freezeReason) { private CompletableFuture<Boolean> unfreezeAsync(InetSocketAddress address) {
ClientConnectionsEntry entry = getEntry(address); ClientConnectionsEntry entry = getEntry(address);
if (entry == null) { if (entry == null) {
log.error("Can't find {} in slaves! Available slaves: {}", address, client2Entry.keySet()); log.error("Can't find {} in slaves! Available slaves: {}", address, client2Entry.keySet());
return CompletableFuture.completedFuture(false); return CompletableFuture.completedFuture(false);
} }
return unfreezeAsync(entry, freezeReason); return unfreezeAsync(entry, FreezeReason.MANAGER);
} }
private CompletableFuture<Boolean> unfreezeAsync(ClientConnectionsEntry entry, FreezeReason freezeReason) { private CompletableFuture<Boolean> unfreezeAsync(ClientConnectionsEntry entry, FreezeReason freezeReason) {

@ -22,7 +22,6 @@ import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.*; import org.redisson.config.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -174,7 +173,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
.collect(Collectors.toSet()); .collect(Collectors.toSet());
for (RedisClient slave : failedSlaves) { for (RedisClient slave : failedSlaves) {
if (config.isSlaveNotUsed() || entry.slaveDown(slave.getAddr(), FreezeReason.MANAGER)) { if (config.isSlaveNotUsed() || entry.slaveDown(slave.getAddr())) {
log.info("slave: {} is down", slave); log.info("slave: {} is down", slave);
disconnectNode(new RedisURI(slave.getConfig().getAddress().getScheme(), disconnectNode(new RedisURI(slave.getConfig().getAddress().getScheme(),
slave.getAddr().getAddress().getHostAddress(), slave.getAddr().getAddress().getHostAddress(),
@ -257,7 +256,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager {
}); });
} }
return entry.slaveUpAsync(address, FreezeReason.MANAGER).thenAccept(r -> { return entry.slaveUpAsync(address).thenAccept(r -> {
if (r) { if (r) {
log.info("slave: {} is up", address); log.info("slave: {} is up", address);
} }

@ -25,7 +25,6 @@ import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.config.*; import org.redisson.config.*;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.RedisURI; import org.redisson.misc.RedisURI;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -615,7 +614,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}); });
} }
CompletableFuture<Boolean> f = entry.slaveUpNoMasterExclusionAsync(addr, FreezeReason.MANAGER); CompletableFuture<Boolean> f = entry.slaveUpNoMasterExclusionAsync(addr);
return f.thenApply(e -> { return f.thenApply(e -> {
if (e) { if (e) {
log.info("slave: {} is up", addr); log.info("slave: {} is up", addr);
@ -630,7 +629,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log.warn("slave: {} is down", addr); log.warn("slave: {} is down", addr);
} else { } else {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.slaveDown(addr, FreezeReason.MANAGER)) { if (entry.slaveDown(addr)) {
log.warn("slave: {} is down", addr); log.warn("slave: {} is down", addr);
} }
} }

Loading…
Cancel
Save