diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfiguration.java index 612345165..af722dda2 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfiguration.java @@ -18,8 +18,8 @@ package com.alibaba.cloud.nacos.discovery; import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; +import com.alibaba.cloud.nacos.NacosServiceManager; -import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -31,7 +31,6 @@ import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.TaskScheduler; /** * @author xiaojing @@ -56,9 +55,9 @@ public class NacosDiscoveryClientConfiguration { @ConditionalOnMissingBean @ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true) - public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties, - ObjectProvider taskScheduler) { - return new NacosWatch(nacosDiscoveryProperties, taskScheduler); + public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager, + NacosDiscoveryProperties nacosDiscoveryProperties) { + return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties); } } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java index b5dfc8e3a..460191138 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/discovery/NacosWatch.java @@ -16,21 +16,28 @@ package com.alibaba.cloud.nacos.discovery; -import java.util.concurrent.ScheduledFuture; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; +import com.alibaba.cloud.nacos.NacosServiceManager; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.listener.Event; +import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.listener.NamingEvent; +import com.alibaba.nacos.api.naming.pojo.Instance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.ObjectProvider; import org.springframework.cloud.client.discovery.event.HeartbeatEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.SmartLifecycle; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * @author xiaojing @@ -39,44 +46,20 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl private static final Logger log = LoggerFactory.getLogger(NacosWatch.class); - private final NacosDiscoveryProperties properties; - - private final TaskScheduler taskScheduler; - - private final AtomicLong nacosWatchIndex = new AtomicLong(0); + private Map listenerMap = new ConcurrentHashMap<>(16); private final AtomicBoolean running = new AtomicBoolean(false); private ApplicationEventPublisher publisher; - private ScheduledFuture watchFuture; + private NacosServiceManager nacosServiceManager; - public NacosWatch(NacosDiscoveryProperties properties) { - this(properties, getTaskScheduler()); - } + private final NacosDiscoveryProperties properties; - public NacosWatch(NacosDiscoveryProperties properties, TaskScheduler taskScheduler) { + public NacosWatch(NacosServiceManager nacosServiceManager, + NacosDiscoveryProperties properties) { + this.nacosServiceManager = nacosServiceManager; this.properties = properties; - this.taskScheduler = taskScheduler; - } - - /** - * The constructor with {@link NacosDiscoveryProperties} bean and the optional. - * {@link TaskScheduler} bean - * @param properties {@link NacosDiscoveryProperties} bean - * @param taskScheduler the optional {@link TaskScheduler} bean - * @since 2.2.0 - */ - public NacosWatch(NacosDiscoveryProperties properties, - ObjectProvider taskScheduler) { - this(properties, taskScheduler.getIfAvailable(NacosWatch::getTaskScheduler)); - } - - private static ThreadPoolTaskScheduler getTaskScheduler() { - ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); - taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler"); - taskScheduler.initialize(); - return taskScheduler; } @Override @@ -98,19 +81,67 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl @Override public void start() { if (this.running.compareAndSet(false, true)) { - this.watchFuture = this.taskScheduler.scheduleWithFixedDelay( - this::nacosServicesWatch, this.properties.getWatchDelay()); + EventListener eventListener = listenerMap.computeIfAbsent(buildKey(), + event -> new EventListener() { + @Override + public void onEvent(Event event) { + if (event instanceof NamingEvent) { + List instances = ((NamingEvent) event) + .getInstances(); + Instance currentInstance = selectCurrentInstance( + instances); + if (Objects.nonNull(currentInstance)) { + resetIfNeeded(currentInstance); + publisher.publishEvent( + new HeartbeatEvent(this, currentInstance)); + } + } + } + }); + + NamingService namingService = nacosServiceManager + .getNamingService(properties.getNacosProperties()); + try { + namingService.subscribe(properties.getService(), properties.getGroup(), + Arrays.asList(properties.getClusterName()), eventListener); + } + catch (Exception e) { + log.error("namingService subscribe failed, properties:{}", properties, e); + } } } + private String buildKey() { + return String.join(":", properties.getService(), properties.getGroup()); + } + + private void resetIfNeeded(Instance instance) { + if (!properties.getMetadata().equals(instance.getMetadata())) { + properties.setMetadata(instance.getMetadata()); + } + } + + private Instance selectCurrentInstance(List instances) { + return instances.stream() + .filter(instance -> properties.getIp().equals(instance.getIp()) + && properties.getPort() == instance.getPort()) + .findFirst().orElse(null); + } + @Override public void stop() { - if (this.running.compareAndSet(true, false) && this.watchFuture != null) { - // shutdown current user-thread, - // then the other daemon-threads will terminate automatic. - ((ThreadPoolTaskScheduler) this.taskScheduler).shutdown(); - - this.watchFuture.cancel(true); + if (this.running.compareAndSet(true, false)) { + EventListener eventListener = listenerMap.get(buildKey()); + NamingService namingService = nacosServiceManager + .getNamingService(properties.getNacosProperties()); + try { + namingService.unsubscribe(properties.getService(), properties.getGroup(), + Arrays.asList(properties.getClusterName()), eventListener); + } + catch (NacosException e) { + log.error("namingService unsubscribe failed, properties:{}", properties, + e); + } } } @@ -124,12 +155,4 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl return 0; } - public void nacosServicesWatch() { - - // nacos doesn't support watch now , publish an event every 30 seconds. - this.publisher.publishEvent( - new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement())); - - } - }