@ -15,12 +15,29 @@
* /
* /
package org.redisson.connection ;
package org.redisson.connection ;
import java.net.InetAddress ;
import java.net.UnknownHostException ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.atomic.AtomicReference ;
import org.redisson.Config ;
import org.redisson.Config ;
import org.redisson.MasterSlaveServersConfig ;
import org.redisson.MasterSlaveServersConfig ;
import org.redisson.SingleServerConfig ;
import org.redisson.SingleServerConfig ;
import org.redisson.client.RedisConnectionException ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import io.netty.util.concurrent.GlobalEventExecutor ;
import io.netty.util.concurrent.ScheduledFuture ;
public class SingleConnectionManager extends MasterSlaveConnectionManager {
public class SingleConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory . getLogger ( getClass ( ) ) ;
private final AtomicReference < InetAddress > currentMaster = new AtomicReference < InetAddress > ( ) ;
private ScheduledFuture < ? > monitorFuture ;
public SingleConnectionManager ( SingleServerConfig cfg , Config config ) {
public SingleConnectionManager ( SingleServerConfig cfg , Config config ) {
MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig ( ) ;
MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig ( ) ;
String addr = cfg . getAddress ( ) . getHost ( ) + ":" + cfg . getAddress ( ) . getPort ( ) ;
String addr = cfg . getAddress ( ) . getHost ( ) + ":" + cfg . getAddress ( ) . getPort ( ) ;
@ -37,6 +54,16 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
newconfig . setSlaveSubscriptionConnectionPoolSize ( cfg . getSubscriptionConnectionPoolSize ( ) ) ;
newconfig . setSlaveSubscriptionConnectionPoolSize ( cfg . getSubscriptionConnectionPoolSize ( ) ) ;
init ( newconfig , config ) ;
init ( newconfig , config ) ;
if ( cfg . isDnsMonitoring ( ) ) {
try {
this . currentMaster . set ( InetAddress . getByName ( cfg . getAddress ( ) . getHost ( ) ) ) ;
} catch ( UnknownHostException e ) {
throw new RedisConnectionException ( "Unknown host" , e ) ;
}
log . debug ( "DNS monitoring enabled; Current master set to {}" , currentMaster . get ( ) ) ;
monitorDnsChange ( cfg ) ;
}
}
}
@Override
@Override
@ -45,4 +72,35 @@ public class SingleConnectionManager extends MasterSlaveConnectionManager {
entries . put ( MAX_SLOT , entry ) ;
entries . put ( MAX_SLOT , entry ) ;
}
}
private void monitorDnsChange ( final SingleServerConfig cfg ) {
monitorFuture = GlobalEventExecutor . INSTANCE . scheduleWithFixedDelay ( new Runnable ( ) {
@Override
public void run ( ) {
try {
InetAddress master = currentMaster . get ( ) ;
InetAddress now = InetAddress . getByName ( cfg . getAddress ( ) . getHost ( ) ) ;
if ( ! now . getHostAddress ( ) . equals ( master . getHostAddress ( ) ) ) {
log . info ( "Detected DNS change. {} has changed from {} to {}" , cfg . getAddress ( ) . getHost ( ) , master . getHostAddress ( ) , now . getHostAddress ( ) ) ;
if ( currentMaster . compareAndSet ( master , now ) ) {
changeMaster ( MAX_SLOT , cfg . getAddress ( ) . getHost ( ) , cfg . getAddress ( ) . getPort ( ) ) ;
log . info ( "Master has been changed" ) ;
}
}
} catch ( Exception e ) {
log . error ( e . getMessage ( ) , e ) ;
}
}
} , cfg . getDnsMonitoringInterval ( ) , cfg . getDnsMonitoringInterval ( ) , TimeUnit . MILLISECONDS ) ;
}
@Override
public void shutdown ( ) {
if ( monitorFuture ! = null ) {
monitorFuture . cancel ( true ) ;
}
super . shutdown ( ) ;
}
}
}