Master node failover handling improvements

pull/1705/head
Nikita 7 years ago
parent 952d96fd3c
commit 6ffb79766b

@ -78,7 +78,7 @@ public class CommandsQueue extends ChannelDuplexHandler {
}
command.getChannelPromise().tryFailure(
new WriteRedisConnectionException("Can't write command: " + command.getCommand() + " to channel: " + ctx.channel()));
new WriteRedisConnectionException("Channel has been closed! Can't write command: " + command.getCommand() + " to channel: " + ctx.channel()));
}
super.channelInactive(ctx);

@ -507,7 +507,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
Set<ClusterPartition> lastPartitions = getLastPartitions();
for (final ClusterPartition newPart : newPartitions) {
boolean masterFound = false;
for (ClusterPartition currentPart : lastPartitions) {
for (final ClusterPartition currentPart : lastPartitions) {
if (!newPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
continue;
}
@ -521,11 +521,19 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
// does partition has a new master?
if (!newMasterPart.getMasterAddress().equals(currentPart.getMasterAddress())) {
URI newUri = newMasterPart.getMasterAddress();
URI oldUri = currentPart.getMasterAddress();
final URI oldUri = currentPart.getMasterAddress();
changeMaster(slot, newUri);
RFuture<RedisClient> future = changeMaster(slot, newUri);
future.addListener(new FutureListener<RedisClient>() {
@Override
public void operationComplete(Future<RedisClient> future) throws Exception {
if (!future.isSuccess()) {
currentPart.setMasterAddress(oldUri);
}
}
});
currentPart.setMasterAddress(newMasterPart.getMasterAddress());
currentPart.setMasterAddress(newUri);
}
}
break;

@ -504,10 +504,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return slot2entry.get(slot);
}
protected final void changeMaster(int slot, URI address) {
protected final RFuture<RedisClient> changeMaster(int slot, URI address) {
final MasterSlaveEntry entry = getEntry(slot);
final RedisClient oldClient = entry.getClient();
entry.changeMaster(address).addListener(new FutureListener<RedisClient>() {
RFuture<RedisClient> future = entry.changeMaster(address);
future.addListener(new FutureListener<RedisClient>() {
@Override
public void operationComplete(Future<RedisClient> future) throws Exception {
if (future.isSuccess()) {
@ -516,6 +517,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
}
});
return future;
}
protected final void addEntry(Integer slot, MasterSlaveEntry entry) {

@ -422,7 +422,7 @@ public class MasterSlaveEntry {
@Override
public void operationComplete(Future<RedisClient> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't change master to: {}", address);
log.error("Unable to change master from: " + oldMaster.getClient().getAddr() + " to: " + address, future.cause());
return;
}

Loading…
Cancel
Save