From f8161141224157aea0bcc08b36332aa2c2e58a80 Mon Sep 17 00:00:00 2001
From: theonefx <chenxilzx1@gmail.com>
Date: Wed, 12 May 2021 20:27:50 +0800
Subject: [PATCH] re subscribe when failed

---
 .../cloud/dubbo/env/DubboCloudProperties.java | 20 +++++
 .../dubbo/registry/DubboCloudRegistry.java    | 65 +++++++++++++--
 .../registry/ReSubscribeMetadataJob.java      | 82 +++++++++++++++++++
 .../registry/SpringCloudRegistryFactory.java  |  4 +-
 4 files changed, 162 insertions(+), 9 deletions(-)
 create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java

diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java
index 577d19e59..5135c5a8c 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/env/DubboCloudProperties.java
@@ -51,6 +51,10 @@ public class DubboCloudProperties {
 
 	private String registryType = DUBBO_CLOUD_REGISTRY_PROPERTY_VALUE;
 
+	private int maxReSubscribeMetadataTimes = 1000;
+
+	private int reSubscribeMetadataIntervial = 5;
+
 	public String getSubscribedServices() {
 		return subscribedServices;
 	}
@@ -91,4 +95,20 @@ public class DubboCloudProperties {
 		this.registryType = registryType;
 	}
 
+	public int getMaxReSubscribeMetadataTimes() {
+		return maxReSubscribeMetadataTimes;
+	}
+
+	public void setMaxReSubscribeMetadataTimes(int maxReSubscribeMetadataTimes) {
+		this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes;
+	}
+
+	public int getReSubscribeMetadataIntervial() {
+		return reSubscribeMetadataIntervial;
+	}
+
+	public void setReSubscribeMetadataIntervial(int reSubscribeMetadataIntervial) {
+		this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial;
+	}
+
 }
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java
index 81464733d..b7768c307 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/DubboCloudRegistry.java
@@ -23,6 +23,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -108,11 +111,23 @@ public class DubboCloudRegistry extends FailbackRegistry {
 
 	private final String currentApplicationName;
 
+	private final Map<URL, NotifyListener> urlNotifyListenerMap = new ConcurrentHashMap<>();
+
+	private final Map<String, ReSubscribeMetadataJob> reConnectJobMap = new ConcurrentHashMap<>();
+
+	private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor(
+			2);
+
+	private final int maxReSubscribeMetadataTimes;
+
+	private final int reSubscribeMetadataIntervial;
+
 	public DubboCloudRegistry(URL url, DiscoveryClient discoveryClient,
 			DubboServiceMetadataRepository repository,
 			DubboMetadataServiceProxy dubboMetadataConfigServiceProxy,
 			JSONUtils jsonUtils, DubboGenericServiceFactory dubboGenericServiceFactory,
-			ConfigurableApplicationContext applicationContext) {
+			ConfigurableApplicationContext applicationContext,
+			int maxReSubscribeMetadataTimes, int reSubscribeMetadataIntervial) {
 
 		super(url);
 		this.servicesLookupInterval = url
@@ -125,6 +140,11 @@ public class DubboCloudRegistry extends FailbackRegistry {
 		this.applicationContext = applicationContext;
 		this.dubboMetadataUtils = getBean(DubboMetadataUtils.class);
 		this.currentApplicationName = dubboMetadataUtils.getCurrentApplicationName();
+		this.maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes;
+		this.reSubscribeMetadataIntervial = reSubscribeMetadataIntervial;
+
+		reConnectPool.setKeepAliveTime(10, TimeUnit.MINUTES);
+		reConnectPool.allowCoreThreadTimeOut(true);
 	}
 
 	private <T> T getBean(Class<T> beanClass) {
@@ -175,6 +195,7 @@ public class DubboCloudRegistry extends FailbackRegistry {
 		}
 		else { // for general Dubbo Services
 			subscribeURLs(url, listener);
+			urlNotifyListenerMap.put(url, listener);
 		}
 	}
 
@@ -204,7 +225,16 @@ public class DubboCloudRegistry extends FailbackRegistry {
 							logger.debug(
 									"handle serviceInstanceChange of general service, serviceName = {}, subscribeUrl={}",
 									event.getServiceName(), url.getServiceKey());
-							subscribeURLs(url, serviceNames, listener);
+							try {
+								subscribeURLs(url, serviceNames, listener);
+								reConnectJobMap.remove(serviceName);
+							}
+							catch (Exception e) {
+								logger.warn(String.format(
+										"subscribeURLs failed, serviceName = %s, try reSubscribe again",
+										serviceName), e);
+								addReSubscribeMetadataJob(serviceName, 0);
+							}
 						}
 					}
 
@@ -216,8 +246,19 @@ public class DubboCloudRegistry extends FailbackRegistry {
 				});
 	}
 
