From 7baad67849a9118a2abd5273fc191ef8e330436f Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Wed, 1 May 2024 10:31:59 +0300 Subject: [PATCH] refactoring --- .../redisson/cluster/ClusterConnectionManager.java | 9 ++++++--- .../connection/MasterSlaveConnectionManager.java | 6 +++--- .../org/redisson/connection/MasterSlaveEntry.java | 4 ++-- .../connection/ReplicatedConnectionManager.java | 4 ++-- .../connection/SentinelConnectionManager.java | 11 ++++++----- 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index faaf70d5f..ef01501ef 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -79,7 +79,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } @Override - public void doConnect(Set disconnectedSlaves, Function hostnameMapper) { + public void doConnect(Function hostnameMapper) { if (cfg.getNodeAddresses().isEmpty()) { throw new IllegalArgumentException("At least one cluster node should be defined!"); } @@ -323,7 +323,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { if (config.isSlaveNotUsed()) { entry = new SingleEntry(this, config); } else { - Set slaveAddresses = partition.getSlaveAddresses().stream().map(r -> r.toString()).collect(Collectors.toSet()); + Set slaveAddresses = partition.getSlaveAddresses().stream() + .filter(r -> !partition.getFailedSlaveAddresses().contains(r)) + .map(r -> r.toString()) + .collect(Collectors.toSet()); config.setSlaveAddresses(slaveAddresses); entry = new MasterSlaveEntry(ClusterConnectionManager.this, config); @@ -340,7 +343,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } if (!config.isSlaveNotUsed()) { - CompletableFuture fs = entry.initSlaveBalancer(partition.getFailedSlaveAddresses(), r -> configEndpointHostName); + CompletableFuture fs = entry.initSlaveBalancer(r -> configEndpointHostName); return fs.thenAccept(r -> { if (!partition.getSlaveAddresses().isEmpty()) { log.info("slaves: {} added for master: {} slot ranges: {}", diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index da527b37d..93fba67c3 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -190,7 +190,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { if (i == attempts - 1) { lastAttempt = true; } - doConnect(new HashSet<>(), u -> null); + doConnect(u -> null); return; } catch (IllegalArgumentException e) { shutdown(); @@ -210,7 +210,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - protected void doConnect(Set disconnectedSlaves, Function hostnameMapper) { + protected void doConnect(Function hostnameMapper) { try { if (config.isSlaveNotUsed()) { masterSlaveEntry = new SingleEntry(this, config); @@ -234,7 +234,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } if (!config.isSlaveNotUsed()) { - CompletableFuture fs = masterSlaveEntry.initSlaveBalancer(disconnectedSlaves, hostnameMapper); + CompletableFuture fs = masterSlaveEntry.initSlaveBalancer(hostnameMapper); try { if (config.getSlaveConnectionMinimumIdleSize() == 0) { fs.join(); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 64f0bd28d..ef3daf684 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -90,12 +90,12 @@ public class MasterSlaveEntry { return config; } - public CompletableFuture initSlaveBalancer(Collection disconnectedNodes, Function hostnameMapper) { + public CompletableFuture initSlaveBalancer(Function hostnameMapper) { List> result = new ArrayList<>(config.getSlaveAddresses().size()); for (String address : config.getSlaveAddresses()) { RedisURI uri = new RedisURI(address); String hostname = hostnameMapper.apply(uri); - CompletableFuture f = addSlave(uri, disconnectedNodes.contains(uri), hostname); + CompletableFuture f = addSlave(uri, false, hostname); result.add(f); } diff --git a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java index 67d03627a..04d0a9c2b 100644 --- a/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ReplicatedConnectionManager.java @@ -70,7 +70,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { } @Override - public void doConnect(Set disconnectedSlaves, Function hostnameMapper) { + public void doConnect(Function hostnameMapper) { if (cfg.getNodeAddresses().isEmpty()) { throw new IllegalArgumentException("At least one Redis node should be defined!"); } @@ -107,7 +107,7 @@ public class ReplicatedConnectionManager extends MasterSlaveConnectionManager { log.warn("ReadMode = {}, but slave nodes are not found! Please specify all nodes in replicated mode.", this.config.getReadMode()); } - super.doConnect(disconnectedSlaves, hostnameMapper); + super.doConnect(hostnameMapper); scheduleMasterChangeCheck(cfg); } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index f37c8b867..acfff6084 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -76,7 +76,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { } @Override - public void doConnect(Set disconnectedSlaves, Function hostnameMapper) { + public void doConnect(Function hostnameMapper) { checkAuth(cfg); if ("redis".equals(scheme)) { @@ -133,13 +133,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { if (isHostname(host)) { uri2hostname.put(uri, host); } - this.config.addSlaveAddress(uri.toString()); + log.debug("slave {} state: {}", slaveAddr, map); - log.info("slave: {} added", slaveAddr); if (isSlaveDown(flags, masterLinkStatus)) { - disconnectedSlaves.add(uri); log.warn("slave: {} is down", slaveAddr); + } else { + this.config.addSlaveAddress(uri.toString()); + log.info("slave: {} added", slaveAddr); } } @@ -201,7 +202,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager { log.warn("ReadMode = {}, but slave nodes are not found!", this.config.getReadMode()); } - super.doConnect(disconnectedSlaves, uri2hostname::get); + super.doConnect(uri2hostname::get); scheduleChangeCheck(cfg, null); }