pull/562/head
mercyblitz 6 years ago
parent 0a9930c8c2
commit 18c4fccebf

@ -31,8 +31,7 @@ import org.springframework.cloud.alibaba.dubbo.util.JSONUtils;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
@ -40,7 +39,6 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
@ -50,7 +48,6 @@ import static org.apache.dubbo.common.Constants.PROTOCOL_KEY;
import static org.apache.dubbo.common.Constants.PROVIDER_SIDE;
import static org.apache.dubbo.common.Constants.SIDE_KEY;
import static org.apache.dubbo.common.Constants.VERSION_KEY;
import static org.springframework.util.ObjectUtils.isEmpty;
import static org.springframework.util.StringUtils.hasText;
/**
@ -67,6 +64,8 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
protected static final String DUBBO_METADATA_SERVICE_CLASS_NAME = DubboMetadataService.class.getName();
private static final Set<String> schedulerTasks = new HashSet<>();
protected final Logger logger = LoggerFactory.getLogger(getClass());
/**
@ -82,6 +81,7 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
private final JSONUtils jsonUtils;
protected final ScheduledExecutorService servicesLookupScheduler;
public AbstractSpringCloudRegistry(URL url,
@ -159,9 +159,14 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
doSubscribeDubboServiceURLs(url, listener);
schedule(() -> {
doSubscribeDubboServiceURLs(url, listener);
});
submitSchedulerTaskIfAbsent(url, listener);
}
private void submitSchedulerTaskIfAbsent(URL url, NotifyListener listener) {
String taskId = url.toIdentityString();
if (schedulerTasks.add(taskId)) {
schedule(() -> doSubscribeDubboServiceURLs(url, listener));
}
}
protected void doSubscribeDubboServiceURLs(URL url, NotifyListener listener) {
@ -194,8 +199,7 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
});
if (logger.isDebugEnabled()) {
logger.debug("The subscribed URLs[ service name : {} , protocol : {}] will be notified : {}"
, serviceName, protocol, subscribedURLs);
logger.debug("The subscribed URL[{}] will notify all URLs : {}", url, subscribedURLs);
}
allSubscribedURLs.addAll(subscribedURLs);
@ -205,6 +209,22 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
});
}
private List<ServiceInstance> getServiceInstances(String serviceName) {
return hasText(serviceName) ? doGetServiceInstances(serviceName) : emptyList();
}
private List<ServiceInstance> doGetServiceInstances(String serviceName) {
List<ServiceInstance> serviceInstances = emptyList();
try {
serviceInstances = discoveryClient.getInstances(serviceName);
} catch (Exception e) {
if (logger.isErrorEnabled()) {
logger.error(e.getMessage(), e);
}
}
return serviceInstances;
}
private List<URL> getExportedURLs(DubboMetadataService dubboMetadataService, URL url) {
String serviceInterface = url.getServiceInterface();
String group = url.getParameter(GROUP_KEY);
@ -220,7 +240,6 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
).collect(Collectors.toList());
}
private void subscribeDubboMetadataServiceURLs(URL url, NotifyListener listener) {
String serviceInterface = url.getServiceInterface();
String group = url.getParameter(GROUP_KEY);
@ -248,16 +267,6 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
}
}
private Set<String> filterServiceNames(Collection<String> serviceNames) {
return new LinkedHashSet<>(filter(serviceNames, this::supports));
}
protected abstract boolean supports(String serviceName);
protected final Set<String> getAllServiceNames() {
return new LinkedHashSet<>(discoveryClient.getServices());
}
protected boolean isAdminURL(URL url) {
return Constants.ADMIN_PROTOCOL.equals(url.getProtocol());
}
@ -266,56 +275,8 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
return DUBBO_METADATA_SERVICE_CLASS_NAME.equals(url.getServiceInterface());
}
/**
* Get the service names for Dubbo OPS
*
* @param url {@link URL}
* @return non-null
*/
protected Set<String> getServiceNamesForOps(URL url) {
Set<String> serviceNames = getAllServiceNames();
return filterServiceNames(serviceNames);
}
private void doSubscribe(final URL url, final NotifyListener listener, final Collection<String> serviceNames) {
subscribe(url, listener, serviceNames);
schedule(() -> {
subscribe(url, listener, serviceNames);
});
}
protected ScheduledFuture<?> schedule(Runnable runnable) {
return this.servicesLookupScheduler.scheduleAtFixedRate(runnable, servicesLookupInterval,
servicesLookupInterval, TimeUnit.SECONDS);
}
protected List<ServiceInstance> getServiceInstances(String serviceName) {
return hasText(serviceName) ? discoveryClient.getInstances(serviceName) : emptyList();
}
private void subscribe(final URL url, final NotifyListener listener, final Collection<String> serviceNames) {
for (String serviceName : serviceNames) {
List<ServiceInstance> serviceInstances = getServiceInstances(serviceName);
if (!isEmpty(serviceInstances)) {
notifySubscriber(url, listener, serviceInstances);
}
}
}
/**
* Notify the Healthy {@link ServiceInstance service instance} to subscriber.
*
* @param url {@link URL}
* @param listener {@link NotifyListener}
* @param serviceInstances all {@link ServiceInstance instances}
*/
protected abstract void notifySubscriber(URL url, NotifyListener listener, List<ServiceInstance> serviceInstances);
protected <T> Collection<T> filter(Collection<T> collection, Predicate<T> filter) {
return collection.stream()
.filter(filter)
.collect(Collectors.toList());
}
}

@ -17,18 +17,14 @@
package org.springframework.cloud.alibaba.dubbo.registry;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.RegistryFactory;
import org.springframework.cloud.alibaba.dubbo.metadata.repository.DubboServiceMetadataRepository;
import org.springframework.cloud.alibaba.dubbo.service.DubboMetadataServiceProxy;
import org.springframework.cloud.alibaba.dubbo.util.JSONUtils;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
/**
* Dubbo {@link RegistryFactory} uses Spring Cloud Service Registration abstraction, whose protocol is "spring-cloud"
@ -57,18 +53,4 @@ public class SpringCloudRegistry extends AbstractSpringCloudRegistry {
protected void doUnregister0(URL url) {
dubboServiceMetadataRepository.unexportURL(url);
}
@Override
protected boolean supports(String serviceName) {
return dubboServiceMetadataRepository.isSubscribedService(serviceName);
}
@Override
protected void notifySubscriber(URL url, NotifyListener listener, List<ServiceInstance> serviceInstances) {
List<URL> urls = serviceInstances.stream()
.map(dubboServiceMetadataRepository::getDubboMetadataServiceURLs)
.flatMap(List::stream)
.collect(Collectors.toList());
notify(url, listener, urls);
}
}

Loading…
Cancel
Save