MasterSlaveConnectionManager#getEntry(java.net.InetSocketAddress) optimization. #1081

pull/1085/head
Nikita 8 years ago
parent 0be813da07
commit 063dcabc8c

@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -207,7 +206,7 @@ public class RedissonKeys implements RKeys {
final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise(); final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong(); final AtomicLong count = new AtomicLong();
Set<MasterSlaveEntry> entries = commandExecutor.getConnectionManager().getEntrySet(); Collection<MasterSlaveEntry> entries = commandExecutor.getConnectionManager().getEntrySet();
final AtomicLong executed = new AtomicLong(entries.size()); final AtomicLong executed = new AtomicLong(entries.size());
final FutureListener<Long> listener = new FutureListener<Long>() { final FutureListener<Long> listener = new FutureListener<Long>() {
@Override @Override
@ -301,14 +300,13 @@ public class RedissonKeys implements RKeys {
Map<MasterSlaveEntry, List<String>> range2key = new HashMap<MasterSlaveEntry, List<String>>(); Map<MasterSlaveEntry, List<String>> range2key = new HashMap<MasterSlaveEntry, List<String>>();
for (String key : keys) { for (String key : keys) {
int slot = commandExecutor.getConnectionManager().calcSlot(key); int slot = commandExecutor.getConnectionManager().calcSlot(key);
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(slot);
List<String> list = range2key.get(entry); List<String> list = range2key.get(entry);
if (list == null) { if (list == null) {
list = new ArrayList<String>(); list = new ArrayList<String>();
range2key.put(entry, list); range2key.put(entry, list);
}
list.add(key);
} }
list.add(key);
} }
final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise(); final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise();

@ -568,8 +568,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
for (Integer slot : removedSlots) { for (Integer slot : removedSlots) {
MasterSlaveEntry entry = removeMaster(slot); MasterSlaveEntry entry = getEntry(slot);
entry.removeSlotRange(slot); removeMaster(slot);
if (entry.getSlotRanges().isEmpty()) { if (entry.getSlotRanges().isEmpty()) {
entry.shutdownMasterAsync(); entry.shutdownMasterAsync();
log.info("{} master and slaves for it removed", entry.getClient().getAddr()); log.info("{} master and slaves for it removed", entry.getClient().getAddr());
@ -584,12 +584,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
} }
for (final Integer slot : addedSlots) { for (final Integer slot : addedSlots) {
ClusterPartition partition = find(newPartitions, slot); ClusterPartition partition = find(newPartitions, slot);
for (MasterSlaveEntry entry : getEntrySet()) { MasterSlaveEntry entry = getEntry(partition.getMasterAddr());
if (entry.getClient().getAddr().equals(partition.getMasterAddr())) { if (entry != null && entry.getClient().getAddr().equals(partition.getMasterAddr())) {
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, partition); lastPartitions.put(slot, partition);
break; break;
}
} }
} }
} }
@ -611,7 +610,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr()); MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr());
for (Integer slot : addedSlots) { for (Integer slot : addedSlots) {
entry.addSlotRange(slot);
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, currentPartition); lastPartitions.put(slot, currentPartition);
} }
@ -623,7 +621,6 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
removedSlots.removeAll(newPartition.getSlots()); removedSlots.removeAll(newPartition.getSlots());
for (Integer removeSlot : removedSlots) { for (Integer removeSlot : removedSlots) {
if (lastPartitions.remove(removeSlot, currentPartition)) { if (lastPartitions.remove(removeSlot, currentPartition)) {
entry.removeSlotRange(removeSlot);
removeMaster(removeSlot); removeMaster(removeSlot);
} }
} }

