Merge pull request #1729 from yuhuangbin/master

[Enhancement] NacosWatch keep publish HeartbeatEvent
pull/1736/head
Mercy Ma 4 years ago committed by GitHub
commit c0b056a90e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,6 +20,7 @@ import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceManager; import com.alibaba.cloud.nacos.NacosServiceManager;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -31,6 +32,7 @@ import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration; import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
/** /**
* @author xiaojing * @author xiaojing
@ -56,8 +58,10 @@ public class NacosDiscoveryClientConfiguration {
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", @ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
matchIfMissing = true) matchIfMissing = true)
public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager, public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) { NacosDiscoveryProperties nacosDiscoveryProperties,
return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties); ObjectProvider<TaskScheduler> taskExecutorObjectProvider) {
return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties,
taskExecutorObjectProvider);
} }
} }

@ -54,7 +54,8 @@ public class NacosServiceDiscovery {
*/ */
public List<ServiceInstance> getInstances(String serviceId) throws NacosException { public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
String group = discoveryProperties.getGroup(); String group = discoveryProperties.getGroup();
List<Instance> instances = namingService().selectInstances(serviceId, group, true); List<Instance> instances = namingService().selectInstances(serviceId, group,
true);
return hostToServiceInstanceList(instances, serviceId); return hostToServiceInstanceList(instances, serviceId);
} }
@ -111,7 +112,8 @@ public class NacosServiceDiscovery {
} }
private NamingService namingService() { private NamingService namingService() {
return nacosServiceManager.getNamingService(discoveryProperties.getNacosProperties()); return nacosServiceManager
.getNamingService(discoveryProperties.getNacosProperties());
} }
} }

@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -35,10 +36,13 @@ import com.alibaba.nacos.api.naming.pojo.Instance;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent; import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/** /**
* @author xiaojing * @author xiaojing
@ -56,14 +60,27 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
private ApplicationEventPublisher publisher; private ApplicationEventPublisher publisher;
private ScheduledFuture<?> watchFuture;
private NacosServiceManager nacosServiceManager; private NacosServiceManager nacosServiceManager;
private final NacosDiscoveryProperties properties; private final NacosDiscoveryProperties properties;
private final TaskScheduler taskScheduler;
public NacosWatch(NacosServiceManager nacosServiceManager, public NacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties properties) { NacosDiscoveryProperties properties,
ObjectProvider<TaskScheduler> taskScheduler) {
this.nacosServiceManager = nacosServiceManager; this.nacosServiceManager = nacosServiceManager;
this.properties = properties; this.properties = properties;
this.taskScheduler = taskScheduler.getIfAvailable(NacosWatch::getTaskScheduler);
}
private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
taskScheduler.initialize();
return taskScheduler;
} }
@Override @Override
@ -97,8 +114,6 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
instanceOptional.ifPresent(currentInstance -> { instanceOptional.ifPresent(currentInstance -> {
resetIfNeeded(currentInstance); resetIfNeeded(currentInstance);
}); });
publisher.publishEvent(new HeartbeatEvent(NacosWatch.this,
nacosWatchIndex.getAndIncrement()));
} }
} }
}); });
@ -112,6 +127,9 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
catch (Exception e) { catch (Exception e) {
log.error("namingService subscribe failed, properties:{}", properties, e); log.error("namingService subscribe failed, properties:{}", properties, e);
} }
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
} }
} }
@ -135,10 +153,17 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
@Override @Override
public void stop() { public void stop() {
if (this.running.compareAndSet(true, false)) { if (this.running.compareAndSet(true, false)) {
if (this.watchFuture != null) {
// shutdown current user-thread,
// then the other daemon-threads will terminate automatic.
((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
this.watchFuture.cancel(true);
}
EventListener eventListener = listenerMap.get(buildKey()); EventListener eventListener = listenerMap.get(buildKey());
NamingService namingService = nacosServiceManager
.getNamingService(properties.getNacosProperties());
try { try {
NamingService namingService = nacosServiceManager
.getNamingService(properties.getNacosProperties());
namingService.unsubscribe(properties.getService(), properties.getGroup(), namingService.unsubscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener); Arrays.asList(properties.getClusterName()), eventListener);
} }
@ -159,4 +184,12 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
return 0; return 0;
} }
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent(
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
} }

Loading…
Cancel
Save