@ -19,7 +19,6 @@ import io.netty.resolver.AddressResolver;
import io.netty.util.concurrent.Future ;
import io.netty.util.concurrent.FutureListener ;
import io.netty.util.concurrent.ScheduledFuture ;
import org.redisson.api.NatMapper ;
import org.redisson.api.NodeType ;
import org.redisson.api.RFuture ;
import org.redisson.client.* ;
@ -27,10 +26,7 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand ;
import org.redisson.cluster.ClusterNodeInfo.Flag ;
import org.redisson.cluster.ClusterPartition.Type ;
import org.redisson.config.ClusterServersConfig ;
import org.redisson.config.Config ;
import org.redisson.config.MasterSlaveServersConfig ;
import org.redisson.config.ReadMode ;
import org.redisson.config.* ;
import org.redisson.connection.* ;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason ;
import org.redisson.misc.RedisURI ;
@ -63,25 +59,30 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
private String configEndpointHostName ;
private final NatMapper natMapper ;
private final AtomicReferenceArray < MasterSlaveEntry > slot2entry = new AtomicReferenceArray < > ( MAX_SLOT ) ;
private final Map < RedisClient , MasterSlaveEntry > client2entry = new ConcurrentHashMap < > ( ) ;
private ClusterServersConfig cfg ;
public ClusterConnectionManager ( ClusterServersConfig cfg , Config config , UUID id ) {
super ( config , id ) ;
super ( cfg , config , id ) ;
}
@Override
protected MasterSlaveServersConfig create ( BaseMasterSlaveServersConfig < ? > cfg ) {
this . cfg = ( ClusterServersConfig ) cfg ;
return super . create ( cfg ) ;
}
@Override
public void connect ( ) {
if ( cfg . getNodeAddresses ( ) . isEmpty ( ) ) {
throw new IllegalArgumentException ( "At least one cluster node should be defined!" ) ;
}
this . natMapper = cfg . getNatMapper ( ) ;
this . config = create ( cfg ) ;
initTimer ( this . config ) ;
Throwable lastException = null ;
List < String > failedMasters = new ArrayList < String > ( ) ;
List < String > failedMasters = new ArrayList < > ( ) ;
for ( String address : cfg . getNodeAddresses ( ) ) {
RedisURI addr = new RedisURI ( address ) ;
CompletionStage < RedisConnection > connectionFuture = connectToNode ( cfg , addr , addr . getHost ( ) ) ;
@ -91,14 +92,14 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
if ( cfg . getNodeAddresses ( ) . size ( ) = = 1 & & ! addr . isIP ( ) ) {
configEndpointHostName = addr . getHost ( ) ;
}
clusterNodesCommand = RedisCommands . CLUSTER_NODES ;
if ( addr . isSsl ( ) ) {
clusterNodesCommand = RedisCommands . CLUSTER_NODES_SSL ;
}
List < ClusterNodeInfo > nodes = connection . sync ( clusterNodesCommand ) ;
StringBuilder nodesValue = new StringBuilder ( ) ;
for ( ClusterNodeInfo clusterNodeInfo : nodes ) {
nodesValue . append ( clusterNodeInfo . getNodeInfo ( ) ) . append ( "\n" ) ;
@ -106,7 +107,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
log . info ( "Redis cluster nodes configuration got from {}:\n{}" , connection . getRedisClient ( ) . getAddr ( ) , nodesValue ) ;
lastClusterNode = addr ;
CompletableFuture < Collection < ClusterPartition > > partitionsFuture = parsePartitions ( nodes ) ;
Collection < ClusterPartition > partitions ;
try {
@ -163,7 +164,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
throw new RedisConnectionException ( "Not all slots covered! Only " + lastPartitions . size ( ) + " slots are available. Set checkSlotsCoverage = false to avoid this check. Failed masters according to cluster status: " + failedMasters , lastException ) ;
}
}
scheduleClusterChangeCheck ( cfg ) ;
}
@ -818,7 +819,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
@Override
public RedisURI applyNatMap ( RedisURI address ) {
return natMapper . map ( address ) ;
return cfg. getNatMapper ( ) . map ( address ) ;
}
private CompletableFuture < Collection < ClusterPartition > > parsePartitions ( List < ClusterNodeInfo > nodes ) {