re subscribe when failed

pull/2071/head
theonefx 4 years ago
parent 3d5eaefe1c
commit f816114122

@ -51,6 +51,10 @@ public class DubboCloudProperties {
private String registryType = DUBBO_CLOUD_REGISTRY_PROPERTY_VALUE;
private int maxReSubscribeMetadataTimes = 1000;
private int reSubscribeMetadataIntervial = 5;
public String getSubscribedServices() {
return subscribedServices;
}
@ -91,4 +95,20 @@ public class DubboCloudProperties {
this.registryType = registryType;
}
public int getMaxReSubscribeMetadataTimes() {
return maxReSubscribeMetadataTimes;
}
public void setMaxReSubscribeMetadataTimes(int maxReSubscribeMetadataTimes) {
this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes;
}
public int getReSubscribeMetadataIntervial() {
return reSubscribeMetadataIntervial;
}
public void setReSubscribeMetadataIntervial(int reSubscribeMetadataIntervial) {
this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial;
}
}

@ -23,6 +23,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -108,11 +111,23 @@ public class DubboCloudRegistry extends FailbackRegistry {
private final String currentApplicationName;
private final Map<URL, NotifyListener> urlNotifyListenerMap = new ConcurrentHashMap<>();
private final Map<String, ReSubscribeMetadataJob> reConnectJobMap = new ConcurrentHashMap<>();
private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor(
2);
private final int maxReSubscribeMetadataTimes;
private final int reSubscribeMetadataIntervial;
public DubboCloudRegistry(URL url, DiscoveryClient discoveryClient,
DubboServiceMetadataRepository repository,
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy,
JSONUtils jsonUtils, DubboGenericServiceFactory dubboGenericServiceFactory,
ConfigurableApplicationContext applicationContext) {
ConfigurableApplicationContext applicationContext,
int maxReSubscribeMetadataTimes, int reSubscribeMetadataIntervial) {
super(url);
this.servicesLookupInterval = url
@ -125,6 +140,11 @@ public class DubboCloudRegistry extends FailbackRegistry {
this.applicationContext = applicationContext;
this.dubboMetadataUtils = getBean(DubboMetadataUtils.class);
this.currentApplicationName = dubboMetadataUtils.getCurrentApplicationName();
this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes;
this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial;
reConnectPool.setKeepAliveTime(10, TimeUnit.MINUTES);
reConnectPool.allowCoreThreadTimeOut(true);
}
private <T> T getBean(Class<T> beanClass) {
@ -175,6 +195,7 @@ public class DubboCloudRegistry extends FailbackRegistry {
}
else { // for general Dubbo Services
subscribeURLs(url, listener);
urlNotifyListenerMap.put(url, listener);
}
}
@ -204,7 +225,16 @@ public class DubboCloudRegistry extends FailbackRegistry {
logger.debug(
"handle serviceInstanceChange of general service, serviceName = {}, subscribeUrl={}",
event.getServiceName(), url.getServiceKey());
subscribeURLs(url, serviceNames, listener);
try {
subscribeURLs(url, serviceNames, listener);
reConnectJobMap.remove(serviceName);
}
catch (Exception e) {
logger.warn(String.format(
"subscribeURLs failed, serviceName = %s, try reSubscribe again",
serviceName), e);
addReSubscribeMetadataJob(serviceName, 0);
}
}
}
@ -216,8 +246,19 @@ public class DubboCloudRegistry extends FailbackRegistry {
});
}
private void subscribeURLs(URL url, Set<String> serviceNames,
NotifyListener listener) {
void addReSubscribeMetadataJob(String serviceName, int count) {
if (count > maxReSubscribeMetadataTimes) {
logger.error(
"reSubscribe failed too many times, serviceName = {}, count = {}",
serviceName, count);
return;
}
ReSubscribeMetadataJob job = new ReSubscribeMetadataJob(serviceName, this, count);
reConnectJobMap.put(serviceName, job);
reConnectPool.schedule(job, reSubscribeMetadataIntervial, TimeUnit.SECONDS);
}
void subscribeURLs(URL url, Set<String> serviceNames, NotifyListener listener) {
List<URL> subscribedURLs = new LinkedList<>();
@ -393,7 +434,7 @@ public class DubboCloudRegistry extends FailbackRegistry {
return metadata.containsKey(METADATA_SERVICE_URLS_PROPERTY_NAME);
}
private Set<String> getServices(URL url) {
Set<String> getServices(URL url) {
Set<String> subscribedServices = repository.getSubscribedServices();
// TODO Add the filter feature
return subscribedServices;
@ -470,12 +511,12 @@ public class DubboCloudRegistry extends FailbackRegistry {
private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
NotifyListener listener) {
subscribeDubboMetadataServiceURLs(subscribedURL, listener,
getServiceName(subscribedURL));
// Sync subscription
if (containsProviderCategory(subscribedURL)) {
subscribeDubboMetadataServiceURLs(subscribedURL, listener,
getServiceName(subscribedURL));
registerServiceInstancesChangedListener(subscribedURL,
new ServiceInstanceChangeListener() {
@ -558,6 +599,14 @@ public class DubboCloudRegistry extends FailbackRegistry {
return ADMIN_PROTOCOL.equals(url.getProtocol());
}
public Map<URL, NotifyListener> getUrlNotifyListenerMap() {
return urlNotifyListenerMap;
}
public Map<String, ReSubscribeMetadataJob> getReConnectJobMap() {
return reConnectJobMap;
}
protected boolean isDubboMetadataServiceURL(URL url) {
return DUBBO_METADATA_SERVICE_CLASS_NAME.equals(url.getServiceInterface());
}

@ -0,0 +1,82 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.dubbo.registry;
import java.util.Map;
import java.util.Set;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.NotifyListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* For re subscribe URL from provider.
*
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
*/
public class ReSubscribeMetadataJob implements Runnable {
protected final Logger logger = LoggerFactory.getLogger(ReSubscribeMetadataJob.class);
private final String serviceName;
private final DubboCloudRegistry dubboCloudRegistry;
private final int errorCounts;
public ReSubscribeMetadataJob(String serviceName,
DubboCloudRegistry dubboCloudRegistry, int errorCounts) {
this.errorCounts = errorCounts;
this.serviceName = serviceName;
this.dubboCloudRegistry = dubboCloudRegistry;
}
public ReSubscribeMetadataJob(String serviceName,
DubboCloudRegistry dubboCloudRegistry) {
this(serviceName, dubboCloudRegistry, 0);
}
@Override
public void run() {
if (dubboCloudRegistry.getReConnectJobMap().get(serviceName) != this) {
return;
}
try {
for (Map.Entry<URL, NotifyListener> entry : dubboCloudRegistry
.getUrlNotifyListenerMap().entrySet()) {
doRun(entry.getKey(), entry.getValue());
}
dubboCloudRegistry.getReConnectJobMap().remove(serviceName);
}
catch (Exception e) {
logger.warn(String.format(
"reSubscribe failed, serviceName = %s, try refresh again",
serviceName), e);
dubboCloudRegistry.addReSubscribeMetadataJob(serviceName, errorCounts + 1);
}
}
private void doRun(URL url, NotifyListener listener) {
Set<String> serviceNames = dubboCloudRegistry.getServices(url);
if (serviceNames.contains(serviceName)) {
dubboCloudRegistry.subscribeURLs(url, serviceNames, listener);
}
}
}

@ -100,7 +100,9 @@ public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
default:
registry = new DubboCloudRegistry(url, discoveryClient,
dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy,
jsonUtils, dubboGenericServiceFactory, applicationContext);
jsonUtils, dubboGenericServiceFactory, applicationContext,
dubboCloudProperties.getMaxReSubscribeMetadataTimes(),
dubboCloudProperties.getReSubscribeMetadataIntervial());
break;
}

Loading…
Cancel
Save