From 14bcc022430c6ff81a555e5f3371db5f9d53d94f Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Aug 2020 11:39:45 +0300 Subject: [PATCH] refactoring --- .../cluster/ClusterConnectionManager.java | 34 ++++++++++++++--- .../MasterSlaveConnectionManager.java | 38 ++----------------- .../redisson/connection/MasterSlaveEntry.java | 4 +- 3 files changed, 35 insertions(+), 41 deletions(-) diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index eca7a0727..5e1ec4c1a 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -202,12 +202,18 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { return null; } - protected void removeClient(RedisClient client) { - client2entry.remove(client); - } - - protected void addClient(MasterSlaveEntry entry) { - client2entry.put(entry.getClient(), entry); + @Override + protected RFuture changeMaster(int slot, RedisURI address) { + MasterSlaveEntry entry = getEntry(slot); + RedisClient oldClient = entry.getClient(); + RFuture future = super.changeMaster(slot, address); + future.onComplete((res, e) -> { + if (e == null) { + client2entry.remove(oldClient); + client2entry.put(entry.getClient(), entry); + } + }); + return future; } @Override @@ -229,6 +235,22 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { shutdownEntry(entry); } + private void shutdownEntry(MasterSlaveEntry entry) { + if (entry != null && entry.decReference() == 0) { + client2entry.remove(entry.getClient()); + entry.getAllEntries().forEach(e -> entry.nodeDown(e)); + entry.masterDown(); + entry.shutdownAsync(); + subscribeService.remove(entry); + + String slaves = entry.getAllEntries().stream() + .filter(e -> !e.getClient().getAddr().equals(entry.getClient().getAddr())) + .map(e -> e.getClient().toString()) + .collect(Collectors.joining(",")); + log.info("{} master and related slaves: {} removed", entry.getClient().getAddr(), slaves); + } + } + @Override protected RedisClientConfig createRedisConfig(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) { RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout, sslHostname); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index dd6003674..1098a34d2 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -147,7 +147,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { private final ElementsSubscribeService elementsSubscribeService = new ElementsSubscribeService(this); - private PublishSubscribeService subscribeService; + protected PublishSubscribeService subscribeService; private final Map nodeConnections = new ConcurrentHashMap<>(); @@ -521,41 +521,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return masterSlaveEntry; } - protected void removeClient(RedisClient client) { - } - - protected void addClient(MasterSlaveEntry entry) { - } - - protected final RFuture changeMaster(int slot, RedisURI address) { - final MasterSlaveEntry entry = getEntry(slot); - final RedisClient oldClient = entry.getClient(); - RFuture future = entry.changeMaster(address); - future.onComplete((res, e) -> { - if (e == null) { - removeClient(oldClient); - addClient(entry); - } - }); - return future; + protected RFuture changeMaster(int slot, RedisURI address) { + MasterSlaveEntry entry = getEntry(slot); + return entry.changeMaster(address); } - protected final void shutdownEntry(MasterSlaveEntry entry) { - if (entry != null && entry.decReference() == 0) { - removeClient(entry.getClient()); - entry.getAllEntries().forEach(e -> entry.nodeDown(e)); - entry.masterDown(); - entry.shutdownAsync(); - subscribeService.remove(entry); - - String slaves = entry.getAllEntries().stream() - .filter(e -> !e.getClient().getAddr().equals(entry.getClient().getAddr())) - .map(e -> e.getClient().toString()) - .collect(Collectors.joining(",")); - log.info("{} master and related slaves: {} removed", entry.getClient().getAddr(), slaves); - } - } - @Override public RFuture connectionWriteOp(NodeSource source, RedisCommand command) { MasterSlaveEntry entry = getEntry(source); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index 5dc800071..ea15d6b74 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -480,7 +480,9 @@ public class MasterSlaveEntry { RPromise result = new RedissonPromise(); CountableListener listener = new CountableListener(result, null, 2); - masterEntry.getClient().shutdownAsync().onComplete(listener); + if (masterEntry != null) { + masterEntry.getClient().shutdownAsync().onComplete(listener); + } slaveBalancer.shutdownAsync().onComplete(listener); return result; }