refactoring

pull/2979/head
Nikita Koksharov 5 years ago
parent cd38c340f4
commit 14bcc02243

@ -202,12 +202,18 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
return null;
}
protected void removeClient(RedisClient client) {
client2entry.remove(client);
}
protected void addClient(MasterSlaveEntry entry) {
client2entry.put(entry.getClient(), entry);
@Override
protected RFuture<RedisClient> changeMaster(int slot, RedisURI address) {
MasterSlaveEntry entry = getEntry(slot);
RedisClient oldClient = entry.getClient();
RFuture<RedisClient> future = super.changeMaster(slot, address);
future.onComplete((res, e) -> {
if (e == null) {
client2entry.remove(oldClient);
client2entry.put(entry.getClient(), entry);
}
});
return future;
}
@Override
@ -229,6 +235,22 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
shutdownEntry(entry);
}
private void shutdownEntry(MasterSlaveEntry entry) {
if (entry != null && entry.decReference() == 0) {
client2entry.remove(entry.getClient());
entry.getAllEntries().forEach(e -> entry.nodeDown(e));
entry.masterDown();
entry.shutdownAsync();
subscribeService.remove(entry);
String slaves = entry.getAllEntries().stream()
.filter(e -> !e.getClient().getAddr().equals(entry.getClient().getAddr()))
.map(e -> e.getClient().toString())
.collect(Collectors.joining(","));
log.info("{} master and related slaves: {} removed", entry.getClient().getAddr(), slaves);
}
}
@Override
protected RedisClientConfig createRedisConfig(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {
RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout, sslHostname);

@ -147,7 +147,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final ElementsSubscribeService elementsSubscribeService = new ElementsSubscribeService(this);
private PublishSubscribeService subscribeService;
protected PublishSubscribeService subscribeService;
private final Map<Object, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
@ -521,41 +521,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return masterSlaveEntry;
}
protected void removeClient(RedisClient client) {
}
protected void addClient(MasterSlaveEntry entry) {
}
protected final RFuture<RedisClient> changeMaster(int slot, RedisURI address) {
final MasterSlaveEntry entry = getEntry(slot);
final RedisClient oldClient = entry.getClient();
RFuture<RedisClient> future = entry.changeMaster(address);
future.onComplete((res, e) -> {
if (e == null) {
removeClient(oldClient);
addClient(entry);
}
});
return future;
protected RFuture<RedisClient> changeMaster(int slot, RedisURI address) {
MasterSlaveEntry entry = getEntry(slot);
return entry.changeMaster(address);
}
protected final void shutdownEntry(MasterSlaveEntry entry) {
if (entry != null && entry.decReference() == 0) {
removeClient(entry.getClient());
entry.getAllEntries().forEach(e -> entry.nodeDown(e));
entry.masterDown();
entry.shutdownAsync();
subscribeService.remove(entry);
String slaves = entry.getAllEntries().stream()
.filter(e -> !e.getClient().getAddr().equals(entry.getClient().getAddr()))
.map(e -> e.getClient().toString())
.collect(Collectors.joining(","));
log.info("{} master and related slaves: {} removed", entry.getClient().getAddr(), slaves);
}
}
@Override
public RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry entry = getEntry(source);

@ -480,7 +480,9 @@ public class MasterSlaveEntry {
RPromise<Void> result = new RedissonPromise<Void>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, 2);
masterEntry.getClient().shutdownAsync().onComplete(listener);
if (masterEntry != null) {
masterEntry.getClient().shutdownAsync().onComplete(listener);
}
slaveBalancer.shutdownAsync().onComplete(listener);
return result;
}

Loading…
Cancel
Save