Fixed - WeightedRoundRobinBalancer filters master node in readMode = ReadMode.MASTER_SLAVE #3114

pull/3438/head
Nikita Koksharov 4 years ago
parent 7839e33037
commit e4cd993c8a

@ -15,20 +15,17 @@
*/ */
package org.redisson.connection.balancer; package org.redisson.connection.balancer;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.misc.RedisURI;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.misc.RedisURI;
/** /**
* Weighted Round Robin balancer. * Weighted Round Robin balancer.
@ -90,40 +87,18 @@ public class WeightedRoundRobinBalancer implements LoadBalancer {
this.defaultWeight = defaultWeight; this.defaultWeight = defaultWeight;
} }
private Set<InetSocketAddress> getAddresses(List<ClientConnectionsEntry> clients) {
Set<InetSocketAddress> result = new HashSet<>();
for (ClientConnectionsEntry entry : clients) {
if (entry.isFreezed()) {
continue;
}
result.add(entry.getClient().getAddr());
}
return result;
}
@Override @Override
public ClientConnectionsEntry getEntry(List<ClientConnectionsEntry> clients) { public ClientConnectionsEntry getEntry(List<ClientConnectionsEntry> clients) {
Set<InetSocketAddress> addresses = getAddresses(clients); clients.stream()
.map(e -> e.getClient().getAddr())
if (!addresses.equals(weights.keySet())) { .distinct()
Set<InetSocketAddress> newAddresses = new HashSet<>(addresses); .filter(a -> !weights.containsKey(a))
newAddresses.removeAll(weights.keySet()); .forEach(a -> weights.put(a, new WeightEntry(defaultWeight)));
for (InetSocketAddress addr : newAddresses) {
weights.put(addr, new WeightEntry(defaultWeight));
}
}
Map<InetSocketAddress, WeightEntry> weightsCopy = new HashMap<>(weights); Map<InetSocketAddress, WeightEntry> weightsCopy = new HashMap<>(weights);
synchronized (this) { synchronized (this) {
for (Iterator<WeightEntry> iterator = weightsCopy.values().iterator(); iterator.hasNext();) { weightsCopy.values().removeIf(WeightEntry::isWeightCounterZero);
WeightEntry entry = iterator.next();
if (entry.isWeightCounterZero()) {
iterator.remove();
}
}
if (weightsCopy.isEmpty()) { if (weightsCopy.isEmpty()) {
for (WeightEntry entry : weights.values()) { for (WeightEntry entry : weights.values()) {
@ -155,18 +130,11 @@ public class WeightedRoundRobinBalancer implements LoadBalancer {
} }
} }
private List<ClientConnectionsEntry> findClients(List<ClientConnectionsEntry> clients, Map<InetSocketAddress, WeightEntry> weightsCopy) { private List<ClientConnectionsEntry> findClients(List<ClientConnectionsEntry> clients,
List<ClientConnectionsEntry> clientsCopy = new ArrayList<>(); Map<InetSocketAddress, WeightEntry> weightsCopy) {
for (InetSocketAddress addr : weightsCopy.keySet()) { return clients.stream()
for (ClientConnectionsEntry clientConnectionsEntry : clients) { .filter(e -> weightsCopy.containsKey(e.getClient().getAddr()))
if (clientConnectionsEntry.getClient().getAddr().equals(addr) .collect(Collectors.toList());
&& !clientConnectionsEntry.isFreezed()) {
clientsCopy.add(clientConnectionsEntry);
break;
}
}
}
return clientsCopy;
} }
} }

Loading…
Cancel
Save