refactoring

pull/3455/head
Nikita Koksharov 4 years ago
parent 0db60f7542
commit 7938d255b5

@ -361,56 +361,53 @@ abstract class ConnectionPool<T extends RedisConnection> {
connectionManager.getConnectionEventsHub().fireDisconnect(entry.getClient().getAddr()); connectionManager.getConnectionEventsHub().fireDisconnect(entry.getClient().getAddr());
connectionManager.newTimeout(new TimerTask() { connectionManager.newTimeout(timeout -> {
@Override synchronized (entry) {
public void run(Timeout timeout) throws Exception { if (entry.getFreezeReason() != FreezeReason.RECONNECT
synchronized (entry) { || connectionManager.isShuttingDown()) {
if (entry.getFreezeReason() != FreezeReason.RECONNECT return;
|| connectionManager.isShuttingDown()) {
return;
}
} }
}
RFuture<RedisConnection> connectionFuture = entry.getClient().connectAsync(); RFuture<RedisConnection> connectionFuture = entry.getClient().connectAsync();
connectionFuture.onComplete((c, e) -> { connectionFuture.onComplete((c, e) -> {
synchronized (entry) { synchronized (entry) {
if (entry.getFreezeReason() != FreezeReason.RECONNECT) { if (entry.getFreezeReason() != FreezeReason.RECONNECT) {
return;
}
}
if (e != null) {
scheduleCheck(entry);
return;
}
if (!c.isActive()) {
c.closeAsync();
scheduleCheck(entry);
return; return;
} }
}
if (e != null) {
scheduleCheck(entry);
return;
}
if (!c.isActive()) {
c.closeAsync();
scheduleCheck(entry);
return;
}
RFuture<String> f = c.async(RedisCommands.PING); RFuture<String> f = c.async(RedisCommands.PING);
f.onComplete((t, ex) -> { f.onComplete((t, ex) -> {
try { try {
synchronized (entry) { synchronized (entry) {
if (entry.getFreezeReason() != FreezeReason.RECONNECT) { if (entry.getFreezeReason() != FreezeReason.RECONNECT) {
return; return;
}
} }
}
if ("PONG".equals(t)) { if ("PONG".equals(t)) {
if (masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT)) { if (masterSlaveEntry.slaveUp(entry, FreezeReason.RECONNECT)) {
log.info("slave {} has been successfully reconnected", entry.getClient().getAddr()); log.info("slave {} has been successfully reconnected", entry.getClient().getAddr());
}
} else {
scheduleCheck(entry);
} }
} finally { } else {
c.closeAsync(); scheduleCheck(entry);
} }
}); } finally {
}); c.closeAsync();
} }
});
});
}, config.getFailedSlaveReconnectionInterval(), TimeUnit.MILLISECONDS); }, config.getFailedSlaveReconnectionInterval(), TimeUnit.MILLISECONDS);
} }

Loading…
Cancel
Save