Failover handling improvements

pull/1705/head
Nikita 7 years ago
parent 9d508ac3a3
commit d160fddd15

@ -506,11 +506,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected final void changeMaster(int slot, URI address) { protected final void changeMaster(int slot, URI address) {
final MasterSlaveEntry entry = getEntry(slot); final MasterSlaveEntry entry = getEntry(slot);
client2entry.remove(entry.getClient()); final RedisClient oldClient = entry.getClient();
entry.changeMaster(address).addListener(new FutureListener<RedisClient>() { entry.changeMaster(address).addListener(new FutureListener<RedisClient>() {
@Override @Override
public void operationComplete(Future<RedisClient> future) throws Exception { public void operationComplete(Future<RedisClient> future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
client2entry.remove(oldClient);
client2entry.put(entry.getClient(), entry); client2entry.put(entry.getClient(), entry);
} }
} }

@ -127,18 +127,19 @@ public class MasterSlaveEntry {
@Override @Override
public void operationComplete(Future<InetSocketAddress> future) throws Exception { public void operationComplete(Future<InetSocketAddress> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
client.shutdownAsync();
result.tryFailure(future.cause()); result.tryFailure(future.cause());
return; return;
} }
masterEntry = new ClientConnectionsEntry( masterEntry = new ClientConnectionsEntry(
client, client,
config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(), config.getMasterConnectionPoolSize(),
config.getSubscriptionConnectionMinimumIdleSize(), config.getSubscriptionConnectionMinimumIdleSize(),
config.getSubscriptionConnectionPoolSize(), config.getSubscriptionConnectionPoolSize(),
connectionManager, connectionManager,
NodeType.MASTER); NodeType.MASTER);
int counter = 1; int counter = 1;
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
@ -146,13 +147,13 @@ public class MasterSlaveEntry {
} }
CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client, counter); CountableListener<RedisClient> listener = new CountableListener<RedisClient>(result, client, counter);
RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry); RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
writeFuture.addListener(listener); writeFuture.addListener(listener);
if (config.getSubscriptionMode() == SubscriptionMode.MASTER) { if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry); RFuture<Void> pubSubFuture = pubSubConnectionPool.add(masterEntry);
pubSubFuture.addListener(listener); pubSubFuture.addListener(listener);
} }
} }
}); });
@ -304,6 +305,14 @@ public class MasterSlaveEntry {
} }
} }
RFuture<Void> addFuture = slaveBalancer.add(entry); RFuture<Void> addFuture = slaveBalancer.add(entry);
addFuture.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
client.shutdownAsync();
}
}
});
addFuture.addListener(new TransferListener<Void>(result)); addFuture.addListener(new TransferListener<Void>(result));
} }
}); });

@ -73,7 +73,9 @@ abstract class ConnectionPool<T extends RedisConnection> {
promise.addListener(new FutureListener<Void>() { promise.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
entries.add(entry); if (future.isSuccess()) {
entries.add(entry);
}
} }
}); });
initConnections(entry, promise, true); initConnections(entry, promise, true);
@ -119,13 +121,34 @@ abstract class ConnectionPool<T extends RedisConnection> {
public void operationComplete(Future<T> future) throws Exception { public void operationComplete(Future<T> future) throws Exception {
if (future.isSuccess()) { if (future.isSuccess()) {
T conn = future.getNow(); T conn = future.getNow();
if (!initPromise.isDone()) {
releaseConnection(entry, conn); releaseConnection(entry, conn);
} else {
conn.closeAsync();
}
} }
releaseConnection(entry); releaseConnection(entry);
if (!future.isSuccess()) { if (!future.isSuccess()) {
if (initPromise.isDone()) {
return;
}
for (RedisConnection connection : entry.getAllConnections()) {
if (!connection.isClosed()) {
connection.closeAsync();
}
}
entry.getAllConnections().clear();
for (RedisConnection connection : entry.getAllSubscribeConnections()) {
if (!connection.isClosed()) {
connection.closeAsync();
}
}
entry.getAllSubscribeConnections().clear();
int totalInitializedConnections = minimumIdleSize - initializedConnections.get(); int totalInitializedConnections = minimumIdleSize - initializedConnections.get();
String errorMsg; String errorMsg;
if (totalInitializedConnections == 0) { if (totalInitializedConnections == 0) {
@ -141,9 +164,8 @@ abstract class ConnectionPool<T extends RedisConnection> {
int value = initializedConnections.decrementAndGet(); int value = initializedConnections.decrementAndGet();
if (value == 0) { if (value == 0) {
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); if (initPromise.trySuccess(null)) {
if (!initPromise.trySuccess(null)) { log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
throw new IllegalStateException();
} }
} else if (value > 0 && !initPromise.isDone()) { } else if (value > 0 && !initPromise.isDone()) {
if (requests.incrementAndGet() <= minimumIdleSize) { if (requests.incrementAndGet() <= minimumIdleSize) {

Loading…
Cancel
Save