diff --git a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/autoconfigure/DubboServiceDiscoveryAutoConfiguration.java b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/autoconfigure/DubboServiceDiscoveryAutoConfiguration.java index 8c3315920..4a91b762b 100644 --- a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/autoconfigure/DubboServiceDiscoveryAutoConfiguration.java +++ b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/autoconfigure/DubboServiceDiscoveryAutoConfiguration.java @@ -23,8 +23,12 @@ import com.alibaba.cloud.dubbo.env.DubboCloudProperties; import com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository; import com.alibaba.cloud.dubbo.registry.AbstractSpringCloudRegistry; import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent; +import com.alibaba.cloud.dubbo.registry.event.SubscribedServicesChangedEvent; import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.cloud.nacos.discovery.NacosWatch; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.netflix.discovery.CacheRefreshedEvent; import com.netflix.discovery.shared.Applications; import org.apache.curator.framework.CuratorFramework; @@ -61,25 +65,30 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.event.EventListener; import org.springframework.scheduling.TaskScheduler; import org.springframework.util.AntPathMatcher; +import org.springframework.util.ReflectionUtils; import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.Stream; import static com.alibaba.cloud.dubbo.autoconfigure.DubboServiceDiscoveryAutoConfiguration.CONSUL_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME; import static com.alibaba.cloud.dubbo.autoconfigure.DubboServiceDiscoveryAutoConfiguration.NACOS_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME; import static com.alibaba.cloud.dubbo.autoconfigure.DubboServiceDiscoveryAutoConfiguration.ZOOKEEPER_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME; import static com.alibaba.cloud.dubbo.autoconfigure.DubboServiceRegistrationAutoConfiguration.EUREKA_CLIENT_AUTO_CONFIGURATION_CLASS_NAME; +import static com.alibaba.cloud.nacos.discovery.NacosDiscoveryClient.hostToServiceInstanceList; import static org.springframework.util.StringUtils.hasText; /** * Dubbo Service Discovery Auto {@link Configuration} (after {@link DubboServiceRegistrationAutoConfiguration}) * + * @author Mercy * @see DubboServiceRegistrationAutoConfiguration * @see Configuration * @see DiscoveryClient @@ -126,11 +135,6 @@ public class DubboServiceDiscoveryAutoConfiguration { this.heartbeatEventChangedPredicate = heartbeatEventChangedPredicate; } - - private void forEachSubscribedServices(Consumer serviceNameConsumer) { - dubboServiceMetadataRepository.getSubscribedServices().forEach(serviceNameConsumer); - } - /** * Dispatch a {@link ServiceInstancesChangedEvent} * @@ -180,8 +184,8 @@ public class DubboServiceDiscoveryAutoConfiguration { * Because of some {@link DiscoveryClient} implementations have the better and fine-grained the event mechanism for service instances * change, thus {@link HeartbeatEvent} handle will be ignored in these scenarios: * *

* If the customized {@link DiscoveryClient} also providers the similar mechanism, the implementation could declare @@ -193,16 +197,17 @@ public class DubboServiceDiscoveryAutoConfiguration { */ @EventListener(HeartbeatEvent.class) public void onHeartbeatEvent(HeartbeatEvent event) { + /** + * Try to re-initialize the subscribed services, in order to sense the change of services if + * {@link DubboCloudProperties#getSubscribedServices()} is wildcard that indicates all services should be + * subscribed. + */ + Stream subscribedServices = dubboServiceMetadataRepository.initSubscribedServices(); + heartbeatEventChangedPredicate.ifAvailable(predicate -> { if (predicate.test(event)) { - /** - * Try to re-initialize the subscribed services, in order to sense the change of services if - * {@link DubboCloudProperties#getSubscribedServices()} is wildcard that indicates all services should be - * subscribed. - */ - dubboServiceMetadataRepository.initSubscribedServices(); - // Dispatch ServiceInstancesChangedEvent - forEachSubscribedServices(serviceName -> { + // Dispatch ServiceInstancesChangedEvent for each service + subscribedServices.forEach(serviceName -> { List serviceInstances = getInstances(serviceName); dispatchServiceInstancesChangedEvent(serviceName, serviceInstances); }); @@ -463,6 +468,18 @@ public class DubboServiceDiscoveryAutoConfiguration { @ConditionalOnBean(name = NACOS_DISCOVERY_AUTO_CONFIGURATION_CLASS_NAME) class NacosConfiguration { + private final NamingService namingService; + + /** + * the set of services is listening + */ + private final Set listeningServices; + + NacosConfiguration(NacosDiscoveryProperties nacosDiscoveryProperties) { + this.namingService = nacosDiscoveryProperties.namingServiceInstance(); + this.listeningServices = new ConcurrentSkipListSet<>(); + } + /** * Nacos uses {@link EventListener} to trigger {@link #dispatchServiceInstancesChangedEvent(String, Collection)} * , thus {@link HeartbeatEvent} handle is always ignored @@ -474,5 +491,26 @@ public class DubboServiceDiscoveryAutoConfiguration { return event -> false; } + @EventListener(SubscribedServicesChangedEvent.class) + public void onSubscribedServicesChangedEvent(SubscribedServicesChangedEvent event) throws Exception { + // subscribe EventListener for each service + event.getNewSubscribedServices().forEach(this::subscribeEventListener); + } + + private void subscribeEventListener(String serviceName) { + if (listeningServices.add(serviceName)) { + try { + namingService.subscribe(serviceName, event -> { + if (event instanceof NamingEvent) { + NamingEvent namingEvent = (NamingEvent) event; + List serviceInstances = hostToServiceInstanceList(namingEvent.getInstances(), serviceName); + dispatchServiceInstancesChangedEvent(serviceName, serviceInstances); + } + }); + } catch (NacosException e) { + ReflectionUtils.rethrowRuntimeException(e); + } + } + } } } diff --git a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java index c44009364..4a1436156 100644 --- a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java +++ b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/metadata/repository/DubboServiceMetadataRepository.java @@ -23,6 +23,7 @@ import com.alibaba.cloud.dubbo.http.matcher.RequestMetadataMatcher; import com.alibaba.cloud.dubbo.metadata.DubboRestServiceMetadata; import com.alibaba.cloud.dubbo.metadata.RequestMetadata; import com.alibaba.cloud.dubbo.metadata.ServiceRestMetadata; +import com.alibaba.cloud.dubbo.registry.event.SubscribedServicesChangedEvent; import com.alibaba.cloud.dubbo.service.DubboMetadataService; import com.alibaba.cloud.dubbo.service.DubboMetadataServiceExporter; import com.alibaba.cloud.dubbo.service.DubboMetadataServiceProxy; @@ -37,6 +38,9 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.commons.util.InetUtils; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.http.HttpRequest; import org.springframework.stereotype.Repository; import org.springframework.util.LinkedMultiValueMap; @@ -44,7 +48,6 @@ import org.springframework.util.MultiValueMap; import javax.annotation.PostConstruct; import java.util.Collections; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -52,6 +55,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.alibaba.cloud.dubbo.env.DubboCloudProperties.ALL_DUBBO_SERVICES; import static com.alibaba.cloud.dubbo.http.DefaultHttpRequest.builder; @@ -73,7 +77,7 @@ import static org.springframework.util.StringUtils.hasText; * @author Mercy */ @Repository -public class DubboServiceMetadataRepository implements SmartInitializingSingleton { +public class DubboServiceMetadataRepository implements SmartInitializingSingleton, ApplicationEventPublisherAware { /** * The prefix of {@link DubboMetadataService} : "dubbo.metadata-service." @@ -99,6 +103,8 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto */ private final Object monitor = new Object(); + private ApplicationEventPublisher applicationEventPublisher; + /** * A {@link Set} of service names that had been initialized */ @@ -117,7 +123,7 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto // =================================== Subscription =================================== // - private Set subscribedServices = emptySet(); + private volatile Set subscribedServices = emptySet(); /** * The subscribed {@link URL urls} {@link Map} of {@link DubboMetadataService}, @@ -172,26 +178,50 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto // ==================================================================================== // + /** + * Initialize {@link #subscribedServices the subscribed services} + * + * @return + */ @PostConstruct - public void initSubscribedServices() { - synchronized (monitor) { - // If subscribes all services - if (ALL_DUBBO_SERVICES.equals(dubboCloudProperties.getSubscribedServices())) { - List services = discoveryClient.getServices(); - subscribedServices = new HashSet<>(services); - if (logger.isWarnEnabled()) { - logger.warn("Current application will subscribe all services(size:{}) in registry, " + - "a lot of memory and CPU cycles may be used, " + - "thus it's strongly recommend you using the externalized property '{}' " + - "to specify the services", - subscribedServices.size(), "dubbo.cloud.subscribed-services"); - } - } else { - subscribedServices = new HashSet<>(dubboCloudProperties.subscribedServices()); + public Stream initSubscribedServices() { + Set newSubscribedServices = new LinkedHashSet<>(); + + // If subscribes all services + if (ALL_DUBBO_SERVICES.equals(dubboCloudProperties.getSubscribedServices())) { + List services = discoveryClient.getServices(); + newSubscribedServices.addAll(services); + if (logger.isWarnEnabled()) { + logger.warn("Current application will subscribe all services(size:{}) in registry, " + + "a lot of memory and CPU cycles may be used, " + + "thus it's strongly recommend you using the externalized property '{}' " + + "to specify the services", + newSubscribedServices.size(), "dubbo.cloud.subscribed-services"); } - // exclude current application name - excludeSelf(subscribedServices); + } else { + newSubscribedServices.addAll(dubboCloudProperties.subscribedServices()); } + + // exclude current application name + excludeSelf(newSubscribedServices); + + // copy from subscribedServices + Set oldSubscribedServices = this.subscribedServices; + + // volatile update subscribedServices to be new one + this.subscribedServices = newSubscribedServices; + + // dispatch SubscribedServicesChangedEvent + dispatchEvent(new SubscribedServicesChangedEvent(this, oldSubscribedServices, newSubscribedServices)); + + // clear old one, help GC + oldSubscribedServices.clear(); + + return newSubscribedServices.stream(); + } + + private void dispatchEvent(ApplicationEvent event) { + applicationEventPublisher.publishEvent(event); } @Override @@ -449,10 +479,12 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto return match(dubboRestServiceMetadataRepository, serviceName, requestMetadata); } + /** + * @return not-null + */ protected Set doGetSubscribedServices() { - synchronized (monitor) { - return subscribedServices; - } + Set subscribedServices = this.subscribedServices; + return subscribedServices == null ? emptySet() : subscribedServices; } public Set getSubscribedServices() { @@ -576,4 +608,9 @@ public class DubboServiceMetadataRepository implements SmartInitializingSingleto // Initialize DubboMetadataService with right version dubboMetadataConfigServiceProxy.initProxy(serviceName, version); } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } } diff --git a/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/event/SubscribedServicesChangedEvent.java b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/event/SubscribedServicesChangedEvent.java new file mode 100644 index 000000000..7ed8bce6f --- /dev/null +++ b/spring-cloud-alibaba-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/event/SubscribedServicesChangedEvent.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.cloud.dubbo.registry.event; + +import org.springframework.context.ApplicationEvent; + +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Set; + +/** + * {@link ApplicationEvent Event} raised when the subscribed services are changed + *

+ * + * @author Mercy + * @see ApplicationEvent + */ +public class SubscribedServicesChangedEvent extends ApplicationEvent { + + private final Set oldSubscribedServices; + + private final Set newSubscribedServices; + + private final boolean changed; + + /** + * Create a new ApplicationEvent. + * + * @param source the object on which the event initially occurred (never {@code null}) + * @param oldSubscribedServices the subscribed services before changed + * @param newSubscribedServices the subscribed services after changed + */ + public SubscribedServicesChangedEvent(Object source, Set oldSubscribedServices, Set newSubscribedServices) { + super(source); + this.oldSubscribedServices = new LinkedHashSet<>(oldSubscribedServices); + this.newSubscribedServices = new LinkedHashSet<>(newSubscribedServices); + this.changed = !Objects.equals(oldSubscribedServices, newSubscribedServices); + } + + public Set getOldSubscribedServices() { + return oldSubscribedServices; + } + + public Set getNewSubscribedServices() { + return newSubscribedServices; + } + + public boolean isChanged() { + return changed; + } +}