From 063dcabc8c4487018d56dba57640408cd89e49ec Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 2 Oct 2017 19:41:04 +0300 Subject: [PATCH] MasterSlaveConnectionManager#getEntry(java.net.InetSocketAddress) optimization. #1081 --- .../main/java/org/redisson/RedissonKeys.java | 16 +++---- .../cluster/ClusterConnectionManager.java | 17 +++----- .../redisson/command/CommandAsyncService.java | 13 +++--- .../connection/ConnectionManager.java | 3 +- .../MasterSlaveConnectionManager.java | 43 +++++++++++-------- 5 files changed, 45 insertions(+), 47 deletions(-) diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 3361b8ce0..1aae21912 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -207,7 +206,7 @@ public class RedissonKeys implements RKeys { final RPromise result = commandExecutor.getConnectionManager().newPromise(); final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); - Set entries = commandExecutor.getConnectionManager().getEntrySet(); + Collection entries = commandExecutor.getConnectionManager().getEntrySet(); final AtomicLong executed = new AtomicLong(entries.size()); final FutureListener listener = new FutureListener() { @Override @@ -301,14 +300,13 @@ public class RedissonKeys implements RKeys { Map> range2key = new HashMap>(); for (String key : keys) { int slot = commandExecutor.getConnectionManager().calcSlot(key); - for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { - List list = range2key.get(entry); - if (list == null) { - list = new ArrayList(); - range2key.put(entry, list); - } - list.add(key); + MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(slot); + List list = range2key.get(entry); + if (list == null) { + list = new ArrayList(); + range2key.put(entry, list); } + list.add(key); } final RPromise result = commandExecutor.getConnectionManager().newPromise(); diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index bb3e7d352..4eb0a1977 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -568,8 +568,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } for (Integer slot : removedSlots) { - MasterSlaveEntry entry = removeMaster(slot); - entry.removeSlotRange(slot); + MasterSlaveEntry entry = getEntry(slot); + removeMaster(slot); if (entry.getSlotRanges().isEmpty()) { entry.shutdownMasterAsync(); log.info("{} master and slaves for it removed", entry.getClient().getAddr()); @@ -584,12 +584,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { } for (final Integer slot : addedSlots) { ClusterPartition partition = find(newPartitions, slot); - for (MasterSlaveEntry entry : getEntrySet()) { - if (entry.getClient().getAddr().equals(partition.getMasterAddr())) { - addEntry(slot, entry); - lastPartitions.put(slot, partition); - break; - } + MasterSlaveEntry entry = getEntry(partition.getMasterAddr()); + if (entry != null && entry.getClient().getAddr().equals(partition.getMasterAddr())) { + addEntry(slot, entry); + lastPartitions.put(slot, partition); + break; } } } @@ -611,7 +610,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr()); for (Integer slot : addedSlots) { - entry.addSlotRange(slot); addEntry(slot, entry); lastPartitions.put(slot, currentPartition); } @@ -623,7 +621,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager { removedSlots.removeAll(newPartition.getSlots()); for (Integer removeSlot : removedSlots) { if (lastPartitions.remove(removeSlot, currentPartition)) { - entry.removeSlotRange(removeSlot); removeMaster(removeSlot); } } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 784d6a278..cadcecefd 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -16,11 +16,13 @@ package org.redisson.command; import java.net.InetSocketAddress; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -71,9 +73,6 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.util.AbstractMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; /** * @@ -198,7 +197,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { @Override public RFuture> readAllAsync(RedisCommand command, Object... params) { final RPromise> mainPromise = connectionManager.newPromise(); - final Set nodes = connectionManager.getEntrySet(); + final Collection nodes = connectionManager.getEntrySet(); final List results = new ArrayList(); final AtomicInteger counter = new AtomicInteger(nodes.size()); FutureListener listener = new FutureListener() { @@ -288,7 +287,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { private RFuture allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object... params) { final RPromise mainPromise = connectionManager.newPromise(); - final Set nodes = connectionManager.getEntrySet(); + final Collection nodes = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(nodes.size()); FutureListener listener = new FutureListener() { @Override @@ -413,7 +412,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { public RFuture evalAllAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, String script, List keys, Object... params) { final RPromise mainPromise = connectionManager.newPromise(); - final Set entries = connectionManager.getEntrySet(); + final Collection entries = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(entries.size()); FutureListener listener = new FutureListener() { diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index 6a6ee0517..b89edb115 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -18,7 +18,6 @@ package org.redisson.connection; import java.net.InetSocketAddress; import java.net.URI; import java.util.Collection; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -85,7 +84,7 @@ public interface ConnectionManager { Codec getCodec(); - Set getEntrySet(); + Collection getEntrySet(); MasterSlaveEntry getEntry(int slot); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 5918060b9..69e357742 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -24,9 +24,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Queue; -import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -130,7 +128,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected MasterSlaveServersConfig config; - private final Map entries = PlatformDependent.newConcurrentHashMap(); + private final Map slot2entry = PlatformDependent.newConcurrentHashMap(); + private final Map addr2entry = PlatformDependent.newConcurrentHashMap(); private final RPromise shutdownPromise; @@ -234,8 +233,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager { return codec; } - public Set getEntrySet() { - return new HashSet(entries.values()); + @Override + public Collection getEntrySet() { + return addr2entry.values(); } protected void initTimer(MasterSlaveServersConfig config) { @@ -672,29 +672,34 @@ public class MasterSlaveConnectionManager implements ConnectionManager { @Override public MasterSlaveEntry getEntry(InetSocketAddress addr) { - // TODO optimize - for (Entry entry : entries.entrySet()) { - if (entry.getValue().getClient().getAddr().equals(addr)) { - return entry.getValue(); - } - } - return null; + return addr2entry.get(addr); } + @Override public MasterSlaveEntry getEntry(int slot) { - return entries.get(slot); + return slot2entry.get(slot); } - protected void changeMaster(int slot, URI address) { - getEntry(slot).changeMaster(address); + protected final void changeMaster(int slot, URI address) { + MasterSlaveEntry entry = getEntry(slot); + addr2entry.remove(entry.getClient().getAddr()); + entry.changeMaster(address); + addr2entry.put(entry.getClient().getAddr(), entry); } - protected void addEntry(Integer slot, MasterSlaveEntry entry) { - entries.put(slot, entry); + protected final void addEntry(Integer slot, MasterSlaveEntry entry) { + slot2entry.put(slot, entry); + entry.addSlotRange(slot); + addr2entry.put(entry.getClient().getAddr(), entry); } protected MasterSlaveEntry removeMaster(Integer slot) { - return entries.remove(slot); + MasterSlaveEntry entry = slot2entry.remove(slot); + entry.removeSlotRange(slot); + if (entry.getSlotRanges().isEmpty()) { + addr2entry.remove(entry.getClient().getAddr()); + } + return entry; } @Override @@ -812,7 +817,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { shutdownPromise.trySuccess(true); shutdownLatch.awaitUninterruptibly(); - for (MasterSlaveEntry entry : entries.values()) { + for (MasterSlaveEntry entry : getEntrySet()) { entry.shutdown(); }