|
|
@ -703,24 +703,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
if (entry == null) {
|
|
|
|
if (entry == null) {
|
|
|
|
entry = getEntry(source);
|
|
|
|
entry = getEntry(source);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
|
|
|
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
|
|
|
|
|
|
|
|
return RedissonPromise.newFailedFuture(ex);
|
|
|
|
|
|
|
|
}
|
|
|
|
return entry.connectionWriteOp(command);
|
|
|
|
return entry.connectionWriteOp(command);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private MasterSlaveEntry getEntry(NodeSource source) {
|
|
|
|
private MasterSlaveEntry getEntry(NodeSource source) {
|
|
|
|
// workaround for slots in migration state
|
|
|
|
// slots handling during migration state
|
|
|
|
if (source.getRedirect() != null) {
|
|
|
|
if (source.getRedirect() != null) {
|
|
|
|
MasterSlaveEntry e = getEntry(source.getAddr());
|
|
|
|
return getEntry(source.getAddr());
|
|
|
|
if (e == null) {
|
|
|
|
|
|
|
|
throw new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return e;
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
MasterSlaveEntry e = getEntry(source.getSlot());
|
|
|
|
return getEntry(source.getSlot());
|
|
|
|
if (e == null) {
|
|
|
|
|
|
|
|
throw new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return e;
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
@ -739,8 +735,20 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
|
|
|
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
|
|
|
|
|
|
|
|
return RedissonPromise.newFailedFuture(ex);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return entry.connectionReadOp(command, source.getAddr());
|
|
|
|
return entry.connectionReadOp(command, source.getAddr());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
|
|
|
RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
|
|
|
|
|
|
|
|
return RedissonPromise.newFailedFuture(ex);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return entry.connectionReadOp(command);
|
|
|
|
return entry.connectionReadOp(command);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -758,8 +766,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
if (entry == null) {
|
|
|
|
if (entry == null) {
|
|
|
|
entry = getEntry(source);
|
|
|
|
entry = getEntry(source);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
|
|
|
log.error("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
|
|
|
|
|
|
|
|
} else {
|
|
|
|
entry.releaseWrite(connection);
|
|
|
|
entry.releaseWrite(connection);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void releaseRead(NodeSource source, RedisConnection connection) {
|
|
|
|
public void releaseRead(NodeSource source, RedisConnection connection) {
|
|
|
@ -767,9 +779,14 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
|
|
|
|
if (entry == null) {
|
|
|
|
if (entry == null) {
|
|
|
|
entry = getEntry(source);
|
|
|
|
entry = getEntry(source);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (entry == null) {
|
|
|
|
|
|
|
|
log.error("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
|
|
|
|
|
|
|
|
} else {
|
|
|
|
entry.releaseRead(connection);
|
|
|
|
entry.releaseRead(connection);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void shutdown() {
|
|
|
|
public void shutdown() {
|
|
|
|
shutdown(2, 15, TimeUnit.SECONDS);//default netty value
|
|
|
|
shutdown(2, 15, TimeUnit.SECONDS);//default netty value
|
|
|
|