Merge pull request #2881 from ruansheng8/feature-nacosWatch-2021.x

Refactor NacosWatch and add NacosGatewayLocatorHeartBeat
pull/2910/head
Steve Rao 2 years ago committed by GitHub
commit f33eaded1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,102 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.nacos.discovery;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* @author yuhuangbin
* @author ruansheng
*/
public class GatewayLocatorHeartBeatPublisher implements ApplicationEventPublisherAware, SmartLifecycle {
private static final Logger log = LoggerFactory.getLogger(GatewayLocatorHeartBeatPublisher.class);
private final NacosDiscoveryProperties nacosDiscoveryProperties;
private final ThreadPoolTaskScheduler taskScheduler;
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
private final AtomicBoolean running = new AtomicBoolean(false);
private ApplicationEventPublisher publisher;
private ScheduledFuture<?> watchFuture;
public GatewayLocatorHeartBeatPublisher(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
this.taskScheduler = getTaskScheduler();
}
private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setBeanName("HeartBeat-Task-Scheduler");
taskScheduler.initialize();
return taskScheduler;
}
@Override
public void start() {
log.info("Start nacos gateway locator heartBeat task scheduler.");
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::publishHeartBeat, this.nacosDiscoveryProperties.getWatchDelay());
}
@Override
public void stop() {
if (this.watchFuture != null) {
// shutdown current user-thread,
// then the other daemon-threads will terminate automatic.
this.taskScheduler.shutdown();
this.watchFuture.cancel(true);
}
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public boolean isRunning() {
return this.running.get();
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
/**
* nacos doesn't support watch now , publish an event every 30 seconds.
*/
public void publishHeartBeat() {
HeartbeatEvent event = new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement());
this.publisher.publishEvent(event);
}
}

@ -35,6 +35,7 @@ import org.springframework.context.annotation.Configuration;
/**
* @author xiaojing
* @author echooymxq
* @author ruansheng
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ -51,12 +52,28 @@ public class NacosDiscoveryClientConfiguration {
return new NacosDiscoveryClient(nacosServiceDiscovery);
}
/**
* NacosWatch is no longer enabled by default .
* see https://github.com/alibaba/spring-cloud-alibaba/issues/2868
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true)
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = false)
public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);
}
/**
* Spring Cloud Gateway HeartBeat .
* publish an event every 30 seconds
* see https://github.com/alibaba/spring-cloud-alibaba/issues/2868
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.gateway.discovery.locator.enabled", matchIfMissing = false)
public GatewayLocatorHeartBeatPublisher gatewayLocatorHeartBeatPublisher(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new GatewayLocatorHeartBeatPublisher(nacosDiscoveryProperties);
}
}

@ -21,9 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceManager;
@ -36,65 +34,30 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* @author xiaojing
* @author yuhuangbin
* @author pengfei.lu
* @author ruansheng
*/
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean {
public class NacosWatch implements SmartLifecycle, DisposableBean {
private static final Logger log = LoggerFactory.getLogger(NacosWatch.class);
private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
private final Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
private ApplicationEventPublisher publisher;
private ScheduledFuture<?> watchFuture;
private final NacosServiceManager nacosServiceManager;
private final NacosDiscoveryProperties properties;
private final ThreadPoolTaskScheduler taskScheduler;
public NacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties properties) {
this.nacosServiceManager = nacosServiceManager;
this.properties = properties;
this.taskScheduler = getTaskScheduler();
}
@Deprecated
public NacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties properties,
ObjectProvider<ThreadPoolTaskScheduler> taskScheduler) {
this.nacosServiceManager = nacosServiceManager;
this.properties = properties;
this.taskScheduler = taskScheduler.stream().findAny()
.orElseGet(NacosWatch::getTaskScheduler);
}
private static ThreadPoolTaskScheduler getTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
taskScheduler.initialize();
return taskScheduler;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
@Override
@ -136,8 +99,6 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
log.error("namingService subscribe failed, properties:{}", properties, e);
}
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
@ -161,12 +122,6 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
if (this.watchFuture != null) {
// shutdown current user-thread,
// then the other daemon-threads will terminate automatic.
this.taskScheduler.shutdown();
this.watchFuture.cancel(true);
}
EventListener eventListener = listenerMap.get(buildKey());
try {
@ -191,14 +146,6 @@ public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycl
return 0;
}
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent(
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
@Override
public void destroy() {
this.stop();

@ -44,7 +44,7 @@
{
"name": "spring.cloud.nacos.discovery.watch.enabled",
"type": "java.lang.Boolean",
"defaultValue": "true",
"defaultValue": "false",
"description": "enable nacos discovery watch or not ."
},
{

@ -56,7 +56,8 @@ public class NacosDiscoveryClientConfigurationTest {
public void testDefaultInitialization() {
contextRunner.run(context -> {
assertThat(context).hasSingleBean(DiscoveryClient.class);
assertThat(context).hasSingleBean(NacosWatch.class);
// NacosWatch is no longer enabled by default
assertThat(context).doesNotHaveBean(NacosWatch.class);
});
}
@ -69,4 +70,26 @@ public class NacosDiscoveryClientConfigurationTest {
});
}
@Test
public void testNacosWatchEnabled() {
contextRunner.withPropertyValues("spring.cloud.nacos.discovery.watch.enabled=true")
.run(context -> assertThat(context).hasSingleBean(NacosWatch.class));
}
@Test
public void testDefaultGatewayLocatorHeartBeatPublisher() {
contextRunner.run(context ->
assertThat(context).doesNotHaveBean(GatewayLocatorHeartBeatPublisher.class)
);
}
@Test
public void testGatewayLocatorHeartBeatPublisherEnabled() {
contextRunner
.withPropertyValues("spring.cloud.gateway.discovery.locator.enabled=true")
.run(context ->
assertThat(context).hasSingleBean(GatewayLocatorHeartBeatPublisher.class)
);
}
}

Loading…
Cancel
Save