refactoring

pull/456/head
Nikita 9 years ago
parent 6adbd9387f
commit a60d3f491e

@ -346,10 +346,11 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
failedSlaves.removeAll(currentPart.getFailedSlaveAddresses()); failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
for (URI uri : failedSlaves) { for (URI uri : failedSlaves) {
currentPart.addFailedSlaveAddress(uri); currentPart.addFailedSlaveAddress(uri);
slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER); if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }
}
private void addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) { private void addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPartition currentPart, final ClusterPartition newPart) {
Set<URI> removedSlaves = new HashSet<URI>(currentPart.getSlaveAddresses()); Set<URI> removedSlaves = new HashSet<URI>(currentPart.getSlaveAddresses());
@ -358,9 +359,10 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (URI uri : removedSlaves) { for (URI uri : removedSlaves) {
currentPart.removeSlaveAddress(uri); currentPart.removeSlaveAddress(uri);
slaveDown(entry, uri.getHost(), uri.getPort(), FreezeReason.MANAGER); if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) {
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
}
Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses()); Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
addedSlaves.removeAll(currentPart.getSlaveAddresses()); addedSlaves.removeAll(currentPart.getSlaveAddresses());

@ -23,11 +23,11 @@ import java.util.concurrent.TimeUnit;
import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.core.NodeType; import org.redisson.core.NodeType;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;
@ -44,6 +44,8 @@ import io.netty.util.concurrent.Promise;
*/ */
public interface ConnectionManager { public interface ConnectionManager {
void reattachPubSub(Collection<RedisPubSubConnection> allPubSubConnections);
boolean isClusterMode(); boolean isClusterMode();
<R> Future<R> newSucceededFuture(R value); <R> Future<R> newSucceededFuture(R value);
@ -62,8 +64,6 @@ public interface ConnectionManager {
<R> Future<R> newFailedFuture(Throwable cause); <R> Future<R> newFailedFuture(Throwable cause);
void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason);
Collection<RedisClientEntry> getClients(); Collection<RedisClientEntry> getClients();
void shutdownAsync(RedisClient client); void shutdownAsync(RedisClient client);

@ -542,13 +542,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return null; return null;
} }
public void slaveDown(MasterSlaveEntry entry, String host, int port, FreezeReason freezeReason) { @Override
Collection<RedisPubSubConnection> allPubSubConnections = entry.slaveDown(host, port, freezeReason); public void reattachPubSub(Collection<RedisPubSubConnection> allPubSubConnections) {
if (allPubSubConnections.isEmpty()) {
return;
}
// reattach listeners to other channels
for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) { for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {
for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) { for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) {
PubSubConnectionEntry pubSubEntry = mapEntry.getValue(); PubSubConnectionEntry pubSubEntry = mapEntry.getValue();
@ -620,8 +615,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} }
protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) { protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) {
MasterSlaveEntry entry = getEntry(slotRange); getEntry(slotRange).slaveDown(host, port, freezeReason);
slaveDown(entry, host, port, freezeReason);
} }
protected void changeMaster(ClusterSlotRange slotRange, String host, int port) { protected void changeMaster(ClusterSlotRange slotRange, String host, int port) {

@ -90,8 +90,10 @@ public class MasterSlaveEntry {
return writeConnectionHolder.add(masterEntry); return writeConnectionHolder.add(masterEntry);
} }
public Collection<RedisPubSubConnection> slaveDown(String host, int port, FreezeReason freezeReason) { public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
Collection<RedisPubSubConnection> conns = slaveBalancer.freeze(host, port, freezeReason); if (!slaveBalancer.freeze(host, port, freezeReason)) {
return false;
}
// add master as slave if no more slaves available // add master as slave if no more slaves available
if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) {
@ -100,7 +102,7 @@ public class MasterSlaveEntry {
log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort());
} }
} }
return conns; return true;
} }
public Future<Void> addSlave(String host, int port) { public Future<Void> addSlave(String host, int port) {
@ -134,7 +136,7 @@ public class MasterSlaveEntry {
// exclude master from slaves // exclude master from slaves
if (config.getReadMode() == ReadMode.SLAVE if (config.getReadMode() == ReadMode.SLAVE
&& (!addr.getHostName().equals(host) || port != addr.getPort())) { && (!addr.getHostName().equals(host) || port != addr.getPort())) {
connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); slaveDown(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort()); log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort());
} }
return true; return true;
@ -155,7 +157,7 @@ public class MasterSlaveEntry {
// more than one slave available, so master can be removed from slaves // more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE if (config.getReadMode() == ReadMode.SLAVE
&& slaveBalancer.getAvailableClients() > 1) { && slaveBalancer.getAvailableClients() > 1) {
connectionManager.slaveDown(this, host, port, FreezeReason.SYSTEM); slaveDown(host, port, FreezeReason.SYSTEM);
} }
connectionManager.shutdownAsync(oldMaster.getClient()); connectionManager.shutdownAsync(oldMaster.getClient());
} }

@ -16,7 +16,6 @@
package org.redisson.connection.balancer; package org.redisson.connection.balancer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collection;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
@ -37,7 +36,7 @@ public interface LoadBalancerManager {
boolean unfreeze(String host, int port, FreezeReason freezeReason); boolean unfreeze(String host, int port, FreezeReason freezeReason);
Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason); boolean freeze(String host, int port, FreezeReason freezeReason);
Future<Void> add(ClientConnectionsEntry entry); Future<Void> add(ClientConnectionsEntry entry);

@ -16,10 +16,6 @@
package org.redisson.connection.balancer; package org.redisson.connection.balancer;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
@ -99,16 +95,20 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
return false; return false;
} }
public Collection<RedisPubSubConnection> freeze(String host, int port, FreezeReason freezeReason) { public boolean freeze(String host, int port, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port); InetSocketAddress addr = new InetSocketAddress(host, port);
ClientConnectionsEntry connectionEntry = addr2Entry.get(addr); ClientConnectionsEntry connectionEntry = addr2Entry.get(addr);
if (connectionEntry == null) { if (connectionEntry == null) {
return Collections.emptyList(); return false;
} }
synchronized (connectionEntry) { synchronized (connectionEntry) {
log.debug("{} freezed", addr); if (connectionEntry.isFreezed()) {
return false;
}
connectionEntry.setFreezed(true); connectionEntry.setFreezed(true);
// only RECONNECT freeze reason could be replaced // only RECONNECT freeze reason could be replaced
if (connectionEntry.getFreezeReason() == null if (connectionEntry.getFreezeReason() == null
|| connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) { || connectionEntry.getFreezeReason() == FreezeReason.RECONNECT) {
@ -134,11 +134,9 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {
connection.closeAsync(); connection.closeAsync();
} }
synchronized (connectionEntry) { connectionManager.reattachPubSub(connectionEntry.getAllSubscribeConnections());
List<RedisPubSubConnection> list = new ArrayList<RedisPubSubConnection>(connectionEntry.getAllSubscribeConnections());
connectionEntry.getAllSubscribeConnections().clear(); connectionEntry.getAllSubscribeConnections().clear();
return list; return true;
}
} }
public Future<RedisPubSubConnection> nextPubSubConnection() { public Future<RedisPubSubConnection> nextPubSubConnection() {

@ -277,7 +277,7 @@ abstract class ConnectionPool<T extends RedisConnection> {
private void checkForReconnect(ClientConnectionsEntry entry) { private void checkForReconnect(ClientConnectionsEntry entry) {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(),
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts()); log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
scheduleCheck(entry); scheduleCheck(entry);

Loading…
Cancel
Save