@ -16,67 +16,54 @@
package com.alibaba.cloud.nacos.discovery ;
import java.util.concurrent.ScheduledFuture ;
import java.util.Arrays ;
import java.util.List ;
import java.util.Map ;
import java.util.Optional ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicLong ;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties ;
import com.alibaba.cloud.nacos.NacosServiceManager ;
import com.alibaba.nacos.api.exception.NacosException ;
import com.alibaba.nacos.api.naming.NamingService ;
import com.alibaba.nacos.api.naming.listener.Event ;
import com.alibaba.nacos.api.naming.listener.EventListener ;
import com.alibaba.nacos.api.naming.listener.NamingEvent ;
import com.alibaba.nacos.api.naming.pojo.Instance ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.ObjectProvider ;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent ;
import org.springframework.context.ApplicationEventPublisher ;
import org.springframework.context.ApplicationEventPublisherAware ;
import org.springframework.context.SmartLifecycle ;
import org.springframework.scheduling.TaskScheduler ;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler ;
/ * *
* @author xiaojing
* @author yuhuangbin
* /
public class NacosWatch implements ApplicationEventPublisherAware , SmartLifecycle {
private static final Logger log = LoggerFactory . getLogger ( NacosWatch . class ) ;
private final NacosDiscoveryProperties properties ;
private Map < String , EventListener > listenerMap = new ConcurrentHashMap < > ( 16 ) ;
private final TaskScheduler taskScheduler ;
private final AtomicBoolean running = new AtomicBoolean ( false ) ;
private final AtomicLong nacosWatchIndex = new AtomicLong ( 0 ) ;
private final AtomicBoolean running = new AtomicBoolean ( false ) ;
private ApplicationEventPublisher publisher ;
private ScheduledFuture< ? > watchFuture ;
private NacosServiceManager nacosServiceManager ;
public NacosWatch ( NacosDiscoveryProperties properties ) {
this ( properties , getTaskScheduler ( ) ) ;
}
private final NacosDiscoveryProperties properties ;
public NacosWatch ( NacosDiscoveryProperties properties , TaskScheduler taskScheduler ) {
public NacosWatch ( NacosServiceManager nacosServiceManager ,
NacosDiscoveryProperties properties ) {
this . nacosServiceManager = nacosServiceManager ;
this . properties = properties ;
this . taskScheduler = taskScheduler ;
}
/ * *
* The constructor with { @link NacosDiscoveryProperties } bean and the optional .
* { @link TaskScheduler } bean
* @param properties { @link NacosDiscoveryProperties } bean
* @param taskScheduler the optional { @link TaskScheduler } bean
* @since 2.2 .0
* /
public NacosWatch ( NacosDiscoveryProperties properties ,
ObjectProvider < TaskScheduler > taskScheduler ) {
this ( properties , taskScheduler . getIfAvailable ( NacosWatch : : getTaskScheduler ) ) ;
}
private static ThreadPoolTaskScheduler getTaskScheduler ( ) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ( ) ;
taskScheduler . setBeanName ( "Nacos-Watch-Task-Scheduler" ) ;
taskScheduler . initialize ( ) ;
return taskScheduler ;
}
@Override
@ -98,19 +85,67 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
@Override
public void start ( ) {
if ( this . running . compareAndSet ( false , true ) ) {
this . watchFuture = this . taskScheduler . scheduleWithFixedDelay (
this : : nacosServicesWatch , this . properties . getWatchDelay ( ) ) ;
EventListener eventListener = listenerMap . computeIfAbsent ( buildKey ( ) ,
event - > new EventListener ( ) {
@Override
public void onEvent ( Event event ) {
if ( event instanceof NamingEvent ) {
List < Instance > instances = ( ( NamingEvent ) event )
. getInstances ( ) ;
Optional < Instance > instanceOptional = selectCurrentInstance (
instances ) ;
instanceOptional . ifPresent ( currentInstance - > {
resetIfNeeded ( currentInstance ) ;
} ) ;
publisher . publishEvent ( new HeartbeatEvent ( NacosWatch . this ,
nacosWatchIndex . getAndIncrement ( ) ) ) ;
}
}
} ) ;
NamingService namingService = nacosServiceManager
. getNamingService ( properties . getNacosProperties ( ) ) ;
try {
namingService . subscribe ( properties . getService ( ) , properties . getGroup ( ) ,
Arrays . asList ( properties . getClusterName ( ) ) , eventListener ) ;
}
catch ( Exception e ) {
log . error ( "namingService subscribe failed, properties:{}" , properties , e ) ;
}
}
}
private String buildKey ( ) {
return String . join ( ":" , properties . getService ( ) , properties . getGroup ( ) ) ;
}
private void resetIfNeeded ( Instance instance ) {
if ( ! properties . getMetadata ( ) . equals ( instance . getMetadata ( ) ) ) {
properties . setMetadata ( instance . getMetadata ( ) ) ;
}
}
private Optional < Instance > selectCurrentInstance ( List < Instance > instances ) {
return instances . stream ( )
. filter ( instance - > properties . getIp ( ) . equals ( instance . getIp ( ) )
& & properties . getPort ( ) = = instance . getPort ( ) )
. findFirst ( ) ;
}
@Override
public void stop ( ) {
if ( this . running . compareAndSet ( true , false ) & & this . watchFuture ! = null ) {
// shutdown current user-thread,
// then the other daemon-threads will terminate automatic.
( ( ThreadPoolTaskScheduler ) this . taskScheduler ) . shutdown ( ) ;
this . watchFuture . cancel ( true ) ;
if ( this . running . compareAndSet ( true , false ) ) {
EventListener eventListener = listenerMap . get ( buildKey ( ) ) ;
NamingService namingService = nacosServiceManager
. getNamingService ( properties . getNacosProperties ( ) ) ;
try {
namingService . unsubscribe ( properties . getService ( ) , properties . getGroup ( ) ,
Arrays . asList ( properties . getClusterName ( ) ) , eventListener ) ;
}
catch ( NacosException e ) {
log . error ( "namingService unsubscribe failed, properties:{}" , properties ,
e ) ;
}
}
}
@ -124,12 +159,4 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
return 0 ;
}
public void nacosServicesWatch ( ) {
// nacos doesn't support watch now , publish an event every 30 seconds.
this . publisher . publishEvent (
new HeartbeatEvent ( this , nacosWatchIndex . getAndIncrement ( ) ) ) ;
}
}