|
|
|
@ -16,21 +16,28 @@
|
|
|
|
|
|
|
|
|
|
package com.alibaba.cloud.nacos.discovery;
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
|
|
|
|
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
|
|
|
|
|
import com.alibaba.cloud.nacos.NacosServiceManager;
|
|
|
|
|
import com.alibaba.nacos.api.exception.NacosException;
|
|
|
|
|
import com.alibaba.nacos.api.naming.NamingService;
|
|
|
|
|
import com.alibaba.nacos.api.naming.listener.Event;
|
|
|
|
|
import com.alibaba.nacos.api.naming.listener.EventListener;
|
|
|
|
|
import com.alibaba.nacos.api.naming.listener.NamingEvent;
|
|
|
|
|
import com.alibaba.nacos.api.naming.pojo.Instance;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
import org.springframework.beans.factory.ObjectProvider;
|
|
|
|
|
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
|
|
|
|
|
import org.springframework.context.ApplicationEventPublisher;
|
|
|
|
|
import org.springframework.context.ApplicationEventPublisherAware;
|
|
|
|
|
import org.springframework.context.SmartLifecycle;
|
|
|
|
|
import org.springframework.scheduling.TaskScheduler;
|
|
|
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @author xiaojing
|
|
|
|
@ -39,44 +46,20 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
|
|
|
|
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(NacosWatch.class);
|
|
|
|
|
|
|
|
|
|
private final NacosDiscoveryProperties properties;
|
|
|
|
|
|
|
|
|
|
private final TaskScheduler taskScheduler;
|
|
|
|
|
|
|
|
|
|
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
|
|
|
|
|
private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
|
|
|
|
|
|
|
|
|
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
|
|
|
|
|
|
|
|
|
private ApplicationEventPublisher publisher;
|
|
|
|
|
|
|
|
|
|
private ScheduledFuture<?> watchFuture;
|
|
|
|
|
private NacosServiceManager nacosServiceManager;
|
|
|
|
|
|
|
|
|
|
public NacosWatch(NacosDiscoveryProperties properties) {
|
|
|
|
|
this(properties, getTaskScheduler());
|
|
|
|
|
}
|
|
|
|
|
private final NacosDiscoveryProperties properties;
|
|
|
|
|
|
|
|
|
|
public NacosWatch(NacosDiscoveryProperties properties, TaskScheduler taskScheduler) {
|
|
|
|
|
public NacosWatch(NacosServiceManager nacosServiceManager,
|
|
|
|
|
NacosDiscoveryProperties properties) {
|
|
|
|
|
this.nacosServiceManager = nacosServiceManager;
|
|
|
|
|
this.properties = properties;
|
|
|
|
|
this.taskScheduler = taskScheduler;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* The constructor with {@link NacosDiscoveryProperties} bean and the optional.
|
|
|
|
|
* {@link TaskScheduler} bean
|
|
|
|
|
* @param properties {@link NacosDiscoveryProperties} bean
|
|
|
|
|
* @param taskScheduler the optional {@link TaskScheduler} bean
|
|
|
|
|
* @since 2.2.0
|
|
|
|
|
*/
|
|
|
|
|
public NacosWatch(NacosDiscoveryProperties properties,
|
|
|
|
|
ObjectProvider<TaskScheduler> taskScheduler) {
|
|
|
|
|
this(properties, taskScheduler.getIfAvailable(NacosWatch::getTaskScheduler));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static ThreadPoolTaskScheduler getTaskScheduler() {
|
|
|
|
|
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
|
|
|
|
|
taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
|
|
|
|
|
taskScheduler.initialize();
|
|
|
|
|
return taskScheduler;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@ -98,19 +81,67 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
|
|
|
|
|
@Override
|
|
|
|
|
public void start() {
|
|
|
|
|
if (this.running.compareAndSet(false, true)) {
|
|
|
|
|
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
|
|
|
|
|
this::nacosServicesWatch, this.properties.getWatchDelay());
|
|
|
|
|
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
|
|
|
|
|
event -> new EventListener() {
|
|
|
|
|
@Override
|
|
|
|
|
public void onEvent(Event event) {
|
|
|
|
|
if (event instanceof NamingEvent) {
|
|
|
|
|
List<Instance> instances = ((NamingEvent) event)
|
|
|
|
|
.getInstances();
|
|
|
|
|
Instance currentInstance = selectCurrentInstance(
|
|
|
|
|
instances);
|
|
|
|
|
if (Objects.nonNull(currentInstance)) {
|
|
|
|
|
resetIfNeeded(currentInstance);
|
|
|
|
|
publisher.publishEvent(
|
|
|
|
|
new HeartbeatEvent(this, currentInstance));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
NamingService namingService = nacosServiceManager
|
|
|
|
|
.getNamingService(properties.getNacosProperties());
|
|
|
|
|
try {
|
|
|
|
|
namingService.subscribe(properties.getService(), properties.getGroup(),
|
|
|
|
|
Arrays.asList(properties.getClusterName()), eventListener);
|
|
|
|
|
}
|
|
|
|
|
catch (Exception e) {
|
|
|
|
|
log.error("namingService subscribe failed, properties:{}", properties, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String buildKey() {
|
|
|
|
|
return String.join(":", properties.getService(), properties.getGroup());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void resetIfNeeded(Instance instance) {
|
|
|
|
|
if (!properties.getMetadata().equals(instance.getMetadata())) {
|
|
|
|
|
properties.setMetadata(instance.getMetadata());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Instance selectCurrentInstance(List<Instance> instances) {
|
|
|
|
|
return instances.stream()
|
|
|
|
|
.filter(instance -> properties.getIp().equals(instance.getIp())
|
|
|
|
|
&& properties.getPort() == instance.getPort())
|
|
|
|
|
.findFirst().orElse(null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void stop() {
|
|
|
|
|
if (this.running.compareAndSet(true, false) && this.watchFuture != null) {
|
|
|
|
|
// shutdown current user-thread,
|
|
|
|
|
// then the other daemon-threads will terminate automatic.
|
|
|
|
|
((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
|
|
|
|
|
|
|
|
|
|
this.watchFuture.cancel(true);
|
|
|
|
|
if (this.running.compareAndSet(true, false)) {
|
|
|
|
|
EventListener eventListener = listenerMap.get(buildKey());
|
|
|
|
|
NamingService namingService = nacosServiceManager
|
|
|
|
|
.getNamingService(properties.getNacosProperties());
|
|
|
|
|
try {
|
|
|
|
|
namingService.unsubscribe(properties.getService(), properties.getGroup(),
|
|
|
|
|
Arrays.asList(properties.getClusterName()), eventListener);
|
|
|
|
|
}
|
|
|
|
|
catch (NacosException e) {
|
|
|
|
|
log.error("namingService unsubscribe failed, properties:{}", properties,
|
|
|
|
|
e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -124,12 +155,4 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void nacosServicesWatch() {
|
|
|
|
|
|
|
|
|
|
// nacos doesn't support watch now , publish an event every 30 seconds.
|
|
|
|
|
this.publisher.publishEvent(
|
|
|
|
|
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|