-	private void subscribeURLs(URL url, Set<String> serviceNames,
-			NotifyListener listener) {
+	void addReSubscribeMetadataJob(String serviceName, int count) {
+		if (count > maxReSubscribeMetadataTimes) {
+			logger.error(
+					"reSubscribe failed too many times, serviceName = {}, count = {}",
+					serviceName, count);
+			return;
+		}
+		ReSubscribeMetadataJob job = new ReSubscribeMetadataJob(serviceName, this, count);
+		reConnectJobMap.put(serviceName, job);
+		reConnectPool.schedule(job, reSubscribeMetadataIntervial, TimeUnit.SECONDS);
+	}
+
+	void subscribeURLs(URL url, Set<String> serviceNames, NotifyListener listener) {
 
 		List<URL> subscribedURLs = new LinkedList<>();
 
@@ -393,7 +434,7 @@ public class DubboCloudRegistry extends FailbackRegistry {
 		return metadata.containsKey(METADATA_SERVICE_URLS_PROPERTY_NAME);
 	}
 
-	private Set<String> getServices(URL url) {
+	Set<String> getServices(URL url) {
 		Set<String> subscribedServices = repository.getSubscribedServices();
 		// TODO Add the filter feature
 		return subscribedServices;
@@ -470,12 +511,12 @@ public class DubboCloudRegistry extends FailbackRegistry {
 	private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
 			NotifyListener listener) {
 
+		subscribeDubboMetadataServiceURLs(subscribedURL, listener,
+				getServiceName(subscribedURL));
+
 		// Sync subscription
 		if (containsProviderCategory(subscribedURL)) {
 
-			subscribeDubboMetadataServiceURLs(subscribedURL, listener,
-					getServiceName(subscribedURL));
-
 			registerServiceInstancesChangedListener(subscribedURL,
 					new ServiceInstanceChangeListener() {
 
@@ -558,6 +599,14 @@ public class DubboCloudRegistry extends FailbackRegistry {
 		return ADMIN_PROTOCOL.equals(url.getProtocol());
 	}
 
+	public Map<URL, NotifyListener> getUrlNotifyListenerMap() {
+		return urlNotifyListenerMap;
+	}
+
+	public Map<String, ReSubscribeMetadataJob> getReConnectJobMap() {
+		return reConnectJobMap;
+	}
+
 	protected boolean isDubboMetadataServiceURL(URL url) {
 		return DUBBO_METADATA_SERVICE_CLASS_NAME.equals(url.getServiceInterface());
 	}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java
new file mode 100644
index 000000000..68e0f42cb
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/ReSubscribeMetadataJob.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.dubbo.registry;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.NotifyListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * For re subscribe URL from provider.
+ *
+ * @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
+ */
+public class ReSubscribeMetadataJob implements Runnable {
+
+	protected final Logger logger = LoggerFactory.getLogger(ReSubscribeMetadataJob.class);
+
+	private final String serviceName;
+
+	private final DubboCloudRegistry dubboCloudRegistry;
+
+	private final int errorCounts;
+
+	public ReSubscribeMetadataJob(String serviceName,
+			DubboCloudRegistry dubboCloudRegistry, int errorCounts) {
+		this.errorCounts = errorCounts;
+		this.serviceName = serviceName;
+		this.dubboCloudRegistry = dubboCloudRegistry;
+	}
+
+	public ReSubscribeMetadataJob(String serviceName,
+			DubboCloudRegistry dubboCloudRegistry) {
+		this(serviceName, dubboCloudRegistry, 0);
+	}
+
+	@Override
+	public void run() {
+		if (dubboCloudRegistry.getReConnectJobMap().get(serviceName) != this) {
+			return;
+		}
+		try {
+			for (Map.Entry<URL, NotifyListener> entry : dubboCloudRegistry
+					.getUrlNotifyListenerMap().entrySet()) {
+				doRun(entry.getKey(), entry.getValue());
+			}
+			dubboCloudRegistry.getReConnectJobMap().remove(serviceName);
+		}
+		catch (Exception e) {
+			logger.warn(String.format(
+					"reSubscribe failed, serviceName = %s, try refresh again",
+					serviceName), e);
+			dubboCloudRegistry.addReSubscribeMetadataJob(serviceName, errorCounts + 1);
+		}
+	}
+
+	private void doRun(URL url, NotifyListener listener) {
+		Set<String> serviceNames = dubboCloudRegistry.getServices(url);
+
+		if (serviceNames.contains(serviceName)) {
+			dubboCloudRegistry.subscribeURLs(url, serviceNames, listener);
+		}
+	}
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java
index f8b7896dc..d13c5179c 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-dubbo/src/main/java/com/alibaba/cloud/dubbo/registry/SpringCloudRegistryFactory.java
@@ -100,7 +100,9 @@ public class SpringCloudRegistryFactory extends AbstractRegistryFactory {
 		default:
 			registry = new DubboCloudRegistry(url, discoveryClient,
 					dubboServiceMetadataRepository, dubboMetadataConfigServiceProxy,
-					jsonUtils, dubboGenericServiceFactory, applicationContext);
+					jsonUtils, dubboGenericServiceFactory, applicationContext,
+					dubboCloudProperties.getMaxReSubscribeMetadataTimes(),
+					dubboCloudProperties.getReSubscribeMetadataIntervial());
 			break;
 		}