support heartbeat event
parent
e6a1ba09fb
commit
c545885d1c
@ -0,0 +1,107 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alibaba.nacos.discovery;
|
||||
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.pojo.ListView;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.alibaba.nacos.NacosDiscoveryProperties;
|
||||
import org.springframework.cloud.alibaba.nacos.NacosServiceInstance;
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
import org.springframework.cloud.client.discovery.DiscoveryClient;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* @author xiaojing
|
||||
* @author renhaojun
|
||||
*/
|
||||
public class NacosDiscoveryClient implements DiscoveryClient {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory
|
||||
.getLogger(NacosDiscoveryClient.class);
|
||||
public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";
|
||||
|
||||
private NacosDiscoveryProperties discoveryProperties;
|
||||
|
||||
public NacosDiscoveryClient(NacosDiscoveryProperties discoveryProperties) {
|
||||
this.discoveryProperties = discoveryProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return DESCRIPTION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ServiceInstance> getInstances(String serviceId) {
|
||||
try {
|
||||
List<Instance> instances = discoveryProperties.namingServiceInstance()
|
||||
.selectInstances(serviceId, true);
|
||||
return hostToServiceInstanceList(instances, serviceId);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
"Can not get hosts from nacos server. serviceId: " + serviceId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static ServiceInstance hostToServiceInstance(Instance instance,
|
||||
String serviceId) {
|
||||
NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();
|
||||
nacosServiceInstance.setHost(instance.getIp());
|
||||
nacosServiceInstance.setPort(instance.getPort());
|
||||
nacosServiceInstance.setServiceId(serviceId);
|
||||
Map<String, String> metadata = new HashMap<>();
|
||||
metadata.put("instanceId", instance.getInstanceId());
|
||||
metadata.put("weight", instance.getWeight() + "");
|
||||
metadata.put("healthy", instance.isHealthy() + "");
|
||||
metadata.put("cluster", instance.getClusterName() + "");
|
||||
metadata.putAll(instance.getMetadata());
|
||||
nacosServiceInstance.setMetadata(metadata);
|
||||
|
||||
if (metadata.containsKey("secure")) {
|
||||
boolean secure = Boolean.parseBoolean(metadata.get("secure"));
|
||||
nacosServiceInstance.setSecure(secure);
|
||||
}
|
||||
return nacosServiceInstance;
|
||||
}
|
||||
|
||||
private static List<ServiceInstance> hostToServiceInstanceList(
|
||||
List<Instance> instances, String serviceId) {
|
||||
List<ServiceInstance> result = new ArrayList<>(instances.size());
|
||||
for (Instance instance : instances) {
|
||||
result.add(hostToServiceInstance(instance, serviceId));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getServices() {
|
||||
|
||||
try {
|
||||
ListView<String> services = discoveryProperties.namingServiceInstance()
|
||||
.getServicesOfServer(1, Integer.MAX_VALUE);
|
||||
return services.getData();
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOGGER.error("get service name from nacos server fail,", e);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Copyright (C) 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alibaba.nacos.discovery;
|
||||
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.alibaba.nacos.ConditionalOnNacosDiscoveryEnabled;
|
||||
import org.springframework.cloud.alibaba.nacos.NacosDiscoveryProperties;
|
||||
import org.springframework.cloud.alibaba.nacos.discovery.NacosDiscoveryClient;
|
||||
import org.springframework.cloud.alibaba.nacos.discovery.NacosWatch;
|
||||
import org.springframework.cloud.client.CommonsClientAutoConfiguration;
|
||||
import org.springframework.cloud.client.discovery.DiscoveryClient;
|
||||
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClientAutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author xiaojing
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnNacosDiscoveryEnabled
|
||||
@EnableConfigurationProperties
|
||||
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
|
||||
CommonsClientAutoConfiguration.class })
|
||||
public class NacosDiscoveryClientAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public DiscoveryClient nacosDiscoveryClient(
|
||||
NacosDiscoveryProperties discoveryProperties) {
|
||||
return new NacosDiscoveryClient(discoveryProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public NacosDiscoveryProperties nacosProperties() {
|
||||
return new NacosDiscoveryProperties();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties) {
|
||||
return new NacosWatch(nacosDiscoveryProperties);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,174 @@
|
||||
/*
|
||||
* Copyright (C) 2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.alibaba.nacos.discovery;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import com.alibaba.nacos.api.naming.NamingService;
|
||||
import com.alibaba.nacos.api.naming.listener.EventListener;
|
||||
import com.alibaba.nacos.api.naming.pojo.ListView;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cloud.alibaba.nacos.NacosDiscoveryProperties;
|
||||
import org.springframework.cloud.client.discovery.event.HeartbeatEvent;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.context.ApplicationEventPublisherAware;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.scheduling.TaskScheduler;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
|
||||
/**
|
||||
* @author xiaojing
|
||||
*/
|
||||
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(NacosWatch.class);
|
||||
|
||||
private final NacosDiscoveryProperties properties;
|
||||
|
||||
private final TaskScheduler taskScheduler;
|
||||
|
||||
private final AtomicLong nacosWatchIndex = new AtomicLong(0);
|
||||
|
||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||
|
||||
private ApplicationEventPublisher publisher;
|
||||
|
||||
private ScheduledFuture<?> watchFuture;
|
||||
|
||||
private Set<String> cacheServices = new HashSet<>();
|
||||
|
||||
private HashMap<String, EventListener> subscribeListeners = new HashMap<>();
|
||||
|
||||
public NacosWatch(NacosDiscoveryProperties properties) {
|
||||
this(properties, getTaskScheduler());
|
||||
}
|
||||
|
||||
public NacosWatch(NacosDiscoveryProperties properties, TaskScheduler taskScheduler) {
|
||||
this.properties = properties;
|
||||
this.taskScheduler = taskScheduler;
|
||||
}
|
||||
|
||||
private static ThreadPoolTaskScheduler getTaskScheduler() {
|
||||
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
|
||||
taskScheduler.initialize();
|
||||
return taskScheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
|
||||
this.publisher = publisher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
this.stop();
|
||||
callback.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (this.running.compareAndSet(false, true)) {
|
||||
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
|
||||
this::nacosServicesWatch, this.properties.getWatchDelay());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (this.running.compareAndSet(true, false) && this.watchFuture != null) {
|
||||
this.watchFuture.cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public void nacosServicesWatch() {
|
||||
try {
|
||||
|
||||
boolean changed = false;
|
||||
NamingService namingService = properties.namingServiceInstance();
|
||||
|
||||
ListView<String> listView = properties.namingServiceInstance()
|
||||
.getServicesOfServer(1, Integer.MAX_VALUE);
|
||||
|
||||
List<String> serviceList = listView.getData();
|
||||
|
||||
// if there are new services found, publish event
|
||||
Set<String> currentServices = new HashSet<>(serviceList);
|
||||
currentServices.removeAll(cacheServices);
|
||||
if (currentServices.size() > 0) {
|
||||
changed = true;
|
||||
}
|
||||
|
||||
// if some services disappear, publish event
|
||||
if (cacheServices.removeAll(new HashSet<>(serviceList))
|
||||
&& cacheServices.size() > 0) {
|
||||
changed = true;
|
||||
|
||||
for (String serviceName : cacheServices) {
|
||||
namingService.unsubscribe(serviceName,
|
||||
subscribeListeners.get(serviceName));
|
||||
subscribeListeners.remove(serviceName);
|
||||
}
|
||||
}
|
||||
|
||||
cacheServices = new HashSet<>(serviceList);
|
||||
|
||||
// subscribe services's node change, publish event if nodes changed
|
||||
for (String serviceName : cacheServices) {
|
||||
if (!subscribeListeners.containsKey(serviceName)) {
|
||||
EventListener eventListener = event -> NacosWatch.this.publisher
|
||||
.publishEvent(new HeartbeatEvent(NacosWatch.this,
|
||||
nacosWatchIndex.getAndIncrement()));
|
||||
subscribeListeners.put(serviceName, eventListener);
|
||||
namingService.subscribe(serviceName, eventListener);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if (changed) {
|
||||
this.publisher.publishEvent(
|
||||
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
|
||||
}
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("Error watching Nacos Service change", e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue