@ -15,6 +15,7 @@
* /
package org.redisson.connection ;
import java.net.InetSocketAddress ;
import java.net.URI ;
import java.util.ArrayList ;
import java.util.Collections ;
@ -51,6 +52,7 @@ import org.redisson.misc.URIBuilder;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import io.netty.resolver.AddressResolver ;
import io.netty.util.concurrent.Future ;
import io.netty.util.concurrent.FutureListener ;
import io.netty.util.concurrent.ScheduledFuture ;
@ -70,7 +72,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final Set < String > slaves = Collections . newSetFromMap ( PlatformDependent . < String , Boolean > newConcurrentHashMap ( ) ) ;
private final Set < URI > disconnectedSlaves = new HashSet < URI > ( ) ;
private String masterName ;
private ScheduledFuture < ? > monitorFuture ;
private AddressResolver < InetSocketAddress > sentinelResolver ;
public SentinelConnectionManager ( SentinelServersConfig cfg , Config config ) {
super ( config ) ;
@ -79,9 +83,12 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
throw new IllegalArgumentException ( "masterName parameter is not defined!" ) ;
}
this . masterName = cfg . getMasterName ( ) ;
this . config = create ( cfg ) ;
initTimer ( this . config ) ;
this . sentinelResolver = resolverGroup . getResolver ( getGroup ( ) . next ( ) ) ;
for ( URI addr : cfg . getSentinelAddresses ( ) ) {
RedisClient client = createClient ( NodeType . SENTINEL , addr , this . config . getConnectTimeout ( ) , this . config . getRetryInterval ( ) * this . config . getRetryAttempts ( ) ) ;
try {
@ -133,7 +140,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String host = createAddress ( ip , port ) ;
URI sentinelAddr = URIBuilder . create ( host ) ;
RFuture < Void > future = registerSentinel ( cfg, sentinelAddr, this . config ) ;
RFuture < Void > future = registerSentinel ( sentinelAddr, this . config ) ;
connectionFutures . add ( future ) ;
}
@ -159,6 +166,76 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
scheduleChangeCheck ( cfg , null ) ;
}
@Override
protected void startDNSMonitoring ( RedisClient masterHost ) {
if ( config . getDnsMonitoringInterval ( ) = = - 1 ) {
return ;
}
scheduleSentinelDNSCheck ( ) ;
}
protected void scheduleSentinelDNSCheck ( ) {
monitorFuture = group . schedule ( new Runnable ( ) {
@Override
public void run ( ) {
List < RedisClient > sentinels = new ArrayList < RedisClient > ( SentinelConnectionManager . this . sentinels . values ( ) ) ;
final AtomicInteger sentinelsCounter = new AtomicInteger ( sentinels . size ( ) ) ;
FutureListener < List < InetSocketAddress > > commonListener = new FutureListener < List < InetSocketAddress > > ( ) {
@Override
public void operationComplete ( Future < List < InetSocketAddress > > future ) throws Exception {
if ( sentinelsCounter . decrementAndGet ( ) = = 0 ) {
scheduleSentinelDNSCheck ( ) ;
}
}
} ;
for ( final RedisClient client : sentinels ) {
Future < List < InetSocketAddress > > allNodes = sentinelResolver . resolveAll ( InetSocketAddress . createUnresolved ( client . getAddr ( ) . getHostName ( ) , client . getAddr ( ) . getPort ( ) ) ) ;
allNodes . addListener ( new FutureListener < List < InetSocketAddress > > ( ) {
@Override
public void operationComplete ( Future < List < InetSocketAddress > > future ) throws Exception {
if ( ! future . isSuccess ( ) ) {
log . error ( "Unable to resolve " + client . getAddr ( ) . getHostName ( ) , future . cause ( ) ) ;
return ;
}
boolean clientFound = false ;
for ( InetSocketAddress addr : future . getNow ( ) ) {
boolean found = false ;
for ( RedisClient currentSentinel : SentinelConnectionManager . this . sentinels . values ( ) ) {
if ( currentSentinel . getAddr ( ) . getAddress ( ) . getHostAddress ( ) . equals ( addr . getAddress ( ) . getHostAddress ( ) )
& & currentSentinel . getAddr ( ) . getPort ( ) = = addr . getPort ( ) ) {
found = true ;
break ;
}
}
if ( ! found ) {
URI uri = convert ( addr . getAddress ( ) . getHostAddress ( ) , "" + addr . getPort ( ) ) ;
registerSentinel ( uri , getConfig ( ) ) ;
}
if ( client . getAddr ( ) . getAddress ( ) . getHostAddress ( ) . equals ( addr . getAddress ( ) . getHostAddress ( ) )
& & client . getAddr ( ) . getPort ( ) = = addr . getPort ( ) ) {
clientFound = true ;
}
}
if ( ! clientFound ) {
String addr = client . getAddr ( ) . getAddress ( ) . getHostAddress ( ) + ":" + client . getAddr ( ) . getPort ( ) ;
RedisClient sentinel = SentinelConnectionManager . this . sentinels . remove ( addr ) ;
if ( sentinel ! = null ) {
sentinel . shutdownAsync ( ) ;
log . warn ( "sentinel: {} has down" , addr ) ;
}
}
}
} ) ;
allNodes . addListener ( commonListener ) ;
}
}
} , config . getDnsMonitoringInterval ( ) , TimeUnit . MILLISECONDS ) ;
}
private void scheduleChangeCheck ( final SentinelServersConfig cfg , final Iterator < RedisClient > iterator ) {
monitorFuture = group . schedule ( new Runnable ( ) {
@Override
@ -269,13 +346,13 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String masterHost = map . get ( "master-host" ) ;
String masterPort = map . get ( "master-port" ) ;
if ( ! isUseSameMaster ( ip , port , masterHost , masterPort ) ) {
continue ;
}
if ( flags . contains ( "s_down" ) | | flags . contains ( "disconnected" ) ) {
slaveDown ( ip , port ) ;
continue ;
}
if ( ! isUseSameMaster ( ip , port , masterHost , masterPort ) ) {
continue ;
}
String slaveAddr = createAddress ( ip , port ) ;
currentSlaves . add ( slaveAddr ) ;
@ -305,7 +382,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} ) ;
slavesFuture . addListener ( commonListener ) ;
}
RFuture < List < Map < String , String > > > sentinelsFuture = connection . async ( StringCodec . INSTANCE , RedisCommands . SENTINEL_SENTINELS , cfg . getMasterName ( ) ) ;
sentinelsFuture . addListener ( new FutureListener < List < Map < String , String > > > ( ) {
@Override
@ -323,9 +400,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = map . get ( "ip" ) ;
String port = map . get ( "port" ) ;
String host = createAddress ( ip , port ) ;
URI sentinelAddr = URIBuilder . create ( host ) ;
registerSentinel ( cfg , sentinelAddr , getConfig ( ) ) ;
URI sentinelAddr = convert ( ip , port ) ;
registerSentinel ( sentinelAddr , getConfig ( ) ) ;
}
}
} ) ;
@ -350,7 +426,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return entry ;
}
private RFuture < Void > registerSentinel ( final SentinelServersConfig cfg , final URI addr , final MasterSlaveServersConfig c ) {
private RFuture < Void > registerSentinel ( final URI addr , final MasterSlaveServersConfig c ) {
String key = addr . getHost ( ) + ":" + addr . getPort ( ) ;
RedisClient client = sentinels . get ( key ) ;
if ( client ! = null ) {
@ -380,7 +456,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
log . debug ( "message {} from {}" , msg , channel ) ;
if ( "+sentinel" . equals ( channel ) ) {
onSentinelAdded ( cfg , ( String ) msg , c ) ;
onSentinelAdded ( ( String ) msg , c ) ;
}
if ( "+slave" . equals ( channel ) ) {
onSlaveAdded ( addr , ( String ) msg ) ;
@ -392,7 +468,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
onNodeUp ( addr , ( String ) msg ) ;
}
if ( "+switch-master" . equals ( channel ) ) {
onMasterChange ( cfg, addr, ( String ) msg ) ;
onMasterChange ( addr, ( String ) msg ) ;
}
}
@ -413,14 +489,14 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return RedissonPromise . newSucceededFuture ( null ) ;
}
protected void onSentinelAdded ( S entinelServersConfig cfg , S tring msg , MasterSlaveServersConfig c ) {
protected void onSentinelAdded ( S tring msg , MasterSlaveServersConfig c ) {
String [ ] parts = msg . split ( " " ) ;
if ( "sentinel" . equals ( parts [ 0 ] ) ) {
String ip = parts [ 2 ] ;
String port = parts [ 3 ] ;
URI uri = convert ( ip , port ) ;
registerSentinel ( cfg, uri, c ) ;
registerSentinel ( uri, c ) ;
}
}
@ -446,8 +522,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
protected RFuture < Void > addSlave ( final String ip , final String port , final String slaveAddr ) {
final RPromise < Void > result = new RedissonPromise < Void > ( ) ;
// to avoid addition twice
final MasterSlaveEntry entry = getEntry ( singleSlotRange . getStartSlot ( ) ) ;
final URI uri = convert ( ip , port ) ;
if ( slaves . add ( slaveAddr ) & & ! config . checkSkipSlavesInit ( ) ) {
final MasterSlaveEntry entry = getEntry ( singleSlotRange . getStartSlot ( ) ) ;
RFuture < Void > future = entry . addSlave ( URIBuilder . create ( slaveAddr ) ) ;
future . addListener ( new FutureListener < Void > ( ) {
@Override
@ -459,8 +536,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return ;
}
URI uri = convert ( ip , port ) ;
if ( entry . slaveUp ( uri , FreezeReason . MANAGER ) ) {
if ( entry . isSlaveUnfreezed ( uri ) | | entry . slaveUp ( uri , FreezeReason . MANAGER ) ) {
String slaveAddr = ip + ":" + port ;
log . info ( "slave: {} added" , slaveAddr ) ;
result . trySuccess ( null ) ;
@ -469,7 +545,9 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
} ) ;
} else {
slaveUp ( ip , port ) ;
if ( entry . hasSlave ( uri ) ) {
slaveUp ( ip , port ) ;
}
result . trySuccess ( null ) ;
}
return result ;
@ -589,11 +667,11 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
}
private void onMasterChange ( SentinelServersConfig cfg , URI addr , String msg ) {
private void onMasterChange ( URI addr , String msg ) {
String [ ] parts = msg . split ( " " ) ;
if ( parts . length > 3 ) {
if ( cfg. getMasterName ( ) . equals ( parts [ 0 ] ) ) {
if ( masterName . equals ( parts [ 0 ] ) ) {
String ip = parts [ 3 ] ;
String port = parts [ 4 ] ;