From 8b1f78c237f3409a0e071dd2f92054c78cc4ac0a Mon Sep 17 00:00:00 2001 From: mercyblitz Date: Thu, 1 Aug 2019 09:58:34 +0800 Subject: [PATCH] Polish spring-cloud-incubator/spring-cloud-alibaba#623 : Optimize implementation and reuse the HeartbeatEvent background thread --- ...ubboServiceDiscoveryAutoConfiguration.java | 389 ++++++++++++++---- .../DubboServiceMetadataRepository.java | 53 +-- .../registry/AbstractSpringCloudRegistry.java | 6 + .../event/ServiceInstancesChangedEvent.java | 28 +- 4 files changed, 372 insertions(+), 104 deletions(-) diff --git a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/autoconfigure/DubboServiceDiscoveryAutoConfiguration.java b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/autoconfigure/DubboServiceDiscoveryAutoConfiguration.java index 213e97594..c62338192 100644 --- a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/autoconfigure/DubboServiceDiscoveryAutoConfiguration.java +++ b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/autoconfigure/DubboServiceDiscoveryAutoConfiguration.java @@ -16,45 +16,64 @@ */ package com.alibaba.cloud.dubbo.autoconfigure; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.registry.NotifyListener; + +import com.alibaba.cloud.dubbo.env.DubboCloudProperties; import com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository; import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent; +import com.alibaba.cloud.nacos.NacosDiscoveryProperties; +import com.alibaba.cloud.nacos.discovery.NacosWatch; import com.netflix.discovery.CacheRefreshedEvent; +import com.netflix.discovery.shared.Applications; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.zookeeper.Watcher; +import org.aspectj.lang.annotation.After; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.AutoConfigureOrder; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.event.HeartbeatEvent; +import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent; +import org.springframework.cloud.consul.discovery.ConsulCatalogWatch; import org.springframework.cloud.netflix.eureka.CloudEurekaClient; import org.springframework.cloud.zookeeper.discovery.ZookeeperDiscoveryProperties; -import org.springframework.cloud.zookeeper.discovery.dependency.ZookeeperDependencies; +import org.springframework.cloud.zookeeper.discovery.ZookeeperServiceWatch; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationListener; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.util.AntPathMatcher; import java.util.Collection; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Predicate; import static com.alibaba.cloud.dubbo.autoconfigure.DubboServiceDiscoveryAutoConfiguration.CONSUL_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME; +import static com.alibaba.cloud.dubbo.autoconfigure.DubboServiceDiscoveryAutoConfiguration.NACOS_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME; import static com.alibaba.cloud.dubbo.autoconfigure.DubboServiceDiscoveryAutoConfiguration.ZOOKEEPER_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME; import static com.alibaba.cloud.dubbo.autoconfigure.DubboServiceRegistrationAutoConfiguration.EUREKA_CLIENT_AUTO_CONFIGURATION_CLASS_NAME; -import static org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged; -import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged; import static org.springframework.util.StringUtils.hasText; /** @@ -70,7 +89,8 @@ import static org.springframework.util.StringUtils.hasText; @AutoConfigureAfter(name = { EUREKA_CLIENT_AUTO_CONFIGURATION_CLASS_NAME, ZOOKEEPER_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME, - CONSUL_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME + CONSUL_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME, + NACOS_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME }, value = {DubboServiceRegistrationAutoConfiguration.class}) public class DubboServiceDiscoveryAutoConfiguration { @@ -78,6 +98,8 @@ public class DubboServiceDiscoveryAutoConfiguration { public static final String CONSUL_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME = "org.springframework.cloud.consul.discovery.ConsulDiscoveryClientConfiguration"; + public static final String NACOS_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME = "com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration"; + private final DubboServiceMetadataRepository dubboServiceMetadataRepository; private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -86,12 +108,21 @@ public class DubboServiceDiscoveryAutoConfiguration { private final DiscoveryClient discoveryClient; + private final AtomicReference heartbeatState = new AtomicReference<>(); + + /** + * @see #defaultHeartbeatEventChangePredicate() + */ + private final ObjectProvider> heartbeatEventChangedPredicate; + public DubboServiceDiscoveryAutoConfiguration(DubboServiceMetadataRepository dubboServiceMetadataRepository, ApplicationEventPublisher applicationEventPublisher, - DiscoveryClient discoveryClient) { + DiscoveryClient discoveryClient, + ObjectProvider> heartbeatEventChangedPredicate) { this.dubboServiceMetadataRepository = dubboServiceMetadataRepository; this.applicationEventPublisher = applicationEventPublisher; this.discoveryClient = discoveryClient; + this.heartbeatEventChangedPredicate = heartbeatEventChangedPredicate; } @@ -118,119 +149,319 @@ public class DubboServiceDiscoveryAutoConfiguration { return discoveryClient.getInstances(serviceName); } + /** + * Dispatch a {@link ServiceInstancesChangedEvent} when the {@link HeartbeatEvent} raised, use for these scenarios: + *
    + *
  • Eureka : {@link CloudEurekaClient#onCacheRefreshed()} publishes a {@link HeartbeatEvent} instead of {@link CacheRefreshedEvent}
  • + *
  • Zookeeper : {@link ZookeeperServiceWatch#childEvent(CuratorFramework, TreeCacheEvent)} publishes a + * {@link HeartbeatEvent} when {@link ZookeeperDiscoveryProperties#getRoot() the services' root path} has been changed
  • + *
  • Consul : {@link ConsulCatalogWatch#catalogServicesWatch()} publishes a {@link HeartbeatEvent} when + * Consul's Blocking Queries response + *
  • + *
  • Nacos : {@link NacosWatch#nacosServicesWatch()} publishes a {@link HeartbeatEvent} under a + * {@link TaskScheduler} every {@link NacosDiscoveryProperties#getWatchDelay()} milliseconds
  • + *
+ *

+ * In order to reduce the duplicated handling for {@link ServiceInstancesChangedEvent}, + * {@link #defaultHeartbeatEventChangePredicate()} method providers the default implementation to detect whether the + * {@link HeartbeatEvent#getValue() state} is changed or not. If and only if changed, the + * {@link #dispatchServiceInstancesChangedEvent(String, Collection)} will be executed. + *

+ * Note : + * Spring Cloud {@link HeartbeatEvent} has a critical flaw that can't figure out which service was changed precisely, + * it's just used for a notification that the + * {@link DubboServiceMetadataRepository#getSubscribedServices() subscribed services} used to + * {@link NotifyListener#notify(List) notify} the Dubbo consumer cached {@link URL URLs}. + * Because of some {@link DiscoveryClient} implementations have the better and fine-grained the event mechanism for service instances + * change, thus {@link HeartbeatEvent} handle will be ignored in these scenarios: + *

    + *
  • Zookeeper : {@link Watcher}
  • + *
  • Nacos : {@link com.alibaba.nacos.api.naming.listener.EventListener}
  • + *
+ *

+ * If the customized {@link DiscoveryClient} also providers the similar mechanism, the implementation could declare + * a Spring Bean of {@link Predicate} of {@link HeartbeatEvent} to override + * {@link #defaultHeartbeatEventChangePredicate() default one}. + * + * @param event the instance of {@link HeartbeatEvent} + * @see HeartbeatEvent + */ + @EventListener(HeartbeatEvent.class) + public void onHeartbeatEvent(HeartbeatEvent event) { + heartbeatEventChangedPredicate.ifAvailable(predicate -> { + if (predicate.test(event)) { + /** + * Try to re-initialize the subscribed services, in order to sense the change of services if + * {@link DubboCloudProperties#getSubscribedServices()} is wildcard that indicates all services should be + * subscribed. + */ + dubboServiceMetadataRepository.initSubscribedServices(); + // Dispatch ServiceInstancesChangedEvent + forEachSubscribedServices(serviceName -> { + List serviceInstances = getInstances(serviceName); + dispatchServiceInstancesChangedEvent(serviceName, serviceInstances); + }); + } + }); + } + + /** + * The default {@link Predicate} implementation of {@link HeartbeatEvent} based on the comparison between + * previous and current {@link HeartbeatEvent#getValue() state value}, the {@link DiscoveryClient} + * implementation may override current. + * + * @return {@link Predicate} + * @see EurekaConfiguration#heartbeatEventChangedPredicate() + */ + @Bean + @ConditionalOnMissingBean + public Predicate defaultHeartbeatEventChangePredicate() { + return event -> { + Object oldState = heartbeatState.get(); + Object newState = event.getValue(); + return heartbeatState.compareAndSet(oldState, newState) && !Objects.equals(oldState, newState); + }; + } + + /** + * Eureka Customized Configuration + */ @Configuration @ConditionalOnBean(name = EUREKA_CLIENT_AUTO_CONFIGURATION_CLASS_NAME) - class EurekaConfiguration { + public class EurekaConfiguration { + + private final AtomicReference appsHashCodeCache = new AtomicReference<>(); /** - * Dispatch a {@link ServiceInstancesChangedEvent} when the {@link HeartbeatEvent} raised - *

- * {@link CloudEurekaClient#onCacheRefreshed()} publishes a HeartbeatEvent instead of {@link CacheRefreshedEvent} + * Compare {@link Applications#getAppsHashCode() apps hash code} between last {@link Applications} and current. * - * @param event the {@link HeartbeatEvent} instance - * @see HeartbeatEvent - * @see CloudEurekaClient#onCacheRefreshed() - * @see CacheRefreshedEvent + * @see Applications#getAppsHashCode() */ - @EventListener(HeartbeatEvent.class) - public void onHeartbeatEvent(HeartbeatEvent event) { - forEachSubscribedServices(serviceName -> { - List serviceInstances = getInstances(serviceName); - dispatchServiceInstancesChangedEvent(serviceName, serviceInstances); - }); + @Bean + public Predicate heartbeatEventChangedPredicate() { + return event -> { + String oldAppsHashCode = appsHashCodeCache.get(); + CloudEurekaClient cloudEurekaClient = (CloudEurekaClient) event.getSource(); + Applications applications = cloudEurekaClient.getApplications(); + String appsHashCode = applications.getAppsHashCode(); + return appsHashCodeCache.compareAndSet(oldAppsHashCode, appsHashCode) && + !Objects.equals(oldAppsHashCode, appsHashCode); + }; } - } + /** + * Zookeeper Customized Configuration + */ @Configuration @ConditionalOnBean(name = ZOOKEEPER_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME) - class ZookeeperConfiguration { + @Aspect + public class ZookeeperConfiguration implements ApplicationListener { + /** + * The pointcut expression for {@link ZookeeperServiceWatch#childEvent(CuratorFramework, TreeCacheEvent)} + */ + public static final String CHILD_EVENT_POINTCUT_EXPRESSION = + "execution(void org.springframework.cloud.zookeeper.discovery.ZookeeperServiceWatch.childEvent(..)) && args(client,event)"; /** - * The Key is watched Zookeeper path, the value is an instance of {@link CuratorWatcher} + * The path separator of Zookeeper node */ - private final Map watcherCaches = new ConcurrentHashMap<>(); + public static final String NODE_PATH_SEPARATOR = "/"; - private final ZookeeperDiscoveryProperties zookeeperDiscoveryProperties; + /** + * The path variable name for the name of service + */ + private static final String SERVICE_NAME_PATH_VARIABLE_NAME = "serviceName"; - private final ObjectProvider zookeeperDependencies; + /** + * The path variable name for the id of {@link ServiceInstance service instance} + */ + private static final String SERVICE_INSTANCE_ID_PATH_VARIABLE_NAME = "serviceInstanceId"; - private final CuratorFramework curatorFramework; + private final ZookeeperServiceWatch zookeeperServiceWatch; private final String rootPath; - ZookeeperConfiguration(ZookeeperDiscoveryProperties zookeeperDiscoveryProperties, - ObjectProvider zookeeperDependencies, - CuratorFramework curatorFramework) { - this.zookeeperDiscoveryProperties = zookeeperDiscoveryProperties; - this.zookeeperDependencies = zookeeperDependencies; - this.curatorFramework = curatorFramework; + private final AntPathMatcher pathMatcher; + + /** + * Ant Path pattern for {@link ServiceInstance} : + *

+ *

+ * ${{@link #rootPath}}/{serviceName}/{serviceInstanceId} + * + * @see #rootPath + * @see #SERVICE_NAME_PATH_VARIABLE_NAME + * @see #SERVICE_INSTANCE_ID_PATH_VARIABLE_NAME + */ + private final String serviceInstancePathPattern; + + /** + * The {@link ThreadLocal} holds the processed service name + */ + private final ThreadLocal processedServiceNameThreadLocal; + + ZookeeperConfiguration( + ZookeeperDiscoveryProperties zookeeperDiscoveryProperties, + ZookeeperServiceWatch zookeeperServiceWatch) { + this.zookeeperServiceWatch = zookeeperServiceWatch; this.rootPath = zookeeperDiscoveryProperties.getRoot(); + this.pathMatcher = new AntPathMatcher(NODE_PATH_SEPARATOR); + this.serviceInstancePathPattern = rootPath + + NODE_PATH_SEPARATOR + "{" + SERVICE_NAME_PATH_VARIABLE_NAME + "}" + + NODE_PATH_SEPARATOR + "{" + SERVICE_INSTANCE_ID_PATH_VARIABLE_NAME + "}"; + this.processedServiceNameThreadLocal = new ThreadLocal<>(); } - @EventListener(ContextRefreshedEvent.class) - public void onContextRefreshedEvent(ContextRefreshedEvent event) { - forEachSubscribedServices(this::registerServiceWatcher); + /** + * Zookeeper uses {@link TreeCacheEvent} to trigger {@link #dispatchServiceInstancesChangedEvent(String, Collection)} + * , thus {@link HeartbeatEvent} handle is always ignored + * + * @return false forever + */ + @Bean + public Predicate heartbeatEventChangedPredicate() { + return event -> false; } - private void registerServiceWatcher(String serviceName) { - - String servicePath = buildServicePath(serviceName); - - CuratorWatcher watcher = watcherCaches.computeIfAbsent(servicePath, - path -> new ServiceInstancesChangedWatcher(serviceName)); + /** + * Handle on {@link InstanceRegisteredEvent} after + * {@link ZookeeperServiceWatch#onApplicationEvent(InstanceRegisteredEvent)} + * + * @param event {@link InstanceRegisteredEvent} + * @see #reattachTreeCacheListeners() + */ + @Override + public void onApplicationEvent(InstanceRegisteredEvent event) { + reattachTreeCacheListeners(); + } - try { - curatorFramework.getChildren().usingWatcher(watcher).forPath(servicePath); - } catch (KeeperException.NoNodeException e) { - // ignored - if (logger.isErrorEnabled()) { - logger.error(e.getMessage()); + /** + * Re-attach the {@link TreeCacheListener TreeCacheListeners} + */ + private void reattachTreeCacheListeners() { + + TreeCache treeCache = zookeeperServiceWatch.getCache(); + + Listenable listenable = treeCache.getListenable(); + + /** + * All registered TreeCacheListeners except {@link ZookeeperServiceWatch}. + * Usually, "otherListeners" will be empty because Spring Cloud Zookeeper only adds + * "zookeeperServiceWatch" bean as {@link TreeCacheListener} + */ + List otherListeners = new LinkedList<>(); + + if (listenable instanceof ListenerContainer) { + ListenerContainer listenerContainer = (ListenerContainer) listenable; + listenerContainer.forEach(listener -> { + /** + * Even though "listener" is an instance of {@link ZookeeperServiceWatch}, + * "zookeeperServiceWatch" bean that was enhanced by AOP is different from the former, + * thus it's required to exclude "listener" + */ + if (!(listener instanceof ZookeeperServiceWatch)) { + otherListeners.add(listener); + } + return null; + }); + + // remove all TreeCacheListeners temporarily + listenerContainer.clear(); + // re-store zookeeperServiceWatch, and make sure it's first one + // now "beforeChildEvent" is available for Spring AOP + listenerContainer.addListener(zookeeperServiceWatch); + // re-store others + otherListeners.forEach(listenerContainer::addListener); + } else { + if (logger.isWarnEnabled()) { + logger.warn("Tell me which version Curator framework current application used? I will do better :D"); } - } catch (Exception e) { - throw new IllegalStateException(e.getMessage(), e); } } - class ServiceInstancesChangedWatcher implements CuratorWatcher { - - private final String serviceName; - - ServiceInstancesChangedWatcher(String serviceName) { - this.serviceName = serviceName; + /** + * Try to {@link #dispatchServiceInstancesChangedEvent(String, Collection) dispatch} + * {@link ServiceInstancesChangedEvent} before {@link ZookeeperServiceWatch#childEvent(CuratorFramework, TreeCacheEvent)} + * execution if required + * + * @param client {@link CuratorFramework} + * @param event {@link TreeCacheEvent} + */ + @Before(CHILD_EVENT_POINTCUT_EXPRESSION) + public void beforeChildEvent(CuratorFramework client, TreeCacheEvent event) { + if (supportsEventType(event)) { + String serviceName = resolveServiceName(event); + if (hasText(serviceName)) { + dispatchServiceInstancesChangedEvent(serviceName, getInstances(serviceName)); + } } + } - @Override - public void process(WatchedEvent event) throws Exception { + @After(CHILD_EVENT_POINTCUT_EXPRESSION) + public void afterChildEvent(CuratorFramework client, TreeCacheEvent event) { + } - Watcher.Event.EventType eventType = event.getType(); + /** + * Resolve the name of service + * + * @param event {@link TreeCacheEvent} + * @return If the Zookeeper's {@link ChildData#getPath() node path} that was notified comes from + * {@link ServiceInstance the service instance}, return it's parent path as the service name, + * or return null + */ + private String resolveServiceName(TreeCacheEvent event) { + ChildData childData = event.getData(); + String path = childData.getPath(); + if (logger.isDebugEnabled()) { + logger.debug("ZK node[path : {}] event type : {}", path, event.getType()); + } - if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) { - dispatchServiceInstancesChangedEvent(serviceName, getInstances(serviceName)); - } + String serviceName = null; - // re-register again - registerServiceWatcher(serviceName); + if (pathMatcher.match(serviceInstancePathPattern, path)) { + Map variables = pathMatcher.extractUriTemplateVariables(serviceInstancePathPattern, path); + serviceName = variables.get(SERVICE_NAME_PATH_VARIABLE_NAME); } - } - private String buildServicePath(String serviceName) { - return rootPath + "/" + serviceRelativePath(serviceName); + return serviceName; } - private String serviceRelativePath(String serviceName) { - return Optional.ofNullable(zookeeperDependencies.getIfAvailable()) - .map(z -> z.getAliasForPath(serviceName)) - .orElse(serviceName); + /** + * The {@link TreeCacheEvent#getType() event type} is supported or not + * + * @param event {@link TreeCacheEvent} + * @return the rule is same as {@link ZookeeperServiceWatch#childEvent(CuratorFramework, TreeCacheEvent)} method + */ + private boolean supportsEventType(TreeCacheEvent event) { + TreeCacheEvent.Type eventType = event.getType(); + return eventType.equals(TreeCacheEvent.Type.NODE_ADDED) + || eventType.equals(TreeCacheEvent.Type.NODE_REMOVED) + || eventType.equals(TreeCacheEvent.Type.NODE_UPDATED); } - } @Configuration @ConditionalOnBean(name = CONSUL_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME) - @AutoConfigureOrder class ConsulConfiguration { } + + @Configuration + @ConditionalOnBean(name = NACOS_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME) + class NacosConfiguration { + + /** + * Nacos uses {@link EventListener} to trigger {@link #dispatchServiceInstancesChangedEvent(String, Collection)} + * , thus {@link HeartbeatEvent} handle is always ignored + * + * @return false forever + */ + @Bean + public Predicate heartbeatEventChangedPredicate() { + return event -> false; + } + + } } diff --git a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java index 976b27531..c44009364 100644 --- a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java +++ b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java @@ -169,11 +169,29 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto @Autowired private DubboMetadataServiceExporter dubboMetadataServiceExporter; + // ==================================================================================== // @PostConstruct - public void init() { - initSubscribedServices(); + public void initSubscribedServices() { + synchronized (monitor) { + // If subscribes all services + if (ALL_DUBBO_SERVICES.equals(dubboCloudProperties.getSubscribedServices())) { + List services = discoveryClient.getServices(); + subscribedServices = new HashSet<>(services); + if (logger.isWarnEnabled()) { + logger.warn("Current application will subscribe all services(size:{}) in registry, " + + "a lot of memory and CPU cycles may be used, " + + "thus it's strongly recommend you using the externalized property '{}' " + + "to specify the services", + subscribedServices.size(), "dubbo.cloud.subscribed-services"); + } + } else { + subscribedServices = new HashSet<>(dubboCloudProperties.subscribedServices()); + } + // exclude current application name + excludeSelf(subscribedServices); + } } @Override @@ -185,7 +203,7 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto * Initialize the metadata */ private void initializeMetadata() { - subscribedServices.forEach(this::initializeMetadata); + doGetSubscribedServices().forEach(this::initializeMetadata); if (logger.isInfoEnabled()) { logger.info("The metadata of Dubbo services has been initialized"); } @@ -315,7 +333,7 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto * @return */ public boolean isSubscribedService(String serviceName) { - return subscribedServices.contains(serviceName); + return doGetSubscribedServices().contains(serviceName); } public void exportURL(URL url) { @@ -431,8 +449,14 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto return match(dubboRestServiceMetadataRepository, serviceName, requestMetadata); } + protected Set doGetSubscribedServices() { + synchronized (monitor) { + return subscribedServices; + } + } + public Set getSubscribedServices() { - return Collections.unmodifiableSet(subscribedServices); + return unmodifiableSet(doGetSubscribedServices()); } private T match(Map> repository, String serviceName, @@ -517,25 +541,6 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto return new LinkedHashMap<>(); } - private void initSubscribedServices() { - // If subscribes all services - if (ALL_DUBBO_SERVICES.equals(dubboCloudProperties.getSubscribedServices())) { - List services = discoveryClient.getServices(); - subscribedServices = new HashSet<>(services); - if (logger.isWarnEnabled()) { - logger.warn("Current application will subscribe all services(size:{}) in registry, " + - "a lot of memory and CPU cycles may be used, " + - "thus it's strongly recommend you using the externalized property '{}' " + - "to specify the services", - subscribedServices.size(), "dubbo.cloud.subscribed-services"); - } - } else { - subscribedServices = new HashSet<>(dubboCloudProperties.subscribedServices()); - } - - excludeSelf(subscribedServices); - } - private void excludeSelf(Set subscribedServices) { subscribedServices.remove(currentApplicationName); } diff --git a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/AbstractSpringCloudRegistry.java b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/AbstractSpringCloudRegistry.java index 8a32e9397..1c8e39861 100644 --- a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/AbstractSpringCloudRegistry.java +++ b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/AbstractSpringCloudRegistry.java @@ -175,6 +175,10 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { applicationContext.addApplicationListener(new ApplicationListener() { @Override public void onApplicationEvent(ServiceInstancesChangedEvent event) { + if (event.isProcessed()) { // If processed, return immediately + return; + } + String serviceName = event.getServiceName(); Collection serviceInstances = event.getServiceInstances(); if (logger.isInfoEnabled()) { @@ -182,6 +186,8 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry { serviceName, serviceInstances.size()); } subscribeDubboServiceURLs(url, listener, serviceName, s -> serviceInstances); + // Mark event to be processed + event.process(); } }); } diff --git a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/event/ServiceInstancesChangedEvent.java b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/event/ServiceInstancesChangedEvent.java index a3cac681f..01a5fd628 100644 --- a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/event/ServiceInstancesChangedEvent.java +++ b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/event/ServiceInstancesChangedEvent.java @@ -18,6 +18,8 @@ package com.alibaba.cloud.dubbo.registry.event; import org.springframework.cloud.client.ServiceInstance; import org.springframework.context.ApplicationEvent; +import org.springframework.context.event.ApplicationEventMulticaster; +import org.springframework.context.event.SimpleApplicationEventMulticaster; import java.util.Collection; @@ -32,6 +34,14 @@ public class ServiceInstancesChangedEvent extends ApplicationEvent { private final Collection serviceInstances; + /** + * Current event has been processed or not. + * Typically, Spring Event was based on sync {@link ApplicationEventMulticaster} + * + * @see SimpleApplicationEventMulticaster + */ + private boolean processed = false; + /** * @param serviceName The name of service that was changed * @param serviceInstances all {@link ServiceInstance service instances} @@ -56,4 +66,20 @@ public class ServiceInstancesChangedEvent extends ApplicationEvent { public Collection getServiceInstances() { return serviceInstances; } -} + + /** + * Mark current event being processed + */ + public void process() { + processed = true; + } + + /** + * Current event has been processed or not + * + * @return if processed, return true, or false + */ + public boolean isProcessed() { + return processed; + } +} \ No newline at end of file