|
|
|
@ -18,7 +18,6 @@ package org.springframework.cloud.alibaba.dubbo.registry;
|
|
|
|
|
|
|
|
|
|
import org.apache.dubbo.common.Constants;
|
|
|
|
|
import org.apache.dubbo.common.URL;
|
|
|
|
|
import org.apache.dubbo.common.utils.UrlUtils;
|
|
|
|
|
import org.apache.dubbo.registry.NotifyListener;
|
|
|
|
|
import org.apache.dubbo.registry.RegistryFactory;
|
|
|
|
|
import org.apache.dubbo.registry.support.FailbackRegistry;
|
|
|
|
@ -29,20 +28,21 @@ import org.springframework.cloud.client.ServiceInstance;
|
|
|
|
|
import org.springframework.cloud.client.discovery.DiscoveryClient;
|
|
|
|
|
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.Iterator;
|
|
|
|
|
import java.util.LinkedHashSet;
|
|
|
|
|
import java.util.LinkedList;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.function.Predicate;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
|
import static java.util.Collections.emptyList;
|
|
|
|
|
import static java.util.Collections.singleton;
|
|
|
|
|
import static org.apache.dubbo.common.Constants.PROVIDER_SIDE;
|
|
|
|
|
import static org.apache.dubbo.common.Constants.SIDE_KEY;
|
|
|
|
|
import static org.springframework.util.ObjectUtils.isEmpty;
|
|
|
|
|
import static org.springframework.util.StringUtils.hasText;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Abstract Dubbo {@link RegistryFactory} uses Spring Cloud Service Registration abstraction, whose protocol is "spring-cloud"
|
|
|
|
@ -144,13 +144,8 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void filterServiceNames(Collection<String> serviceNames) {
|
|
|
|
|
filter(serviceNames, new Filter<String>() {
|
|
|
|
|
@Override
|
|
|
|
|
public boolean accept(String serviceName) {
|
|
|
|
|
return supports(serviceName);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
private Set<String> filterServiceNames(Collection<String> serviceNames) {
|
|
|
|
|
return new LinkedHashSet<>(filter(serviceNames, this::supports));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected abstract boolean supports(String serviceName);
|
|
|
|
@ -185,8 +180,7 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
|
|
|
|
|
*/
|
|
|
|
|
protected Set<String> getServiceNamesForOps(URL url) {
|
|
|
|
|
Set<String> serviceNames = getAllServiceNames();
|
|
|
|
|
filterServiceNames(serviceNames);
|
|
|
|
|
return serviceNames;
|
|
|
|
|
return filterServiceNames(serviceNames);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected abstract String getServiceName(URL url);
|
|
|
|
@ -206,7 +200,7 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected List<ServiceInstance> getServiceInstances(String serviceName) {
|
|
|
|
|
return discoveryClient.getInstances(serviceName);
|
|
|
|
|
return hasText(serviceName) ? discoveryClient.getInstances(serviceName) : emptyList();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void subscribe(final URL url, final NotifyListener listener, final Collection<String> serviceNames) {
|
|
|
|
@ -227,67 +221,9 @@ public abstract class AbstractSpringCloudRegistry extends FailbackRegistry {
|
|
|
|
|
*/
|
|
|
|
|
protected abstract void notifySubscriber(URL url, NotifyListener listener, List<ServiceInstance> serviceInstances);
|
|
|
|
|
|
|
|
|
|
protected void filterHealthyInstances(Collection<ServiceInstance> instances) {
|
|
|
|
|
filter(instances, new Filter<ServiceInstance>() {
|
|
|
|
|
@Override
|
|
|
|
|
public boolean accept(ServiceInstance data) {
|
|
|
|
|
// TODO check the details of status
|
|
|
|
|
// return serviceRegistry.getStatus(new DubboRegistration(data)) != null;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected List<URL> buildURLs(URL consumerURL, Collection<ServiceInstance> serviceInstances) {
|
|
|
|
|
if (serviceInstances.isEmpty()) {
|
|
|
|
|
return Collections.emptyList();
|
|
|
|
|
}
|
|
|
|
|
List<URL> urls = new LinkedList<URL>();
|
|
|
|
|
for (ServiceInstance serviceInstance : serviceInstances) {
|
|
|
|
|
URL url = buildURL(serviceInstance);
|
|
|
|
|
if (UrlUtils.isMatch(consumerURL, url)) {
|
|
|
|
|
urls.add(url);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return urls;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private URL buildURL(ServiceInstance serviceInstance) {
|
|
|
|
|
URL url = new URL(serviceInstance.getMetadata().get(Constants.PROTOCOL_KEY),
|
|
|
|
|
serviceInstance.getHost(),
|
|
|
|
|
serviceInstance.getPort(),
|
|
|
|
|
serviceInstance.getMetadata());
|
|
|
|
|
return url;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private <T> void filter(Collection<T> collection, Filter<T> filter) {
|
|
|
|
|
Iterator<T> iterator = collection.iterator();
|
|
|
|
|
while (iterator.hasNext()) {
|
|
|
|
|
T data = iterator.next();
|
|
|
|
|
if (!filter.accept(data)) { // remove if not accept
|
|
|
|
|
iterator.remove();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static <T> T[] of(T... values) {
|
|
|
|
|
return values;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A filter
|
|
|
|
|
*/
|
|
|
|
|
public interface Filter<T> {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Tests whether or not the specified data should be accepted.
|
|
|
|
|
*
|
|
|
|
|
* @param data The data to be tested
|
|
|
|
|
* @return <code>true</code> if and only if <code>data</code>
|
|
|
|
|
* should be accepted
|
|
|
|
|
*/
|
|
|
|
|
boolean accept(T data);
|
|
|
|
|
|
|
|
|
|
protected <T> Collection<T> filter(Collection<T> collection, Predicate<T> filter) {
|
|
|
|
|
return collection.stream()
|
|
|
|
|
.filter(filter)
|
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|