@ -23,6 +23,9 @@ import java.util.List;
import java.util.Map ;
import java.util.Map ;
import java.util.Objects ;
import java.util.Objects ;
import java.util.Set ;
import java.util.Set ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ScheduledThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import java.util.function.Supplier ;
import java.util.function.Supplier ;
import java.util.stream.Collectors ;
import java.util.stream.Collectors ;
@ -45,12 +48,10 @@ import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.ApplicationListener ;
import org.springframework.context.ApplicationListener ;
import org.springframework.context.ConfigurableApplicationContext ;
import org.springframework.context.ConfigurableApplicationContext ;
import org.springframework.core.Ordered ;
import org.springframework.core.Ordered ;
import org.springframework.core.annotation.Order ;
import org.springframework.util.CollectionUtils ;
import org.springframework.util.CollectionUtils ;
import static java.lang.String.format ;
import static java.lang.String.format ;
import static java.util.Collections.emptyList ;
import static java.util.Collections.emptyList ;
import static java.util.stream.StreamSupport.stream ;
import static org.apache.dubbo.common.URLBuilder.from ;
import static org.apache.dubbo.common.URLBuilder.from ;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY ;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY ;
import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY ;
import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY ;
@ -110,11 +111,23 @@ public class DubboCloudRegistry extends FailbackRegistry {
private final String currentApplicationName ;
private final String currentApplicationName ;
private final Map < URL , NotifyListener > urlNotifyListenerMap = new ConcurrentHashMap < > ( ) ;
private final Map < String , ReSubscribeMetadataJob > reConnectJobMap = new ConcurrentHashMap < > ( ) ;
private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor (
2 ) ;
private final int maxReSubscribeMetadataTimes ;
private final int reSubscribeMetadataIntervial ;
public DubboCloudRegistry ( URL url , DiscoveryClient discoveryClient ,
public DubboCloudRegistry ( URL url , DiscoveryClient discoveryClient ,
DubboServiceMetadataRepository repository ,
DubboServiceMetadataRepository repository ,
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy ,
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy ,
JSONUtils jsonUtils , DubboGenericServiceFactory dubboGenericServiceFactory ,
JSONUtils jsonUtils , DubboGenericServiceFactory dubboGenericServiceFactory ,
ConfigurableApplicationContext applicationContext ) {
ConfigurableApplicationContext applicationContext ,
int maxReSubscribeMetadataTimes , int reSubscribeMetadataIntervial ) {
super ( url ) ;
super ( url ) ;
this . servicesLookupInterval = url
this . servicesLookupInterval = url
@ -127,6 +140,11 @@ public class DubboCloudRegistry extends FailbackRegistry {
this . applicationContext = applicationContext ;
this . applicationContext = applicationContext ;
this . dubboMetadataUtils = getBean ( DubboMetadataUtils . class ) ;
this . dubboMetadataUtils = getBean ( DubboMetadataUtils . class ) ;
this . currentApplicationName = dubboMetadataUtils . getCurrentApplicationName ( ) ;
this . currentApplicationName = dubboMetadataUtils . getCurrentApplicationName ( ) ;
this . maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes ;
this . reSubscribeMetadataIntervial = reSubscribeMetadataIntervial ;
reConnectPool . setKeepAliveTime ( 10 , TimeUnit . MINUTES ) ;
reConnectPool . allowCoreThreadTimeOut ( true ) ;
}
}
private < T > T getBean ( Class < T > beanClass ) {
private < T > T getBean ( Class < T > beanClass ) {
@ -177,6 +195,7 @@ public class DubboCloudRegistry extends FailbackRegistry {
}
}
else { // for general Dubbo Services
else { // for general Dubbo Services
subscribeURLs ( url , listener ) ;
subscribeURLs ( url , listener ) ;
urlNotifyListenerMap . put ( url , listener ) ;
}
}
}
}
@ -188,19 +207,34 @@ public class DubboCloudRegistry extends FailbackRegistry {
// Async subscription
// Async subscription
registerServiceInstancesChangedListener ( url ,
registerServiceInstancesChangedListener ( url ,
new ApplicationListener< ServiceInstancesChangedEvent > ( ) {
new ServiceInstanceChangeListener ( ) {
private final URL url2subscribe = url ;
@Override
public int getOrder ( ) {
return Ordered . LOWEST_PRECEDENCE ;
}
@Override
@Override
@Order
public void onApplicationEvent ( ServiceInstancesChangedEvent event ) {
public void onApplicationEvent ( ServiceInstancesChangedEvent event ) {
Set < String > serviceNames = getServices ( url ) ;
Set < String > serviceNames = getServices ( url ) ;
String serviceName = event . getServiceName ( ) ;
String serviceName = event . getServiceName ( ) ;
if ( serviceNames . contains ( serviceName ) ) {
if ( serviceNames . contains ( serviceName ) ) {
subscribeURLs ( url , serviceNames , listener ) ;
logger . debug (
"handle serviceInstanceChange of general service, serviceName = {}, subscribeUrl={}" ,
event . getServiceName ( ) , url . getServiceKey ( ) ) ;
try {
subscribeURLs ( url , serviceNames , listener ) ;
reConnectJobMap . remove ( serviceName ) ;
}
catch ( Exception e ) {
logger . warn ( String . format (
"subscribeURLs failed, serviceName = %s, try reSubscribe again" ,
serviceName ) , e ) ;
addReSubscribeMetadataJob ( serviceName , 0 ) ;
}
}
}
}
}
@ -212,8 +246,19 @@ public class DubboCloudRegistry extends FailbackRegistry {
} ) ;
} ) ;
}
}
private void subscribeURLs ( URL url , Set < String > serviceNames ,
void addReSubscribeMetadataJob ( String serviceName , int count ) {
NotifyListener listener ) {
if ( count > maxReSubscribeMetadataTimes ) {
logger . error (
"reSubscribe failed too many times, serviceName = {}, count = {}" ,
serviceName , count ) ;
return ;
}
ReSubscribeMetadataJob job = new ReSubscribeMetadataJob ( serviceName , this , count ) ;
reConnectJobMap . put ( serviceName , job ) ;
reConnectPool . schedule ( job , reSubscribeMetadataIntervial , TimeUnit . SECONDS ) ;
}
void subscribeURLs ( URL url , Set < String > serviceNames , NotifyListener listener ) {
List < URL > subscribedURLs = new LinkedList < > ( ) ;
List < URL > subscribedURLs = new LinkedList < > ( ) ;
@ -252,6 +297,10 @@ public class DubboCloudRegistry extends FailbackRegistry {
serviceName ) ) ;
serviceName ) ) ;
}
}
}
}
else {
logger . debug ( "subscribe from serviceName = {}, size = {}" , serviceName ,
serviceInstances . size ( ) ) ;
}
List < URL > exportedURLs = getExportedURLs ( subscribedURL , serviceName ,
List < URL > exportedURLs = getExportedURLs ( subscribedURL , serviceName ,
serviceInstances ) ;
serviceInstances ) ;
@ -308,6 +357,16 @@ public class DubboCloudRegistry extends FailbackRegistry {
String protocol = templateURL . getProtocol ( ) ;
String protocol = templateURL . getProtocol ( ) ;
Integer port = repository . getDubboProtocolPort ( serviceInstance ,
Integer port = repository . getDubboProtocolPort ( serviceInstance ,
protocol ) ;
protocol ) ;
// reserve tag
String tag = null ;
List < URL > urls = jsonUtils . toURLs ( serviceInstance . getMetadata ( )
. get ( "dubbo.metadata-service.urls" ) ) ;
if ( urls ! = null & & urls . size ( ) > 0 ) {
Map < String , String > parameters = urls . get ( 0 ) . getParameters ( ) ;
tag = parameters . get ( "dubbo.tag" ) ;
}
if ( Objects . equals ( templateURL . getHost ( ) , host )
if ( Objects . equals ( templateURL . getHost ( ) , host )
& & Objects . equals ( templateURL . getPort ( ) , port ) ) { // use
& & Objects . equals ( templateURL . getPort ( ) , port ) ) { // use
// templateURL
// templateURL
@ -331,7 +390,8 @@ public class DubboCloudRegistry extends FailbackRegistry {
// the template
// the template
// URL
// URL
. setHost ( host ) // reset the host
. setHost ( host ) // reset the host
. setPort ( port ) ; // reset the port
. setPort ( port ) // reset the port
. addParameter ( "dubbo.tag" , tag ) ; // reset the tag
return clonedURLBuilder . build ( ) ;
return clonedURLBuilder . build ( ) ;
}
}
@ -378,7 +438,7 @@ public class DubboCloudRegistry extends FailbackRegistry {
return metadata . containsKey ( METADATA_SERVICE_URLS_PROPERTY_NAME ) ;
return metadata . containsKey ( METADATA_SERVICE_URLS_PROPERTY_NAME ) ;
}
}
private Set < String > getServices ( URL url ) {
Set < String > getServices ( URL url ) {
Set < String > subscribedServices = repository . getSubscribedServices ( ) ;
Set < String > subscribedServices = repository . getSubscribedServices ( ) ;
// TODO Add the filter feature
// TODO Add the filter feature
return subscribedServices ;
return subscribedServices ;
@ -408,11 +468,6 @@ public class DubboCloudRegistry extends FailbackRegistry {
listener . notify ( subscribedURLs ) ;
listener . notify ( subscribedURLs ) ;
}
}
private List < ServiceInstance > getServiceInstances ( Iterable < String > serviceNames ) {
return stream ( serviceNames . spliterator ( ) , false ) . map ( this : : getServiceInstances )
. flatMap ( Collection : : stream ) . collect ( Collectors . toList ( ) ) ;
}
private List < ServiceInstance > getServiceInstances ( String serviceName ) {
private List < ServiceInstance > getServiceInstances ( String serviceName ) {
return hasText ( serviceName ) ? doGetServiceInstances ( serviceName ) : emptyList ( ) ;
return hasText ( serviceName ) ? doGetServiceInstances ( serviceName ) : emptyList ( ) ;
}
}
@ -460,27 +515,38 @@ public class DubboCloudRegistry extends FailbackRegistry {
private void subscribeDubboMetadataServiceURLs ( URL subscribedURL ,
private void subscribeDubboMetadataServiceURLs ( URL subscribedURL ,
NotifyListener listener ) {
NotifyListener listener ) {
// Sync subscription
subscribeDubboMetadataServiceURLs ( subscribedURL , listener ,
subscribeDubboMetadataServiceURLs ( subscribedURL , listener ,
getServiceName ( subscribedURL ) ) ;
getServiceName ( subscribedURL ) ) ;
// Sync subscription
// Sync subscription
if ( containsProviderCategory ( subscribedURL ) ) {
if ( containsProviderCategory ( subscribedURL ) ) {
registerServiceInstancesChangedListener ( subscribedURL ,
registerServiceInstancesChangedListener ( subscribedURL ,
new ApplicationListener< ServiceInstancesChangedEvent > ( ) {
new ServiceInstanceChangeListener ( ) {
private final URL url2subscribe = subscribedURL ;
@Override
public int getOrder ( ) {
return Ordered . LOWEST_PRECEDENCE - 1 ;
}
@Override
@Override
@Order ( Ordered . LOWEST_PRECEDENCE - 1 )
public void onApplicationEvent (
public void onApplicationEvent (
ServiceInstancesChangedEvent event ) {
ServiceInstancesChangedEvent event ) {
String sourceServiceName = event . getServiceName ( ) ;
String sourceServiceName = event . getServiceName ( ) ;
List < ServiceInstance > serviceInstances = event
. getServiceInstances ( ) ;
String serviceName = getServiceName ( subscribedURL ) ;
String serviceName = getServiceName ( subscribedURL ) ;
if ( Objects . equals ( sourceServiceName , serviceName ) ) {
if ( Objects . equals ( sourceServiceName , serviceName ) ) {
logger . debug (
"handle serviceInstanceChange of metadata service, serviceName = {}, subscribeUrl={}" ,
event . getServiceName ( ) ,
subscribedURL . getServiceKey ( ) ) ;
// only update serviceInstances of the specified
// serviceName
subscribeDubboMetadataServiceURLs ( subscribedURL , listener ,
subscribeDubboMetadataServiceURLs ( subscribedURL , listener ,
sourceServiceName ) ;
sourceServiceName , serviceInstances );
}
}
}
}
@ -498,34 +564,25 @@ public class DubboCloudRegistry extends FailbackRegistry {
}
}
private void subscribeDubboMetadataServiceURLs ( URL subscribedURL ,
private void subscribeDubboMetadataServiceURLs ( URL subscribedURL ,
NotifyListener listener , String serviceName ) {
NotifyListener listener , String serviceName ,
List < ServiceInstance > serviceInstances ) {
String serviceInterface = subscribedURL . getServiceInterface ( ) ;
String serviceInterface = subscribedURL . getServiceInterface ( ) ;
String version = subscribedURL . getParameter ( VERSION_KEY ) ;
String version = subscribedURL . getParameter ( VERSION_KEY ) ;
String protocol = subscribedURL . getParameter ( PROTOCOL_KEY ) ;
String protocol = subscribedURL . getParameter ( PROTOCOL_KEY ) ;
List < ServiceInstance > serviceInstances = getServiceInstances ( serviceName ) ;
List < URL > urls = dubboMetadataUtils . getDubboMetadataServiceURLs ( serviceInstances ,
List < URL > urls = dubboMetadataUtils . getDubboMetadataServiceURLs ( serviceInstances ,
serviceInterface , version , protocol ) ;
serviceInterface , version , protocol ) ;
notifyAllSubscribedURLs ( subscribedURL , urls , listener ) ;
notifyAllSubscribedURLs ( subscribedURL , urls , listener ) ;
}
}
// private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
private void subscribeDubboMetadataServiceURLs ( URL subscribedURL ,
// NotifyListener listener, Set<String> serviceNames) {
NotifyListener listener , String serviceName ) {
//
List < ServiceInstance > serviceInstances = getServiceInstances ( serviceName ) ;
// String serviceInterface = subscribedURL.getServiceInterface();
subscribeDubboMetadataServiceURLs ( subscribedURL , listener , serviceName ,
// String version = subscribedURL.getParameter(VERSION_KEY);
serviceInstances ) ;
// String protocol = subscribedURL.getParameter(PROTOCOL_KEY);
}
//
// List<ServiceInstance> serviceInstances = getServiceInstances(serviceNames);
//
// List<URL> urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances,
// serviceInterface, version, protocol);
//
// notifyAllSubscribedURLs(subscribedURL, urls, listener);
// }
private boolean containsProviderCategory ( URL subscribedURL ) {
private boolean containsProviderCategory ( URL subscribedURL ) {
String category = subscribedURL . getParameter ( CATEGORY_KEY ) ;
String category = subscribedURL . getParameter ( CATEGORY_KEY ) ;
@ -546,6 +603,14 @@ public class DubboCloudRegistry extends FailbackRegistry {
return ADMIN_PROTOCOL . equals ( url . getProtocol ( ) ) ;
return ADMIN_PROTOCOL . equals ( url . getProtocol ( ) ) ;
}
}
public Map < URL , NotifyListener > getUrlNotifyListenerMap ( ) {
return urlNotifyListenerMap ;
}
public Map < String , ReSubscribeMetadataJob > getReConnectJobMap ( ) {
return reConnectJobMap ;
}
protected boolean isDubboMetadataServiceURL ( URL url ) {
protected boolean isDubboMetadataServiceURL ( URL url ) {
return DUBBO_METADATA_SERVICE_CLASS_NAME . equals ( url . getServiceInterface ( ) ) ;
return DUBBO_METADATA_SERVICE_CLASS_NAME . equals ( url . getServiceInterface ( ) ) ;
}
}