@ -16,11 +16,13 @@
package org.redisson.command; package org.redisson.command;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -71,9 +73,6 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
/** /**
* *
@ -198,7 +197,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
@Override @Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) { public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = connectionManager.newPromise(); final RPromise<Collection<R>> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet(); final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final List<R> results = new ArrayList<R>(); final List<R> results = new ArrayList<R>();
final AtomicInteger counter = new AtomicInteger(nodes.size()); final AtomicInteger counter = new AtomicInteger(nodes.size());
FutureListener<R> listener = new FutureListener<R>() { FutureListener<R> listener = new FutureListener<R>() {
@ -288,7 +287,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) { private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise(); final RPromise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet(); final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(nodes.size()); final AtomicInteger counter = new AtomicInteger(nodes.size());
FutureListener<T> listener = new FutureListener<T>() { FutureListener<T> listener = new FutureListener<T>() {
@Override @Override
@ -413,7 +412,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) { public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise(); final RPromise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> entries = connectionManager.getEntrySet(); final Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(entries.size()); final AtomicInteger counter = new AtomicInteger(entries.size());
FutureListener<T> listener = new FutureListener<T>() { FutureListener<T> listener = new FutureListener<T>() {

@ -18,7 +18,6 @@ package org.redisson.connection;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -85,7 +84,7 @@ public interface ConnectionManager {
Codec getCodec(); Codec getCodec();
Set<MasterSlaveEntry> getEntrySet(); Collection<MasterSlaveEntry> getEntrySet();
MasterSlaveEntry getEntry(int slot); MasterSlaveEntry getEntry(int slot);

@ -24,9 +24,7 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue; import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -130,7 +128,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected MasterSlaveServersConfig config; protected MasterSlaveServersConfig config;
private final Map<Integer, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap(); private final Map<Integer, MasterSlaveEntry> slot2entry = PlatformDependent.newConcurrentHashMap();
private final Map<InetSocketAddress, MasterSlaveEntry> addr2entry = PlatformDependent.newConcurrentHashMap();
private final RPromise<Boolean> shutdownPromise; private final RPromise<Boolean> shutdownPromise;
@ -234,8 +233,9 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return codec; return codec;
} }
public Set<MasterSlaveEntry> getEntrySet() { @Override
return new HashSet<MasterSlaveEntry>(entries.values()); public Collection<MasterSlaveEntry> getEntrySet() {
return addr2entry.values();
} }
protected void initTimer(MasterSlaveServersConfig config) { protected void initTimer(MasterSlaveServersConfig config) {
@ -672,29 +672,34 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override @Override
public MasterSlaveEntry getEntry(InetSocketAddress addr) { public MasterSlaveEntry getEntry(InetSocketAddress addr) {
// TODO optimize return addr2entry.get(addr);
for (Entry<Integer, MasterSlaveEntry> entry : entries.entrySet()) {
if (entry.getValue().getClient().getAddr().equals(addr)) {
return entry.getValue();
}
}
return null;
} }
@Override
public MasterSlaveEntry getEntry(int slot) { public MasterSlaveEntry getEntry(int slot) {
return entries.get(slot); return slot2entry.get(slot);
} }
protected void changeMaster(int slot, URI address) { protected final void changeMaster(int slot, URI address) {
getEntry(slot).changeMaster(address); MasterSlaveEntry entry = getEntry(slot);
addr2entry.remove(entry.getClient().getAddr());
entry.changeMaster(address);
addr2entry.put(entry.getClient().getAddr(), entry);
} }
protected void addEntry(Integer slot, MasterSlaveEntry entry) { protected final void addEntry(Integer slot, MasterSlaveEntry entry) {
entries.put(slot, entry); slot2entry.put(slot, entry);
entry.addSlotRange(slot);
addr2entry.put(entry.getClient().getAddr(), entry);
} }
protected MasterSlaveEntry removeMaster(Integer slot) { protected MasterSlaveEntry removeMaster(Integer slot) {
return entries.remove(slot); MasterSlaveEntry entry = slot2entry.remove(slot);
entry.removeSlotRange(slot);
if (entry.getSlotRanges().isEmpty()) {
addr2entry.remove(entry.getClient().getAddr());
}
return entry;
} }
@Override @Override
@ -812,7 +817,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
shutdownPromise.trySuccess(true); shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly(); shutdownLatch.awaitUninterruptibly();
for (MasterSlaveEntry entry : entries.values()) { for (MasterSlaveEntry entry : getEntrySet()) {
entry.shutdown(); entry.shutdown();
} }

Loading…
Cancel
Save