@ -15,44 +15,20 @@
* /
package org.redisson.connection ;
import io.netty.channel.EventLoopGroup ;
import io.netty.channel.epoll.EpollDatagramChannel ;
import io.netty.channel.epoll.EpollEventLoopGroup ;
import io.netty.channel.epoll.EpollSocketChannel ;
import io.netty.channel.kqueue.KQueueDatagramChannel ;
import io.netty.channel.kqueue.KQueueEventLoopGroup ;
import io.netty.channel.kqueue.KQueueSocketChannel ;
import io.netty.channel.nio.NioEventLoopGroup ;
import io.netty.channel.socket.SocketChannel ;
import io.netty.channel.socket.nio.NioDatagramChannel ;
import io.netty.channel.socket.nio.NioSocketChannel ;
import io.netty.resolver.AddressResolver ;
import io.netty.resolver.AddressResolverGroup ;
import io.netty.resolver.DefaultAddressResolverGroup ;
import io.netty.resolver.dns.DnsServerAddressStreamProviders ;
import io.netty.util.Timer ;
import io.netty.util.TimerTask ;
import io.netty.util.* ;
import io.netty.util.concurrent.Future ;
import io.netty.util.concurrent.* ;
import io.netty.util.internal.PlatformDependent ;
import org.redisson.ElementsSubscribeService ;
import org.redisson.Version ;
import org.redisson.api.NodeType ;
import org.redisson.client.* ;
import org.redisson.client.codec.Codec ;
import org.redisson.client.protocol.RedisCommand ;
import org.redisson.cluster.ClusterSlotRange ;
import org.redisson.config.* ;
import org.redisson.misc.InfinitySemaphoreLatch ;
import org.redisson.config.BaseConfig ;
import org.redisson.config.BaseMasterSlaveServersConfig ;
import org.redisson.config.MasterSlaveServersConfig ;
import org.redisson.config.ReadMode ;
import org.redisson.misc.RedisURI ;
import org.redisson.pubsub.PublishSubscribeService ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.net.InetAddress ;
import java.net.InetSocketAddress ;
import java.net.UnknownHostException ;
import java.util.* ;
import java.util.concurrent.* ;
import java.util.stream.Collectors ;
@ -64,77 +40,26 @@ import java.util.stream.Collectors;
* /
public class MasterSlaveConnectionManager implements ConnectionManager {
public static final Timeout DUMMY_TIMEOUT = new Timeout ( ) {
@Override
public Timer timer ( ) {
return null ;
}
@Override
public TimerTask task ( ) {
return null ;
}
@Override
public boolean isExpired ( ) {
return false ;
}
@Override
public boolean isCancelled ( ) {
return false ;
}
@Override
public boolean cancel ( ) {
return true ;
}
} ;
protected final String id ;
public static final int MAX_SLOT = 16384 ;
protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange ( 0 , MAX_SLOT - 1 ) ;
private final Logger log = LoggerFactory . getLogger ( getClass ( ) ) ;
private HashedWheelTimer timer ;
protected Codec codec ;
protected final EventLoopGroup group ;
protected final Class < ? extends SocketChannel > socketChannelClass ;
protected DNSMonitor dnsMonitor ;
protected MasterSlaveServersConfig config ;
private MasterSlaveEntry masterSlaveEntry ;
private final Promise < Void > shutdownPromise = ImmediateEventExecutor . INSTANCE . newPromise ( ) ;
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch ( ) ;
protected IdleConnectionWatcher connectionWatcher ;
private final ConnectionEventsHub connectionEventsHub ;
private final ExecutorService executor ;
private final Config cfg ;
protected final AddressResolverGroup < InetSocketAddress > resolverGroup ;
protected PublishSubscribeService subscribeService ;
pr ivate final ElementsSubscribeService elementsSubscribeService = new ElementsSubscribeService ( this ) ;
protected final ServiceManager serviceManager ;
protected PublishSubscribeService subscribeService ;
private final Map < RedisURI , RedisConnection > nodeConnections = new ConcurrentHashMap < > ( ) ;
public MasterSlaveConnectionManager ( BaseMasterSlaveServersConfig < ? > cfg , Config config , UUID id , ConnectionEventsHub connectionEventsHub ) {
this (config , id , connectionEventsHub ) ;
public MasterSlaveConnectionManager ( BaseMasterSlaveServersConfig < ? > cfg , ServiceManager serviceManager ) {
this . serviceManager = serviceManager ;
if ( cfg instanceof MasterSlaveServersConfig ) {
this . config = ( MasterSlaveServersConfig ) cfg ;
@ -146,74 +71,16 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this . config = create ( cfg ) ;
}
initTimer ( ) ;
serviceManager . setConfig ( this . config ) ;
serviceManager . initTimer ( ) ;
subscribeService = new PublishSubscribeService ( this ) ;
}
private MasterSlaveConnectionManager ( Config cfg , UUID id , ConnectionEventsHub connectionEventsHub ) {
this . id = id . toString ( ) ;
Version . logVersion ( ) ;
if ( cfg . getTransportMode ( ) = = TransportMode . EPOLL ) {
if ( cfg . getEventLoopGroup ( ) = = null ) {
this . group = new EpollEventLoopGroup ( cfg . getNettyThreads ( ) , new DefaultThreadFactory ( "redisson-netty" ) ) ;
} else {
this . group = cfg . getEventLoopGroup ( ) ;
}
this . socketChannelClass = EpollSocketChannel . class ;
if ( PlatformDependent . isAndroid ( ) ) {
this . resolverGroup = DefaultAddressResolverGroup . INSTANCE ;
} else {
this . resolverGroup = cfg . getAddressResolverGroupFactory ( ) . create ( EpollDatagramChannel . class , DnsServerAddressStreamProviders . platformDefault ( ) ) ;
}
} else if ( cfg . getTransportMode ( ) = = TransportMode . KQUEUE ) {
if ( cfg . getEventLoopGroup ( ) = = null ) {
this . group = new KQueueEventLoopGroup ( cfg . getNettyThreads ( ) , new DefaultThreadFactory ( "redisson-netty" ) ) ;
} else {
this . group = cfg . getEventLoopGroup ( ) ;
}
this . socketChannelClass = KQueueSocketChannel . class ;
if ( PlatformDependent . isAndroid ( ) ) {
this . resolverGroup = DefaultAddressResolverGroup . INSTANCE ;
} else {
this . resolverGroup = cfg . getAddressResolverGroupFactory ( ) . create ( KQueueDatagramChannel . class , DnsServerAddressStreamProviders . platformDefault ( ) ) ;
}
} else {
if ( cfg . getEventLoopGroup ( ) = = null ) {
this . group = new NioEventLoopGroup ( cfg . getNettyThreads ( ) , new DefaultThreadFactory ( "redisson-netty" ) ) ;
} else {
this . group = cfg . getEventLoopGroup ( ) ;
}
this . socketChannelClass = NioSocketChannel . class ;
if ( PlatformDependent . isAndroid ( ) ) {
this . resolverGroup = DefaultAddressResolverGroup . INSTANCE ;
} else {
this . resolverGroup = cfg . getAddressResolverGroupFactory ( ) . create ( NioDatagramChannel . class , DnsServerAddressStreamProviders . platformDefault ( ) ) ;
}
}
if ( cfg . getExecutor ( ) = = null ) {
int threads = Runtime . getRuntime ( ) . availableProcessors ( ) * 2 ;
if ( cfg . getThreads ( ) ! = 0 ) {
threads = cfg . getThreads ( ) ;
}
executor = Executors . newFixedThreadPool ( threads , new DefaultThreadFactory ( "redisson" ) ) ;
} else {
executor = cfg . getExecutor ( ) ;
}
this . cfg = cfg ;
this . codec = cfg . getCodec ( ) ;
this . connectionEventsHub = connectionEventsHub ;
if ( cfg . getConnectionListener ( ) ! = null ) {
this . connectionEventsHub . addListener ( cfg . getConnectionListener ( ) ) ;
}
@Override
public ServiceManager getServiceManager ( ) {
return serviceManager ;
}
protected void closeNodeConnections ( ) {
nodeConnections . values ( ) . stream ( )
. map ( c - > c . getRedisClient ( ) . shutdownAsync ( ) )
@ -269,31 +136,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} ) ;
}
@Override
public String getId ( ) {
return id ;
}
@Override
public boolean isClusterMode ( ) {
return false ;
}
@Override
public Config getCfg ( ) {
return cfg ;
}
@Override
public MasterSlaveServersConfig getConfig ( ) {
return config ;
}
@Override
public Codec getCodec ( ) {
return codec ;
}
@Override
public Collection < MasterSlaveEntry > getEntrySet ( ) {
if ( masterSlaveEntry ! = null ) {
@ -302,30 +149,12 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return Collections . emptyList ( ) ;
}
private void initTimer ( ) {
int [ ] timeouts = new int [ ] { config . getRetryInterval ( ) , config . getTimeout ( ) } ;
Arrays . sort ( timeouts ) ;
int minTimeout = timeouts [ 0 ] ;
if ( minTimeout % 100 ! = 0 ) {
minTimeout = ( minTimeout % 100 ) / 2 ;
} else if ( minTimeout = = 100 ) {
minTimeout = 50 ;
} else {
minTimeout = 100 ;
}
timer = new HashedWheelTimer ( new DefaultThreadFactory ( "redisson-timer" ) , minTimeout , TimeUnit . MILLISECONDS , 1024 , false ) ;
connectionWatcher = new IdleConnectionWatcher ( group , config ) ;
subscribeService = new PublishSubscribeService ( this ) ;
}
public void connect ( ) {
try {
if ( config . checkSkipSlavesInit ( ) ) {
masterSlaveEntry = new SingleEntry ( this , connectionWatcher, config ) ;
masterSlaveEntry = new SingleEntry ( this , serviceManager . getConnectionWatcher ( ) , config ) ;
} else {
masterSlaveEntry = new MasterSlaveEntry ( this , connectionWatcher, config ) ;
masterSlaveEntry = new MasterSlaveEntry ( this , serviceManager . getConnectionWatcher ( ) , config ) ;
}
CompletableFuture < RedisClient > masterFuture = masterSlaveEntry . setupMasterEntry ( new RedisURI ( config . getMasterAddress ( ) ) ) ;
masterFuture . join ( ) ;
@ -356,7 +185,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if ( config . getDnsMonitoringInterval ( ) ! = - 1 ) {
Set < RedisURI > slaveAddresses = config . getSlaveAddresses ( ) . stream ( ) . map ( r - > new RedisURI ( r ) ) . collect ( Collectors . toSet ( ) ) ;
dnsMonitor = new DNSMonitor ( this , masterHost ,
slaveAddresses , config . getDnsMonitoringInterval ( ) , resolverGroup) ;
slaveAddresses , config . getDnsMonitoringInterval ( ) , se rviceManager. getR esolverGroup( ) ) ;
dnsMonitor . start ( ) ;
}
}
@ -419,8 +248,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return client ;
}
@Override
public RedisClient createClient ( NodeType type , RedisURI address , int timeout , int commandTimeout , String sslHostname ) {
protected RedisClient createClient ( NodeType type , RedisURI address , int timeout , int commandTimeout , String sslHostname ) {
RedisClientConfig redisConfig = createRedisConfig ( type , address , timeout , commandTimeout , sslHostname ) ;
return RedisClient . create ( redisConfig ) ;
}
@ -435,11 +263,11 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
protected RedisClientConfig createRedisConfig ( NodeType type , RedisURI address , int timeout , int commandTimeout , String sslHostname ) {
RedisClientConfig redisConfig = new RedisClientConfig ( ) ;
redisConfig . setAddress ( address )
. setTimer ( timer)
. setExecutor ( executor)
. setResolverGroup ( resolverGroup)
. setGroup ( group)
. setSocketChannelClass ( s ocketChannelClass)
. setTimer ( serviceManager. ge tT imer( ) )
. setExecutor ( s erviceManager. getE xecutor( ) )
. setResolverGroup ( se rviceManager. getR esolverGroup( ) )
. setGroup ( serviceMana ger. getG roup( ) )
. setSocketChannelClass ( s erviceManager. getS ocketChannelClass( ) )
. setConnectTimeout ( timeout )
. setCommandTimeout ( commandTimeout )
. setSslHostname ( sslHostname )
@ -451,22 +279,22 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
. setSslKeystorePassword ( config . getSslKeystorePassword ( ) )
. setSslProtocols ( config . getSslProtocols ( ) )
. setClientName ( config . getClientName ( ) )
. setKeepPubSubOrder ( cfg. isKeepPubSubOrder ( ) )
. setKeepPubSubOrder ( servi ceManager. getC fg( ) . isKeepPubSubOrder ( ) )
. setPingConnectionInterval ( config . getPingConnectionInterval ( ) )
. setKeepAlive ( config . isKeepAlive ( ) )
. setTcpNoDelay ( config . isTcpNoDelay ( ) )
. setUsername ( config . getUsername ( ) )
. setPassword ( config . getPassword ( ) )
. setNettyHook ( cfg. getNettyHook ( ) )
. setNettyHook ( servi ceManager. getC fg( ) . getNettyHook ( ) )
. setCredentialsResolver ( config . getCredentialsResolver ( ) )
. setConnectedListener ( addr - > {
if ( ! isShuttingDown( ) ) {
connectionEventsHub. fireConnect ( addr ) ;
if ( ! serviceManager. isShuttingDown( ) ) {
servi ceManager. getC onnectionEventsHub( ) . fireConnect ( addr ) ;
}
} )
. setDisconnectedListener ( addr - > {
if ( ! isShuttingDown( ) ) {
connectionEventsHub. fireDisconnect ( addr ) ;
if ( ! serviceManager. isShuttingDown( ) ) {
servi ceManager. getC onnectionEventsHub( ) . fireDisconnect ( addr ) ;
}
} ) ;
@ -522,7 +350,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
MasterSlaveEntry entry = getEntry ( source ) ;
if ( entry = = null ) {
CompletableFuture < RedisConnection > f = new CompletableFuture < > ( ) ;
f . completeExceptionally ( createNodeNotFoundException( source ) ) ;
f . completeExceptionally ( serviceManager. createNodeNotFoundException( source ) ) ;
return f ;
}
// fix for https://github.com/redisson/redisson/issues/1548
@ -554,7 +382,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
MasterSlaveEntry entry = getEntry ( source ) ;
if ( entry = = null ) {
CompletableFuture < RedisConnection > f = new CompletableFuture < > ( ) ;
f . completeExceptionally ( createNodeNotFoundException( source ) ) ;
f . completeExceptionally ( serviceManager. createNodeNotFoundException( source ) ) ;
return f ;
}
@ -568,16 +396,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return entry . connectionReadOp ( command ) ;
}
public RedisNodeNotFoundException createNodeNotFoundException ( NodeSource source ) {
RedisNodeNotFoundException ex ;
if ( source . getSlot ( ) ! = null & & source . getAddr ( ) = = null & & source . getRedisClient ( ) = = null ) {
ex = new RedisNodeNotFoundException ( "Node for slot: " + source . getSlot ( ) + " hasn't been discovered yet. Check cluster slots coverage using CLUSTER NODES command. Increase value of retryAttempts and/or retryInterval settings." ) ;
} else {
ex = new RedisNodeNotFoundException ( "Node: " + source + " hasn't been discovered yet. Increase value of retryAttempts and/or retryInterval settings." ) ;
}
return ex ;
}
@Override
public void releaseWrite ( NodeSource source , RedisConnection connection ) {
MasterSlaveEntry entry = getEntry ( source ) ;
@ -609,8 +427,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
if ( dnsMonitor ! = null ) {
dnsMonitor . stop ( ) ;
}
connectionWatcher. stop ( ) ;
servi ceManager. getC onnectionWatcher( ) . stop ( ) ;
List < CompletableFuture < Void > > futures = new ArrayList < > ( ) ;
for ( MasterSlaveEntry entry : getEntrySet ( ) ) {
@ -623,64 +441,26 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
} catch ( Exception e ) {
// skip
}
resolverGroup. close ( ) ;
se rviceManager. getR esolverGroup( ) . close ( ) ;
s hutdownLatch. close ( ) ;
if ( cfg. getExecutor ( ) = = null ) {
executor. shutdown ( ) ;
s erviceManager. getS hutdownLatch( ) . close ( ) ;
if ( servi ceManager. getC fg( ) . getExecutor ( ) = = null ) {
s erviceManager. getE xecutor( ) . shutdown ( ) ;
try {
executor. awaitTermination ( timeout , unit ) ;
s erviceManager. getE xecutor( ) . awaitTermination ( timeout , unit ) ;
} catch ( InterruptedException e ) {
Thread . currentThread ( ) . interrupt ( ) ;
}
}
shutdownPromise . trySuccess ( null ) ;
shutdownLatch . awaitUninterruptibly ( ) ;
if ( cfg . getEventLoopGroup ( ) = = null ) {
group . shutdownGracefully ( quietPeriod , timeout , unit ) . syncUninterruptibly ( ) ;
}
serviceManager . getShutdownPromise ( ) . trySuccess ( null ) ;
serviceManager . getShutdownLatch ( ) . awaitUninterruptibly ( ) ;
timer . stop ( ) ;
}
@Override
public boolean isShuttingDown ( ) {
return shutdownLatch . isClosed ( ) ;
}
@Override
public boolean isShutdown ( ) {
return group . isTerminated ( ) ;
}
@Override
public EventLoopGroup getGroup ( ) {
return group ;
}
@Override
public Timeout newTimeout ( TimerTask task , long delay , TimeUnit unit ) {
try {
return timer . newTimeout ( task , delay , unit ) ;
} catch ( IllegalStateException e ) {
if ( isShuttingDown ( ) ) {
return DUMMY_TIMEOUT ;
}
throw e ;
if ( serviceManager . getCfg ( ) . getEventLoopGroup ( ) = = null ) {
serviceManager . getGroup ( ) . shutdownGracefully ( quietPeriod , timeout , unit ) . syncUninterruptibly ( ) ;
}
}
@Override
public InfinitySemaphoreLatch getShutdownLatch ( ) {
return shutdownLatch ;
}
@Override
public Future < Void > getShutdownPromise ( ) {
return shutdownPromise ;
serviceManager . getTimer ( ) . stop ( ) ;
}
protected void stopThreads ( ) {
@ -691,65 +471,8 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
return subscribeService ;
}
public ElementsSubscribeService getElementsSubscribeService ( ) {
return elementsSubscribeService ;
}
public ExecutorService getExecutor ( ) {
return executor ;
}
public RedisURI getLastClusterNode ( ) {
return null ;
}
@Override
public RedisURI applyNatMap ( RedisURI address ) {
return address ;
}
@Override
public CompletableFuture < RedisURI > resolveIP ( RedisURI address ) {
return resolveIP ( address . getScheme ( ) , address ) ;
}
protected final CompletableFuture < RedisURI > resolveIP ( String scheme , RedisURI address ) {
if ( address . isIP ( ) ) {
RedisURI addr = toURI ( scheme , address . getHost ( ) , "" + address . getPort ( ) ) ;
return CompletableFuture . completedFuture ( addr ) ;
}
CompletableFuture < RedisURI > result = new CompletableFuture < > ( ) ;
AddressResolver < InetSocketAddress > resolver = resolverGroup . getResolver ( getGroup ( ) . next ( ) ) ;
InetSocketAddress addr = InetSocketAddress . createUnresolved ( address . getHost ( ) , address . getPort ( ) ) ;
Future < InetSocketAddress > future = resolver . resolve ( addr ) ;
future . addListener ( ( FutureListener < InetSocketAddress > ) f - > {
if ( ! f . isSuccess ( ) ) {
log . error ( "Unable to resolve {}" , address , f . cause ( ) ) ;
result . completeExceptionally ( f . cause ( ) ) ;
return ;
}
InetSocketAddress s = f . getNow ( ) ;
RedisURI uri = toURI ( scheme , s . getAddress ( ) . getHostAddress ( ) , "" + address . getPort ( ) ) ;
result . complete ( uri ) ;
} ) ;
return result ;
}
protected final RedisURI toURI ( String scheme , String host , String port ) {
// convert IPv6 address to unified compressed format
if ( NetUtil . isValidIpV6Address ( host ) ) {
byte [ ] addr = NetUtil . createByteArrayFromIpAddressString ( host ) ;
try {
InetAddress ia = InetAddress . getByAddress ( host , addr ) ;
host = ia . getHostAddress ( ) ;
} catch ( UnknownHostException e ) {
throw new RuntimeException ( e ) ;
}
}
RedisURI uri = new RedisURI ( scheme + "://" + host + ":" + port ) ;
return applyNatMap ( uri ) ;
}
}