[ISSUE#2868] Refactor nacos watch

pull/2881/head
ruansheng 2 years ago
parent 02ed8f3aae
commit f57fea67ba

@ -35,6 +35,7 @@ import org.springframework.context.annotation.Configuration;
/**
* @author xiaojing
* @author echooymxq
* @author ruansheng
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ -51,12 +52,28 @@ public class NacosDiscoveryClientConfiguration {
return new NacosDiscoveryClient(nacosServiceDiscovery);
}
/**
* NacosWatch is no longer enabled by default .
* see https://github.com/alibaba/spring-cloud-alibaba/issues/2868
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true)
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = false)
public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);
}
/**
* Spring Cloud Gateway HeartBeat .
* publish an event every 30 seconds
* see https://github.com/alibaba/spring-cloud-alibaba/issues/2868
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.gateway.discovery.locator.enabled", matchIfMissing = false)
public NacosGatewayLocatorHeartBeat nacosGatewayHeartBeat(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosGatewayLocatorHeartBeat(nacosDiscoveryProperties);
}
}

@ -0,0 +1,102 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.nacos.discovery;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* @author yuhuangbin
* @author ruansheng
*/
public class NacosGatewayLocatorHeartBeat implements ApplicationEventPublisherAware, SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(NacosGatewayLocatorHeartBeat.class);
private final NacosDiscoveryProperties nacosDiscoveryProperties;
private final ThreadPoolTaskScheduler taskScheduler;
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
private final AtomicBoolean running = new AtomicBoolean(false);
private ApplicationEventPublisher publisher;
private ScheduledFuture<?> watchFuture;
public NacosGatewayLocatorHeartBeat(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
this.taskScheduler = getTaskScheduler();
}
private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setBeanName("Nacos-SCG-HeartBeat-Task-Scheduler");
taskScheduler.initialize();
return taskScheduler;
}
@Override
public void start() {
log.info("Start nacos gateway locator heartBeat task scheduler.");
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::publishHeartBeat, this.nacosDiscoveryProperties.getWatchDelay());
}
@Override
public void stop() {
if (this.watchFuture != null) {
// shutdown current user-thread,
// then the other daemon-threads will terminate automatic.
this.taskScheduler.shutdown();
this.watchFuture.cancel(true);
}
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public boolean isRunning() {
return this.running.get();
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
/**
* nacos doesn't support watch now , publish an event every 30 seconds.
*/
public void publishHeartBeat() {
HeartbeatEvent event = new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement());
this.publisher.publishEvent(event);
}
}

@ -21,9 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceManager;
@ -36,65 +34,30 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* @author xiaojing
* @author yuhuangbin
* @author pengfei.lu
* @author ruansheng
*/
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean {
public class NacosWatch implements SmartLifecycle, DisposableBean {
private static final Logger log = LoggerFactory.getLogger(NacosWatch.class);
private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
private final Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
private ApplicationEventPublisher publisher;
private ScheduledFuture<?> watchFuture;
private final NacosServiceManager nacosServiceManager;
private final NacosDiscoveryProperties properties;
private final ThreadPoolTaskScheduler taskScheduler;
public NacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties properties) {
this.nacosServiceManager = nacosServiceManager;
this.properties = properties;
this.taskScheduler = getTaskScheduler();
}
@Deprecated
public NacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties properties,
ObjectProvider<ThreadPoolTaskScheduler> taskScheduler) {
this.nacosServiceManager = nacosServiceManager;
this.properties = properties;
this.taskScheduler = taskScheduler.stream().findAny()
.orElseGet(NacosWatch::getTaskScheduler);
}
private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
taskScheduler.initialize();
return taskScheduler;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
@Override
@ -136,8 +99,6 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
log.error("namingService subscribe failed, properties:{}", properties, e);
}
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
@ -161,12 +122,6 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
if (this.watchFuture != null) {
// shutdown current user-thread,
// then the other daemon-threads will terminate automatic.
this.taskScheduler.shutdown();
this.watchFuture.cancel(true);
}
EventListener eventListener = listenerMap.get(buildKey());
try {
@ -191,14 +146,6 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
return 0;
}
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent(
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
@Override
public void destroy() {
this.stop();

Loading…
Cancel
Save