Fixed - Old/stale nodes not removed from NodesGroup #1330

pull/1344/head
Nikita 7 years ago
parent ac5365a098
commit cdc1d164c3

@ -30,9 +30,12 @@ import org.redisson.api.NodesGroup;
import org.redisson.api.RFuture;
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ConnectionListener;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.RedisClientEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.misc.URIBuilder;
import io.netty.util.concurrent.Future;
@ -54,11 +57,17 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
@Override
public N getNode(String address) {
Collection<N> clients = (Collection<N>) connectionManager.getClients();
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
URI addr = URIBuilder.create(address);
for (N node : clients) {
if (URIBuilder.compare(node.getAddr(), addr)) {
return node;
for (MasterSlaveEntry masterSlaveEntry : entries) {
if (URIBuilder.compare(masterSlaveEntry.getClient().getAddr(), addr)) {
return (N) new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
}
for (ClientConnectionsEntry entry : masterSlaveEntry.getSlaveEntries()) {
if (URIBuilder.compare(entry.getClient().getAddr(), addr) ||
entry.getFreezeReason() == null || entry.getFreezeReason() == FreezeReason.RECONNECT) {
return (N) new RedisClientEntry(entry.getClient(), connectionManager.getCommandExecutor(), entry.getNodeType());
}
}
}
return null;
@ -66,11 +75,20 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
@Override
public Collection<N> getNodes(NodeType type) {
Collection<N> clients = (Collection<N>) connectionManager.getClients();
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
List<N> result = new ArrayList<N>();
for (N node : clients) {
if (node.getType().equals(type)) {
result.add(node);
for (MasterSlaveEntry masterSlaveEntry : entries) {
if (type == NodeType.MASTER) {
RedisClientEntry entry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
result.add((N) entry);
}
if (type == NodeType.SLAVE) {
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) {
if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) {
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
result.add((N) entry);
}
}
}
}
return result;
@ -79,12 +97,25 @@ public class RedisNodes<N extends Node> implements NodesGroup<N> {
@Override
public Collection<N> getNodes() {
return (Collection<N>) connectionManager.getClients();
Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
List<N> result = new ArrayList<N>();
for (MasterSlaveEntry masterSlaveEntry : entries) {
RedisClientEntry masterEntry = new RedisClientEntry(masterSlaveEntry.getClient(), connectionManager.getCommandExecutor(), NodeType.MASTER);
result.add((N) masterEntry);
for (ClientConnectionsEntry slaveEntry : masterSlaveEntry.getSlaveEntries()) {
if (slaveEntry.getFreezeReason() == null || slaveEntry.getFreezeReason() == FreezeReason.RECONNECT) {
RedisClientEntry entry = new RedisClientEntry(slaveEntry.getClient(), connectionManager.getCommandExecutor(), slaveEntry.getNodeType());
result.add((N) entry);
}
}
}
return result;
}
@Override
public boolean pingAll() {
List<RedisClientEntry> clients = new ArrayList<RedisClientEntry>(connectionManager.getClients());
List<RedisClientEntry> clients = new ArrayList<RedisClientEntry>((Collection<RedisClientEntry>)getNodes());
final Map<RedisConnection, RFuture<String>> result = new ConcurrentHashMap<RedisConnection, RFuture<String>>(clients.size());
final CountDownLatch latch = new CountDownLatch(clients.size());
for (RedisClientEntry entry : clients) {

@ -15,6 +15,11 @@
*/
package org.redisson.api;
/**
*
* @author Nikita Koksharov
*
*/
public enum NodeType {
MASTER, SLAVE, SENTINEL

@ -315,6 +315,11 @@ public class RedisClient {
return;
}
if (!hasOwnTimer && !hasOwnExecutor && !hasOwnResolver && !hasOwnGroup) {
result.trySuccess(null);
return;
}
Thread t = new Thread() {
@Override
public void run() {

@ -67,8 +67,6 @@ public interface ConnectionManager {
IdleConnectionWatcher getConnectionWatcher();
Collection<RedisClientEntry> getClients();
void shutdownAsync(RedisClient client);
int calcSlot(String key);

@ -21,7 +21,6 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -139,8 +138,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
private final Map<RedisClient, RedisClientEntry> clientEntries = PlatformDependent.newConcurrentHashMap();
private IdleConnectionWatcher connectionWatcher;
private final ConnectionEventsHub connectionEventsHub = new ConnectionEventsHub();
@ -422,22 +419,17 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
@Override
public RedisClient createClient(NodeType type, URI address, String sslHostname) {
RedisClient client = createClient(type, address, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname);
clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type));
return client;
}
@Override
public RedisClient createClient(NodeType type, InetSocketAddress address, URI uri, String sslHostname) {
RedisClient client = createClient(type, address, uri, config.getConnectTimeout(), config.getRetryInterval() * config.getRetryAttempts(), sslHostname);
clientEntries.put(client, new RedisClientEntry(client, commandExecutor, type));
return client;
}
@Override
public void shutdownAsync(RedisClient client) {
if (clientEntries.remove(client) == null) {
log.error("Can't find client {}", client);
}
client.shutdownAsync();
}
@ -664,11 +656,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return group.isTerminated();
}
@Override
public Collection<RedisClientEntry> getClients() {
return Collections.unmodifiableCollection(clientEntries.values());
}
@Override
public EventLoopGroup getGroup() {
return group;

@ -17,6 +17,7 @@ package org.redisson.connection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
@ -29,12 +30,9 @@ import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterConnectionManager;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig;
@ -48,7 +46,6 @@ import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.TransferListener;
import org.redisson.misc.URIBuilder;
import org.redisson.pubsub.PublishSubscribeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -348,6 +345,16 @@ public class MasterSlaveEntry {
return addSlave(client, freezed, nodeType);
}
public Collection<ClientConnectionsEntry> getSlaveEntries() {
List<ClientConnectionsEntry> result = new ArrayList<ClientConnectionsEntry>();
for (ClientConnectionsEntry slaveEntry : slaveBalancer.getEntries()) {
if (slaveEntry.getNodeType() == NodeType.SLAVE) {
result.add(slaveEntry);
}
}
return result;
}
public RedisClient getClient() {
return masterEntry.getClient();
}

@ -17,6 +17,8 @@ package org.redisson.connection.balancer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.redisson.api.NodeType;
@ -98,6 +100,10 @@ public class LoadBalancerManager {
return result;
}
public Collection<ClientConnectionsEntry> getEntries() {
return Collections.unmodifiableCollection(client2Entry.values());
}
public int getAvailableClients() {
int count = 0;
for (ClientConnectionsEntry connectionEntry : client2Entry.values()) {
@ -154,12 +160,6 @@ public class LoadBalancerManager {
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(RedisClient redisClient, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(redisClient);
return freeze(connectionEntry, freezeReason);
}
public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) {
return null;

Loading…
Cancel
Save