From 7fe6e3db534b808e52fcbef358865be84e09479d Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Thu, 6 Aug 2020 09:18:30 +0300 Subject: [PATCH] Fixed - MasterSlaveConnectionManager allocates superfluous 113Kb of memory for non-cluster Redis setup #2965 --- .../cluster/ClusterConnectionManager.java | 79 ++++++++++++++++++- .../MasterSlaveConnectionManager.java | 77 ++++++------------ 2 files changed, 99 insertions(+), 57 deletions(-) diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index f57cc253f..eca7a0727 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -48,6 +48,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; /** @@ -70,7 +71,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { private String configEndpointHostName; private final NatMapper natMapper; - + + private final AtomicReferenceArray slot2entry = new AtomicReferenceArray<>(MAX_SLOT); + + private final Map client2entry = new ConcurrentHashMap<>(); + public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) { super(config, id); @@ -153,7 +158,77 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { scheduleClusterChangeCheck(cfg, null); } - + + @Override + public Collection getEntrySet() { + return client2entry.values(); + } + + protected MasterSlaveEntry getEntry(RedisURI addr) { + for (MasterSlaveEntry entry : client2entry.values()) { + if (RedisURI.compare(entry.getClient().getAddr(), addr)) { + return entry; + } + if (entry.hasSlave(addr)) { + return entry; + } + } + return null; + } + + @Override + public MasterSlaveEntry getEntry(RedisClient redisClient) { + MasterSlaveEntry entry = client2entry.get(redisClient); + if (entry != null) { + return entry; + } + + for (MasterSlaveEntry mentry : client2entry.values()) { + if (mentry.hasSlave(redisClient)) { + return mentry; + } + } + return null; + } + + @Override + public MasterSlaveEntry getEntry(InetSocketAddress address) { + for (MasterSlaveEntry entry : client2entry.values()) { + InetSocketAddress addr = entry.getClient().getAddr(); + if (addr.getAddress().equals(address.getAddress()) && addr.getPort() == address.getPort()) { + return entry; + } + } + return null; + } + + protected void removeClient(RedisClient client) { + client2entry.remove(client); + } + + protected void addClient(MasterSlaveEntry entry) { + client2entry.put(entry.getClient(), entry); + } + + @Override + public MasterSlaveEntry getEntry(int slot) { + return slot2entry.get(slot); + } + + private void addEntry(Integer slot, MasterSlaveEntry entry) { + MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry); + if (oldEntry != entry) { + entry.incReference(); + shutdownEntry(oldEntry); + } + client2entry.put(entry.getClient(), entry); + } + + private void removeEntry(Integer slot) { + MasterSlaveEntry entry = slot2entry.getAndSet(slot, null); + shutdownEntry(entry); + } + @Override protected RedisClientConfig createRedisConfig(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) { RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout, sslHostname); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 73124c2be..1558bc4b4 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; import org.redisson.ElementsSubscribeService; @@ -128,8 +127,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected MasterSlaveServersConfig config; - private final AtomicReferenceArray slot2entry = new AtomicReferenceArray<>(MAX_SLOT); - private final Map client2entry = new ConcurrentHashMap<>(); + private MasterSlaveEntry masterSlaveEntry; private final Promise shutdownPromise = ImmediateEventExecutor.INSTANCE.newPromise(); @@ -330,7 +328,10 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public Collection getEntrySet() { - return client2entry.values(); + if (masterSlaveEntry != null) { + return Collections.singletonList(masterSlaveEntry); + } + return Collections.emptyList(); } protected void initTimer(MasterSlaveServersConfig config) { @@ -369,10 +370,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { } } - for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) { - addEntry(slot, entry); - } - + masterSlaveEntry = entry; + startDNSMonitoring(masterFuture.getNow()); } catch (Exception e) { stopThreads(); @@ -502,40 +501,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public MasterSlaveEntry getEntry(InetSocketAddress address) { - for (MasterSlaveEntry entry : client2entry.values()) { - InetSocketAddress addr = entry.getClient().getAddr(); - if (addr.getAddress().equals(address.getAddress()) && addr.getPort() == address.getPort()) { - return entry; - } - } - return null; + return masterSlaveEntry; } protected MasterSlaveEntry getEntry(RedisURI addr) { - for (MasterSlaveEntry entry : client2entry.values()) { - if (RedisURI.compare(entry.getClient().getAddr(), addr)) { - return entry; - } - if (entry.hasSlave(addr)) { - return entry; - } - } - return null; + return masterSlaveEntry; } @Override public MasterSlaveEntry getEntry(RedisClient redisClient) { - MasterSlaveEntry entry = client2entry.get(redisClient); - if (entry != null) { - return entry; - } - - for (MasterSlaveEntry mentry : client2entry.values()) { - if (mentry.hasSlave(redisClient)) { - return mentry; - } - } - return null; + return masterSlaveEntry; } @Override @@ -546,39 +521,31 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public MasterSlaveEntry getEntry(int slot) { - return slot2entry.get(slot); + return masterSlaveEntry; } - + + protected void removeClient(RedisClient client) { + } + + protected void addClient(MasterSlaveEntry entry) { + } + protected final RFuture changeMaster(int slot, RedisURI address) { final MasterSlaveEntry entry = getEntry(slot); final RedisClient oldClient = entry.getClient(); RFuture future = entry.changeMaster(address); future.onComplete((res, e) -> { if (e == null) { - client2entry.remove(oldClient); - client2entry.put(entry.getClient(), entry); + removeClient(oldClient); + addClient(entry); } }); return future; } - protected final void addEntry(Integer slot, MasterSlaveEntry entry) { - MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry); - if (oldEntry != entry) { - entry.incReference(); - shutdownEntry(oldEntry); - } - client2entry.put(entry.getClient(), entry); - } - - protected final void removeEntry(Integer slot) { - MasterSlaveEntry entry = slot2entry.getAndSet(slot, null); - shutdownEntry(entry); - } - - private void shutdownEntry(MasterSlaveEntry entry) { + protected final void shutdownEntry(MasterSlaveEntry entry) { if (entry != null && entry.decReference() == 0) { - client2entry.remove(entry.getClient()); + removeClient(entry.getClient()); entry.getAllEntries().forEach(e -> entry.nodeDown(e)); entry.masterDown(); entry.shutdownAsync();