@ -13,14 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License .
* /
package org.redisson.connection ;
import io.netty.util.concurrent.GlobalEventExecutor ;
import io.netty.util.concurrent.ScheduledFuture ;
package org.redisson.cluster ;
import java.net.InetSocketAddress ;
import java.net.URI ;
import java.util.ArrayList ;
import java.util.Collection ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.List ;
@ -36,17 +34,23 @@ import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection ;
import org.redisson.client.RedisConnectionException ;
import org.redisson.client.protocol.RedisCommands ;
import org.redisson.connection.ClusterNodeInfo.Flag ;
import org.redisson.cluster.ClusterNodeInfo.Flag ;
import org.redisson.connection.MasterSlaveConnectionManager ;
import org.redisson.connection.MasterSlaveEntry ;
import org.redisson.connection.SingleEntry ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import io.netty.util.concurrent.GlobalEventExecutor ;
import io.netty.util.concurrent.ScheduledFuture ;
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory . getLogger ( getClass ( ) ) ;
private final Map < URI , RedisConnection > nodeConnections = new HashMap < URI , RedisConnection > ( ) ;
private final Map < Integer , ClusterPartition > lastPartitions = new HashMap < Integer , ClusterPartition > ( ) ;
private final Map < ClusterSlotRange , ClusterPartition > lastPartitions = new HashMap < ClusterSlotRange , ClusterPartition > ( ) ;
private ScheduledFuture < ? > monitorFuture ;
@ -64,8 +68,8 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
String nodesValue = connection . sync ( RedisCommands . CLUSTER_NODES ) ;
Map< Integer , ClusterPartition > partitions = parsePartitions ( nodesValue ) ;
for ( ClusterPartition partition : partitions .values ( ) ) {
Collection< ClusterPartition > partitions = parsePartitions ( nodesValue ) ;
for ( ClusterPartition partition : partitions ) {
addMasterEntry ( partition , cfg ) ;
}
@ -98,7 +102,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private void addMasterEntry ( ClusterPartition partition , ClusterServersConfig cfg ) {
if ( partition . isMasterFail ( ) ) {
log . warn ( " master: {} for slot range: {}-{} add failed. Reason - server has FAIL flag", partition . getMasterAddress ( ) , partition . getS tartSlot( ) , partition . getEndSlot ( ) ) ;
log . warn ( " add master: {} for slot ranges : {} failed. Reason - server has FAIL flag", partition . getMasterAddress ( ) , partition . getS lotRanges ( ) ) ;
return ;
}
@ -108,18 +112,20 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
}
Map < String , String > params = connection . sync ( RedisCommands . CLUSTER_INFO ) ;
if ( "fail" . equals ( params . get ( "cluster_state" ) ) ) {
log . warn ( " master: {} for slot range: {}-{} add failed. Reason - cluster_state:fail", partition . getMasterAddress ( ) , partition . getS tartSlot( ) , partition . getEndSlot ( ) ) ;
log . warn ( " add master: {} for slot ranges : {} failed. Reason - cluster_state:fail", partition . getMasterAddress ( ) , partition . getS lotRanges ( ) ) ;
return ;
}
MasterSlaveServersConfig config = create ( cfg ) ;
log . info ( " master: {} for slot range: {}-{} added ", partition . getMasterAddress ( ) , partition . getS tartSlot( ) , partition . getEndSlot ( ) ) ;
log . info ( " added master: {} for slot ranges : {}", partition . getMasterAddress ( ) , partition . getS lotRanges ( ) ) ;
config . setMasterAddress ( partition . getMasterAddress ( ) ) ;
SingleEntry entry = new SingleEntry ( partition . getS tartSlot( ) , partition . getEndSlot ( ) , this , config ) ;
SingleEntry entry = new SingleEntry ( partition . getS lotRanges ( ) , this , config ) ;
entry . setupMasterEntry ( config . getMasterAddress ( ) . getHost ( ) , config . getMasterAddress ( ) . getPort ( ) ) ;
entries . put ( partition . getEndSlot ( ) , entry ) ;
lastPartitions . put ( partition . getEndSlot ( ) , partition ) ;
for ( ClusterSlotRange slotRange : partition . getSlotRanges ( ) ) {
addMaster ( slotRange , entry ) ;
lastPartitions . put ( slotRange , partition ) ;
}
}
private void monitorClusterChange ( final ClusterServersConfig cfg ) {
@ -129,115 +135,141 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
try {
for ( URI addr : cfg . getNodeAddresses ( ) ) {
RedisConnection connection = connect ( cfg , addr ) ;
String nodesValue = connection . sync ( RedisCommands . CLUSTER_NODES ) ;
log . debug ( "cluster nodes state: {}" , nodesValue ) ;
Map < Integer , ClusterPartition > partitions = parsePartitions ( nodesValue ) ;
for ( ClusterPartition newPart : partitions . values ( ) ) {
for ( ClusterPartition part : lastPartitions . values ( ) ) {
if ( newPart . getMasterAddress ( ) . equals ( part . getMasterAddress ( ) ) ) {
if ( connection = = null ) {
continue ;
}
log . debug ( "found endslot {} for {} fail {}" , part . getEndSlot ( ) , part . getMasterAddress ( ) , newPart . isMasterFail ( ) ) ;
String nodesValue = connection . sync ( RedisCommands . CLUSTER_NODES ) ;
if ( newPart . isMasterFail ( ) ) {
ClusterPartition newMasterPart = partitions . get ( part . getEndSlot ( ) ) ;
if ( ! newMasterPart . getMasterAddress ( ) . equals ( part . getMasterAddress ( ) ) ) {
log . info ( "changing master from {} to {} for {}" ,
part . getMasterAddress ( ) , newMasterPart . getMasterAddress ( ) , newMasterPart . getEndSlot ( ) ) ;
URI newUri = newMasterPart . getMasterAddress ( ) ;
URI oldUri = part . getMasterAddress ( ) ;
log . debug ( "cluster nodes state: {}" , nodesValue ) ;
changeMaster ( newMasterPart . getEndSlot ( ) , newUri . getHost ( ) , newUri . getPort ( ) ) ;
slaveDown ( newMasterPart . getEndSlot ( ) , oldUri . getHost ( ) , oldUri . getPort ( ) ) ;
Collection < ClusterPartition > newPartitions = parsePartitions ( nodesValue ) ;
checkMasterNodesChange ( newPartitions ) ;
checkSlotsChange ( cfg , newPartitions ) ;
part . setMasterAddress ( newMasterPart . getMasterAddress ( ) ) ;
}
}
break ;
}
}
}
checkSlotsChange ( cfg , partitions ) ;
break ;
break ;
}
} catch ( Exception e ) {
log . error ( e . getMessage ( ) , e ) ;
}
}
} , cfg . getScanInterval ( ) , cfg . getScanInterval ( ) , TimeUnit . MILLISECONDS ) ;
}
private void checkSlotsChange ( ClusterServersConfig cfg , Map < Integer , ClusterPartition > partitions ) {
Set < Integer > removeSlots = new HashSet < Integer > ( lastPartitions . keySet ( ) ) ;
removeSlots . removeAll ( partitions . keySet ( ) ) ;
lastPartitions . keySet ( ) . removeAll ( removeSlots ) ;
if ( ! removeSlots . isEmpty ( ) ) {
log . info ( "{} slots found to remove" , removeSlots . size ( ) ) ;
private Collection < ClusterSlotRange > slots ( Collection < ClusterPartition > partitions ) {
List < ClusterSlotRange > result = new ArrayList < ClusterSlotRange > ( ) ;
for ( ClusterPartition clusterPartition : partitions ) {
result . addAll ( clusterPartition . getSlotRanges ( ) ) ;
}
return result ;
}
Map < Integer , MasterSlaveEntry > removeAddrs = new HashMap < Integer , MasterSlaveEntry > ( ) ;
for ( Integer slot : removeSlots ) {
private ClusterPartition find ( Collection < ClusterPartition > partitions , ClusterSlotRange slotRange ) {
for ( ClusterPartition clusterPartition : partitions ) {
if ( clusterPartition . getSlotRanges ( ) . contains ( slotRange ) ) {
return clusterPartition ;
}
}
return null ;
}
private void checkMasterNodesChange ( Collection < ClusterPartition > newPartitions ) {
for ( ClusterPartition newPart : newPartitions ) {
for ( ClusterPartition currentPart : lastPartitions . values ( ) ) {
if ( ! newPart . getMasterAddress ( ) . equals ( currentPart . getMasterAddress ( ) ) ) {
continue ;
}
// current master marked as failed
if ( newPart . isMasterFail ( ) ) {
for ( ClusterSlotRange currentSlotRange : currentPart . getSlotRanges ( ) ) {
ClusterPartition newMasterPart = find ( newPartitions , currentSlotRange ) ;
// does partition has a new master?
if ( ! newMasterPart . getMasterAddress ( ) . equals ( currentPart . getMasterAddress ( ) ) ) {
log . info ( "changing master from {} to {} for {}" ,
currentPart . getMasterAddress ( ) , newMasterPart . getMasterAddress ( ) , currentSlotRange ) ;
URI newUri = newMasterPart . getMasterAddress ( ) ;
URI oldUri = currentPart . getMasterAddress ( ) ;
changeMaster ( currentSlotRange , newUri . getHost ( ) , newUri . getPort ( ) ) ;
slaveDown ( currentSlotRange , oldUri . getHost ( ) , oldUri . getPort ( ) ) ;
currentPart . setMasterAddress ( newMasterPart . getMasterAddress ( ) ) ;
}
}
}
break ;
}
}
}
private void checkSlotsChange ( ClusterServersConfig cfg , Collection < ClusterPartition > partitions ) {
Collection < ClusterSlotRange > partitionsSlots = slots ( partitions ) ;
Set < ClusterSlotRange > removedSlots = new HashSet < ClusterSlotRange > ( lastPartitions . keySet ( ) ) ;
removedSlots . removeAll ( partitionsSlots ) ;
lastPartitions . keySet ( ) . removeAll ( removedSlots ) ;
if ( ! removedSlots . isEmpty ( ) ) {
log . info ( "{} slot ranges found to remove" , removedSlots . size ( ) ) ;
}
Map < ClusterSlotRange , MasterSlaveEntry > removeAddrs = new HashMap < ClusterSlotRange , MasterSlaveEntry > ( ) ;
for ( ClusterSlotRange slot : removedSlots ) {
MasterSlaveEntry entry = removeMaster ( slot ) ;
entry . shutdownMasterAsync ( ) ;
removeAddrs . put ( slot , entry ) ;
}
Set < Integer > addSlots = new HashSet < Integer > ( partitions . keySet ( ) ) ;
addSlots . removeAll ( lastPartitions . keySet ( ) ) ;
if ( ! addSlots . isEmpty ( ) ) {
log . info ( "{} slots found to add" , addSlots . size ( ) ) ;
Set < ClusterSlotRange> addedSlots = new HashSet < ClusterSlotRange > ( partitionsSlots ) ;
add ed Slots. removeAll ( lastPartitions . keySet ( ) ) ;
if ( ! add ed Slots. isEmpty ( ) ) {
log . info ( "{} slots found to add" , add ed Slots. size ( ) ) ;
}
for ( Integer slot : addSlots ) {
ClusterPartition partition = partitions . get ( slot ) ;
for ( ClusterSlotRange slot : adde dSlots) {
ClusterPartition partition = find( partitions , slot ) ;
addMasterEntry ( partition , cfg ) ;
}
for ( Entry < Integer , MasterSlaveEntry > entry : removeAddrs . entrySet ( ) ) {
for ( Entry < ClusterSlotRange , MasterSlaveEntry > entry : removeAddrs . entrySet ( ) ) {
InetSocketAddress url = entry . getValue ( ) . getClient ( ) . getAddr ( ) ;
slaveDown ( entry . getKey ( ) , url . getHostName ( ) , url . getPort ( ) ) ;
}
}
private Map < Integer , ClusterPartition > parsePartitions ( String nodesValue ) {
private Collection< ClusterPartition > parsePartitions ( String nodesValue ) {
Map < String , ClusterPartition > partitions = new HashMap < String , ClusterPartition > ( ) ;
Map < Integer , ClusterPartition > result = new HashMap < Integer , ClusterPartition > ( ) ;
List < ClusterNodeInfo > nodes = parse ( nodesValue ) ;
for ( ClusterNodeInfo clusterNodeInfo : nodes ) {
if ( clusterNodeInfo . getFlags ( ) . contains ( Flag . NOADDR ) ) {
if ( clusterNodeInfo . containsFlag ( Flag . NOADDR ) ) {
// skip it
continue ;
}
String id = clusterNodeInfo . getNodeId ( ) ;
if ( clusterNodeInfo . getFlags ( ) . contains ( Flag . SLAVE ) ) {
if ( clusterNodeInfo . containsFlag ( Flag . SLAVE ) ) {
id = clusterNodeInfo . getSlaveOf ( ) ;
}
ClusterPartition partition = partitions . get ( id ) ;
if ( partition = = null ) {
partition = new ClusterPartition ( ) ;
partitions . put ( id , partition ) ;
}
if ( clusterNodeInfo . getFlags ( ) . contains ( Flag . FAIL ) ) {
if ( clusterNodeInfo . containsFlag ( Flag . FAIL ) ) {
partition . setMasterFail ( true ) ;
}
if ( clusterNodeInfo . getFlags ( ) . contains ( Flag . SLAVE ) ) {
if ( clusterNodeInfo . containsFlag ( Flag . SLAVE ) ) {
partition . addSlaveAddress ( clusterNodeInfo . getAddress ( ) ) ;
} else {
partition . setStartSlot ( clusterNodeInfo . getStartSlot ( ) ) ;
partition . setEndSlot ( clusterNodeInfo . getEndSlot ( ) ) ;
result . put ( clusterNodeInfo . getEndSlot ( ) , partition ) ;
partition . addSlotRanges ( clusterNodeInfo . getSlotRanges ( ) ) ;
partition . setMasterAddress ( clusterNodeInfo . getAddress ( ) ) ;
}
}
return result ;
return partitions. values ( ) ;
}
private MasterSlaveServersConfig create ( ClusterServersConfig cfg ) {
@ -285,15 +317,15 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for ( int i = 0 ; i < params . length - 8 ; i + + ) {
String slots = params [ i + 8 ] ;
String [ ] parts = slots . split ( "-" ) ;
node = new ClusterNodeInfo ( node ) ;
node . setStartSlot ( Integer . valueOf ( parts [ 0 ] ) ) ;
node . setEndSlot ( Integer . valueOf ( parts [ 1 ] ) ) ;
nodes . add ( node ) ;
if ( parts . length = = 1 ) {
node . addSlotRange ( new ClusterSlotRange ( Integer . valueOf ( parts [ 0 ] ) , Integer . valueOf ( parts [ 0 ] ) ) ) ;
} else if ( parts . length = = 2 ) {
node . addSlotRange ( new ClusterSlotRange ( Integer . valueOf ( parts [ 0 ] ) , Integer . valueOf ( parts [ 1 ] ) ) ) ;
}
}
} else {
nodes . add ( node ) ;
}
nodes . add ( node ) ;
}
return nodes ;
}