@ -16,28 +16,27 @@
package com.alibaba.cloud.dubbo.registry ;
import java.util.ArrayList ;
import java.util.Collection ;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.LinkedList ;
import java.util.List ;
import java.util.Map ;
import java.util.Objects ;
import java.util.Set ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ScheduledThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import java.util.function.Supplier ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.stream.Collectors ;
import com.alibaba.cloud.commons.lang.StringUtils ;
import com.alibaba.cloud.dubbo.metadata.RevisionResolver ;
import com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository ;
import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent ;
import com.alibaba.cloud.dubbo.service.DubboGenericServiceFactory ;
import com.alibaba.cloud.dubbo.service.DubboMetadataService ;
import com.alibaba.cloud.dubbo.service.DubboMetadataServiceProxy ;
import com.alibaba.cloud.dubbo.util.DubboMetadataUtils ;
import com.alibaba.cloud.dubbo.util.JSONUtils ;
import org.apache.dubbo.common.URL ;
import org.apache.dubbo.common.URLBuilder ;
import org.apache.dubbo.registry.NotifyListener ;
import org.apache.dubbo.registry.support.FailbackRegistry ;
import org.slf4j.Logger ;
@ -47,23 +46,14 @@ import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient ;
import org.springframework.context.ApplicationListener ;
import org.springframework.context.ConfigurableApplicationContext ;
import org.springframework.core.Ordered ;
import org.springframework.util.CollectionUtils ;
import static java.lang.String.format ;
import static java.util.Collections.emptyList ;
import static org.apache.dubbo.common. URLBuilder.from ;
import static org.apache.dubbo.common. constants.CommonConstants.CONSUMER ;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY ;
import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY ;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY ;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER ;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE ;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY ;
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY ;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY ;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY ;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL ;
import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty ;
import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL ;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URLS_PROPERTY_NAME ;
import static org.springframework.util.StringUtils.hasText ;
@ -72,22 +62,14 @@ import static org.springframework.util.StringUtils.hasText;
* Dubbo Cloud { @link FailbackRegistry } is based on Spring Cloud { @link DiscoveryClient } .
*
* @author < a href = "mailto:mercyblitz@gmail.com" > Mercy < / a >
* @author < a href = "mailto:chenxilzx1@gmail.com" > theonefx < / a >
* /
public class DubboCloudRegistry extends FailbackRegistry {
/ * *
* The parameter name of { @link # servicesLookupInterval } .
* /
public static final String SERVICES_LOOKUP_INTERVAL_PARAM_NAME = "dubbo.services.lookup.interval" ;
public class DubboCloudRegistry extends FailbackRegistry
implements ApplicationListener < ServiceInstancesChangedEvent > {
protected static final String DUBBO_METADATA_SERVICE_CLASS_NAME = DubboMetadataService . class
. getName ( ) ;
/ * *
* Caches the IDs of { @link ApplicationListener } .
* /
private static final Set < String > REGISTER_LISTENERS = new HashSet < > ( ) ;
protected final Logger logger = LoggerFactory . getLogger ( getClass ( ) ) ;
private final DiscoveryClient discoveryClient ;
@ -98,334 +80,362 @@ public class DubboCloudRegistry extends FailbackRegistry {
private final JSONUtils jsonUtils ;
private final DubboGenericServiceFactory dubboGenericServiceFactory ;
private final DubboMetadataUtils dubboMetadataUtils ;
/ * *
* The interval in second of lookup service names ( only for Dubbo - OPS ) .
* /
private final long servicesLookupInterval ;
private final ConfigurableApplicationContext applicationContext ;
private final String currentApplicationName ;
private final Map < URL , NotifyListener > urlNotifyListenerMap = new ConcurrentHashMap < > ( ) ;
private final ReSubscribeManager reSubscribeManager ;
private final Map< String , ReSubscribeMetadataJob > reConnectJobMap = new ConcurrentHashMap < > ( ) ;
private final AtomicBoolean inited = new AtomicBoolean ( false ) ;
private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor (
2 ) ;
/ * *
* { subscribedURL : ServiceSubscribeHandler } .
* /
private final Map < URL , GenearalServiceSubscribeHandler > urlSubscribeHandlerMap = new ConcurrentHashMap < > ( ) ;
private final int maxReSubscribeMetadataTimes ;
/ * *
* { appName : MetadataServiceSubscribeHandler } .
* /
private final Map < String , MetadataServiceSubscribeHandler > metadataSubscribeHandlerMap = new ConcurrentHashMap < > ( ) ;
private final int reSubscribeMetadataIntervial ;
/ * *
* { appName : { revision : [ instances ] } } .
* /
private final Map < String , Map < String , List < ServiceInstance > > > serviceRevisionInstanceMap = new ConcurrentHashMap < > ( ) ;
public DubboCloudRegistry ( URL url , DiscoveryClient discoveryClient ,
DubboServiceMetadataRepository repository ,
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy ,
JSONUtils jsonUtils , DubboGenericServiceFactory dubboGenericServiceFactory ,
ConfigurableApplicationContext applicationContext ,
int maxReSubscribeMetadataTimes , int reSubscribeMetadataIntervial ) {
JSONUtils jsonUtils , ConfigurableApplicationContext applicationContext ) {
super ( url ) ;
this . servicesLookupInterval = url
. getParameter ( SERVICES_LOOKUP_INTERVAL_PARAM_NAME , 60L ) ;
this . discoveryClient = discoveryClient ;
this . repository = repository ;
this . dubboMetadataConfigServiceProxy = dubboMetadataConfigServiceProxy ;
this . jsonUtils = jsonUtils ;
this . dubboGenericServiceFactory = dubboGenericServiceFactory ;
this . applicationContext = applicationContext ;
this . dubboMetadataUtils = getBean ( DubboMetadataUtils . class ) ;
this . currentApplicationName = dubboMetadataUtils . getCurrentApplicationName ( ) ;
this . maxReSubscribeMetadataTimes = maxReSubscribeMetadataTimes ;
this . reSubscribeMetadataIntervial = reSubscribeMetadataIntervial ;
this . reSubscribeManager = new ReSubscribeManager ( this ) ;
}
private void preInit ( ) {
if ( inited . compareAndSet ( false , true ) ) {
Set < String > subscribeApps = getServices ( null ) ;
for ( String appName : subscribeApps ) {
List < ServiceInstance > instances = discoveryClient . getInstances ( appName ) ;
Map < String , List < ServiceInstance > > map = serviceRevisionInstanceMap
. computeIfAbsent ( appName , k - > new HashMap < > ( ) ) ;
for ( ServiceInstance instance : instances ) {
String revision = RevisionResolver . getRevision ( instance ) ;
List < ServiceInstance > list = map . computeIfAbsent ( revision ,
k - > new ArrayList < > ( ) ) ;
list . add ( instance ) ;
}
if ( map . size ( ) = = 0 ) {
logger . debug ( "APP {} preInited, instance siez is zero!!" , appName ) ;
}
else {
map . forEach ( ( revision , list ) - > logger . debug (
"APP {} revision {} preInited, instance size = {}" , appName ,
revision , list . size ( ) ) ) ;
}
}
metadataSubscribeHandlerMap . forEach ( ( url , handler ) - > handler . init ( ) ) ;
urlSubscribeHandlerMap . forEach ( ( url , handler ) - > handler . init ( ) ) ;
repository . initializeMetadata ( ) ;
reConnectPool . setKeepAliveTime ( 10 , TimeUnit . MINUTES ) ;
reConnectPool . allowCoreThreadTimeOut ( true ) ;
// meke sure everything prepared, then can listening
// ServiceInstanceChangeEvent
applicationContext . addApplicationListener ( this ) ;
logger . info ( "DubboCloudRegistry preInit Done." ) ;
}
}
private < T > T getBean ( Class < T > beanClass ) {
pr otected < T > T getBean ( Class < T > beanClass ) {
return this . applicationContext . getBean ( beanClass ) ;
}
protected boolean shouldRegister ( URL url ) {
protected boolean should Not Register( URL url ) {
String side = url . getParameter ( SIDE_KEY ) ;
boolean should = PROVIDER_SIDE . equals ( side ) ; // Only register the Provider.
if ( ! should ) {
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "The URL[{}] should not be registered." , url . toString ( ) ) ;
if ( logger . isDebugEnabled ( ) ) {
if ( ! should ) {
logger . debug ( "The URL should NOT!! be registered & unregistered [{}] ." ,
url ) ;
}
else {
logger . debug ( "The URL should be registered & unregistered [{}] ." , url ) ;
}
}
return should ;
return ! should ;
}
@Override
public final void doRegister ( URL url ) {
if ( ! shouldRegister ( url ) ) {
return ;
synchronized ( this ) {
preInit ( ) ;
if ( shouldNotRegister ( url ) ) {
return ;
}
repository . exportURL ( url ) ;
}
repository . exportURL ( url ) ;
}
@Override
public final void doUnregister ( URL url ) {
if ( ! shouldRegister ( url ) ) {
return ;
synchronized ( this ) {
preInit ( ) ;
if ( shouldNotRegister ( url ) ) {
return ;
}
repository . unexportURL ( url ) ;
}
repository . unexportURL ( url ) ;
}
@Override
public final void doSubscribe ( URL url , NotifyListener listener ) {
if ( isAdminURL ( url ) ) {
// TODO in future
if ( logger . isWarnEnabled ( ) ) {
logger . warn ( "This feature about admin will be supported in the future." ) ;
synchronized ( this ) {
preInit ( ) ;
if ( isAdminURL ( url ) ) {
// TODO in future
if ( logger . isWarnEnabled ( ) ) {
logger . warn (
"This feature about admin will be supported in the future." ) ;
}
}
else if ( isDubboMetadataServiceURL ( url ) & & containsProviderCategory ( url ) ) {
// for DubboMetadataService
String appName = getServiceName ( url ) ;
MetadataServiceSubscribeHandler handler = new MetadataServiceSubscribeHandler (
appName , url , listener , this , dubboMetadataUtils ) ;
if ( inited . get ( ) ) {
handler . init ( ) ;
}
metadataSubscribeHandlerMap . put ( appName , handler ) ;
}
else if ( isConsumerServiceURL ( url ) ) {
// for general Dubbo Services
GenearalServiceSubscribeHandler handler = new GenearalServiceSubscribeHandler (
url , listener , this , repository , jsonUtils ,
dubboMetadataConfigServiceProxy ) ;
if ( inited . get ( ) ) {
handler . init ( ) ;
}
urlSubscribeHandlerMap . put ( url , handler ) ;
}
}
else if ( isDubboMetadataServiceURL ( url ) ) { // for DubboMetadataService
subscribeDubboMetadataServiceURLs ( url , listener ) ;
}
else { // for general Dubbo Services
subscribeURLs ( url , listener ) ;
urlNotifyListenerMap . put ( url , listener ) ;
}
}
private void subscribeURLs ( URL url , NotifyListener listener ) {
// Sync subscription
subscribeURLs ( url , getServices ( url ) , listener ) ;
/ * *
* Process ServiceInstanceChangedEvent , refresh dubbo reference and metadata info .
* /
@Override
public void onApplicationEvent ( ServiceInstancesChangedEvent event ) {
// Async subscription
registerServiceInstancesChangedListener ( url ,
String appName = event . getServiceName ( ) ;
new ServiceInstanceChangeListener ( ) {
List < ServiceInstance > instances = filter ( event . getServiceInstances ( ) ! = null
? event . getServiceInstances ( ) : Collections . emptyList ( ) ) ;
@Override
public int getOrder ( ) {
return Ordered . LOWEST_PRECEDENCE ;
}
Set < String > subscribedServiceNames = getServices ( null ) ;
@Override
public void onApplicationEvent ( ServiceInstancesChangedEvent event ) {
Set < String > serviceNames = getServices ( url ) ;
String serviceName = event . getServiceName ( ) ;
if ( serviceNames . contains ( serviceName ) ) {
logger . debug (
"handle serviceInstanceChange of general service, serviceName = {}, subscribeUrl={}" ,
event . getServiceName ( ) , url . getServiceKey ( ) ) ;
try {
subscribeURLs ( url , serviceNames , listener ) ;
reConnectJobMap . remove ( serviceName ) ;
}
catch ( Exception e ) {
logger . warn ( String . format (
"subscribeURLs failed, serviceName = %s, try reSubscribe again" ,
serviceName ) , e ) ;
addReSubscribeMetadataJob ( serviceName , 0 ) ;
}
}
}
@Override
public String toString ( ) {
return "ServiceInstancesChangedEventListener:"
+ url . getServiceKey ( ) ;
}
} ) ;
}
void addReSubscribeMetadataJob ( String serviceName , int count ) {
if ( count > maxReSubscribeMetadataTimes ) {
logger . error (
"reSubscribe failed too many times, serviceName = {}, count = {}" ,
serviceName , count ) ;
if ( ! subscribedServiceNames . contains ( appName ) ) {
return ;
}
ReSubscribeMetadataJob job = new ReSubscribeMetadataJob ( serviceName , this , count ) ;
reConnectJobMap . put ( serviceName , job ) ;
reConnectPool . schedule ( job , reSubscribeMetadataIntervial , TimeUnit . SECONDS ) ;
}
void subscribeURLs ( URL url , Set < String > serviceNames , NotifyListener listener ) {
if ( instances . size ( ) = = 0 ) {
logger . warn ( "APP {} instance changed, size changed zero!!!" , appName ) ;
}
else {
logger . info ( "APP {} instance changed, size changed to {}" , appName ,
instances . size ( ) ) ;
}
// group by revision
Map < String , List < ServiceInstance > > newGroup = instances . stream ( )
. collect ( Collectors . groupingBy ( RevisionResolver : : getRevision ) ) ;
List < URL > subscribedURLs = new LinkedList < > ( ) ;
synchronized ( this ) {
serviceNames . forEach ( serviceName - > {
Map < String , List < ServiceInstance > > oldGroup = serviceRevisionInstanceMap
. computeIfAbsent ( appName , k - > new HashMap < > ( ) ) ;
subscribeURLs ( url , subscribedURLs , serviceName ,
( ) - > getServiceInstances ( serviceName ) ) ;
if ( serviceInstanceNotChanged ( oldGroup , newGroup ) ) {
logger . debug ( "APP {} instance changed, but nothing different" , appName ) ;
return ;
}
} ) ;
try {
// Notify all
notifyAllSubscribedURLs ( url , subscribedURLs , listener ) ;
}
// ensure that the service metadata is correct
refreshServiceMetadataInfo ( appName , instances ) ;
// then , refresh general service associated with current application
refreshGeneralServiceInfo ( appName , oldGroup , newGroup ) ;
private void registerServiceInstancesChangedListener ( URL url ,
ApplicationListener < ServiceInstancesChangedEvent > listener ) {
String listenerId = generateId ( url ) ;
if ( REGISTER_LISTENERS . add ( listenerId ) ) {
applicationContext . addApplicationListener ( listener ) ;
// mark process successful
reSubscribeManager . onRefreshSuccess ( event ) ;
}
catch ( Exception e ) {
logger . error ( String . format (
"APP %s instance changed, handler faild, try resubscribe" ,
appName ) , e ) ;
reSubscribeManager . onRefreshFail ( event ) ;
}
}
}
private void subscribeURLs ( URL subscribedURL , List < URL > subscribedURLs ,
String serviceName ,
Supplier < List < ServiceInstance > > serviceInstancesSupplier ) {
List < ServiceInstance > serviceInstances = serviceInstancesSupplier . get ( ) ;
subscribeURLs ( subscribedURL , subscribedURLs , serviceName , serviceInstances ) ;
}
private void refreshGeneralServiceInfo ( String appName ,
Map < String , List < ServiceInstance > > oldGroup ,
Map < String , List < ServiceInstance > > newGroup ) {
Set < URL > urls2refresh = new HashSet < > ( ) ;
private void subscribeURLs ( URL subscribedURL , List < URL > subscribedURLs ,
String serviceName , List < ServiceInstance > serviceInstances ) {
// compare with local
for ( String revision : oldGroup . keySet ( ) ) {
if ( CollectionUtils . isEmpty ( serviceInstances ) ) {
if ( logger . isWarnEnabled ( ) ) {
logger . warn ( format ( "There is no instance in service[name : %s]" ,
serviceName ) ) ;
if ( ! newGroup . containsKey ( revision ) ) {
// all instances of this list with revision has losted
urlSubscribeHandlerMap . forEach ( ( url , handler ) - > {
if ( handler . relatedWith ( appName , revision ) ) {
handler . removeAppNameWithRevision ( appName , revision ) ;
urls2refresh . add ( url ) ;
}
} ) ;
logger . debug ( "Subscription app {} revision {} has all losted" , appName ,
revision ) ;
}
}
else {
logger . debug ( "subscribe from serviceName = {}, size = {}" , serviceName ,
serviceInstances . size ( ) ) ;
}
List < URL > exportedURLs = getExportedURLs ( subscribedURL , serviceName ,
serviceInstances ) ;
/ * *
* Add the exported URLs from { @link MetadataService }
* /
subscribedURLs . addAll ( exportedURLs ) ;
}
for ( Map . Entry < String , List < ServiceInstance > > entry : newGroup . entrySet ( ) ) {
String revision = entry . getKey ( ) ;
List < ServiceInstance > instanceList = entry . getValue ( ) ;
private List < URL > getExportedURLs ( URL subscribedURL , String serviceName ,
List < ServiceInstance > serviceInstances ) {
if ( ! oldGroup . containsKey ( revision ) ) {
// this instance list of revision not exists
// should acquire urls
urlSubscribeHandlerMap . forEach (
( url , handler ) - > handler . init ( appName , revision , instanceList ) ) ;
}
List < ServiceInstance > validServiceInstances = filter ( serviceInstances ) ;
urlSubscribeHandlerMap . forEach ( ( url , handler ) - > {
if ( handler . relatedWith ( appName , revision ) ) {
urls2refresh . add ( url ) ;
}
} ) ;
// If there is no valid ServiceInstance, return empty result
if ( isEmpty ( validServiceInstances ) ) {
if ( logger . isWarnEnabled ( ) ) {
logger . warn (
"There is no instance from service[name : {}], and then Dubbo Service[key : {}] will not be "
+ "available , please make sure the further impact" ,
serviceName , subscribedURL . getServiceKey ( ) ) ;
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "Subscription app {} revision {} changed, instance list {}" ,
appName , revision ,
instanceList . stream ( ) . map (
instance - > instance . getHost ( ) + ":" + instance . getPort ( ) )
. collect ( Collectors . toList ( ) ) ) ;
}
return emptyList ( ) ;
}
List < URL > subscribedURLs = cloneExportedURLs ( subscribedURL , serviceInstances ) ;
// clear local service instances, help GC
validServiceInstances . clear ( ) ;
serviceRevisionInstanceMap . put ( appName , newGroup ) ;
return subscribedURLs ;
if ( urls2refresh . size ( ) = = 0 ) {
logger . debug ( "Subscription app {}, no urls will be refreshed" , appName ) ;
}
else {
logger . debug ( "Subscription app {}, the following url will be refresh:{}" ,
appName , urls2refresh . stream ( ) . map ( URL : : getServiceKey )
. collect ( Collectors . toList ( ) ) ) ;
for ( URL url : urls2refresh ) {
GenearalServiceSubscribeHandler handler = urlSubscribeHandlerMap . get ( url ) ;
if ( handler = = null ) {
logger . warn ( "Subscription app {}, can't find handler for service {}" ,
appName , url . getServiceKey ( ) ) ;
continue ;
}
handler . refresh ( ) ;
}
}
}
/ * *
* Clone the subscribed URLs based on the template URLs .
* @param subscribedURL the URL to be subscribed
* @param serviceInstances the list of { @link ServiceInstance service instances }
* @return non - null
* /
private List < URL > cloneExportedURLs ( URL subscribedURL ,
private void refreshServiceMetadataInfo ( String serviceName ,
List < ServiceInstance > serviceInstances ) {
MetadataServiceSubscribeHandler handler = metadataSubscribeHandlerMap
. get ( serviceName ) ;
List < URL > clonedExportedURLs = new LinkedList < > ( ) ;
serviceInstances . forEach ( serviceInstance - > {
String host = serviceInstance . getHost ( ) ;
getTemplateExportedURLs ( subscribedURL , serviceInstances ) . stream ( )
. map ( templateURL - > templateURL . removeParameter ( TIMESTAMP_KEY ) )
. map ( templateURL - > templateURL . removeParameter ( PID_KEY ) )
. map ( templateURL - > {
String protocol = templateURL . getProtocol ( ) ;
Integer port = repository . getDubboProtocolPort ( serviceInstance ,
protocol ) ;
// reserve tag
String tag = null ;
List < URL > urls = jsonUtils . toURLs ( serviceInstance . getMetadata ( )
. get ( "dubbo.metadata-service.urls" ) ) ;
if ( urls ! = null & & urls . size ( ) > 0 ) {
Map < String , String > parameters = urls . get ( 0 ) . getParameters ( ) ;
tag = parameters . get ( "dubbo.tag" ) ;
}
if ( Objects . equals ( templateURL . getHost ( ) , host )
& & Objects . equals ( templateURL . getPort ( ) , port ) ) { // use
// templateURL
// if
// equals
return templateURL ;
}
if ( port = = null ) {
if ( logger . isWarnEnabled ( ) ) {
logger . warn (
"The protocol[{}] port of Dubbo service instance[host : {}] "
+ "can't be resolved" ,
protocol , host ) ;
}
return null ;
}
else {
URLBuilder clonedURLBuilder = from ( templateURL ) // remove the
// parameters from
// the template
// URL
. setHost ( host ) // reset the host
. setPort ( port ) // reset the port
. addParameter ( "dubbo.tag" , tag ) ; // reset the tag
return clonedURLBuilder . build ( ) ;
}
} ) . filter ( Objects : : nonNull ) . forEach ( clonedExportedURLs : : add ) ;
} ) ;
return clonedExportedURLs ;
if ( handler = = null ) {
logger . warn ( "Subscription app {}, can't find metadata handler" , serviceName ) ;
return ;
}
handler . refresh ( serviceInstances ) ;
}
private List < URL > getTemplateExportedURLs ( URL subscribedURL ,
List < ServiceInstance > serviceInstances ) {
private boolean serviceInstanceNotChanged ( Map < String , List < ServiceInstance > > oldGroup ,
Map < String , List < ServiceInstance > > newGroup ) {
if ( newGroup . size ( ) ! = oldGroup . size ( ) ) {
return false ;
}
DubboMetadataService dubboMetadataService = getProxy ( serviceInstances ) ;
for ( Map . Entry < String , List < ServiceInstance > > entry : newGroup . entrySet ( ) ) {
String appName = entry . getKey ( ) ;
List < ServiceInstance > newInstances = entry . getValue ( ) ;
List < URL > templateExportedURLs = emptyList ( ) ;
if ( ! oldGroup . containsKey ( appName ) ) {
return false ;
}
if ( dubboMetadataService ! = null ) {
templateExportedURLs = getExportedURLs ( dubboMetadataService , subscribedURL ) ;
}
else {
if ( logger . isWarnEnabled ( ) ) {
logger . warn (
"The metadata of Dubbo service[key : {}] still can't be found, it could effect the further "
+ "Dubbo service invocation" ,
subscribedURL . getServiceKey ( ) ) ;
List < ServiceInstance > oldInstances = oldGroup . get ( appName ) ;
if ( newInstances . size ( ) ! = oldInstances . size ( ) ) {
return false ;
}
boolean matched = newInstances . stream ( ) . allMatch ( newInstance - > {
for ( ServiceInstance oldInstance : oldInstances ) {
if ( instanceSame ( newInstance , oldInstance ) ) {
return true ;
}
}
return false ;
} ) ;
if ( ! matched ) {
return false ;
}
}
return templateExportedURLs ;
return true ;
}
private DubboMetadataService getProxy ( List < ServiceInstance > serviceInstances ) {
return dubboMetadataConfigServiceProxy . getProxy ( serviceInstances ) ;
private boolean instanceSame ( ServiceInstance newInstance ,
ServiceInstance oldInstance ) {
if ( ! StringUtils . equals ( newInstance . getInstanceId ( ) ,
oldInstance . getInstanceId ( ) ) ) {
return false ;
}
if ( ! StringUtils . equals ( newInstance . getHost ( ) , oldInstance . getHost ( ) ) ) {
return false ;
}
if ( ! StringUtils . equals ( newInstance . getServiceId ( ) , oldInstance . getServiceId ( ) ) ) {
return false ;
}
if ( ! StringUtils . equals ( newInstance . getScheme ( ) , oldInstance . getScheme ( ) ) ) {
return false ;
}
if ( oldInstance . getPort ( ) ! = newInstance . getPort ( ) ) {
return false ;
}
if ( ! oldInstance . getMetadata ( ) . equals ( newInstance . getMetadata ( ) ) ) {
return false ;
}
return true ;
}
private List < ServiceInstance > filter ( Collection < ServiceInstance > serviceInstances ) {
@ -438,37 +448,16 @@ public class DubboCloudRegistry extends FailbackRegistry {
return metadata . containsKey ( METADATA_SERVICE_URLS_PROPERTY_NAME ) ;
}
Set < String > getServices ( URL url ) {
private Set < String > getServices ( URL url ) {
Set < String > subscribedServices = repository . getSubscribedServices ( ) ;
if ( subscribedServices . contains ( "*" ) ) {
subscribedServices = new HashSet < > ( discoveryClient . getServices ( ) ) ;
}
// TODO Add the filter feature
return subscribedServices ;
}
private void notifyAllSubscribedURLs ( URL url , List < URL > subscribedURLs ,
NotifyListener listener ) {
if ( isEmpty ( subscribedURLs ) ) {
// Add the EMPTY_PROTOCOL URL
subscribedURLs . add ( emptyURL ( url ) ) ;
// if (isDubboMetadataServiceURL(url)) {
// if meta service change, and serviceInstances is zero, will clean up
// information about this client
// String serviceName = url.getParameter(GROUP_KEY);
// repository.removeMetadataAndInitializedService(serviceName, url);
// }
}
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( "The subscribed URL[{}] will notify all URLs : {}" , url ,
subscribedURLs ) ;
}
// Notify all
listener . notify ( subscribedURLs ) ;
}
private List < ServiceInstance > getServiceInstances ( String serviceName ) {
List < ServiceInstance > getServiceInstances ( String serviceName ) {
return hasText ( serviceName ) ? doGetServiceInstances ( serviceName ) : emptyList ( ) ;
}
@ -485,108 +474,14 @@ public class DubboCloudRegistry extends FailbackRegistry {
return serviceInstances ;
}
private String generateId ( URL url ) {
return url . toString ( ) ;
}
private URL emptyURL ( URL url ) {
// issue : When the last service provider is closed, the client still periodically
// connects to the last provider.n
// fix https://github.com/alibaba/spring-cloud-alibaba/issues/1259
return from ( url ) . setProtocol ( EMPTY_PROTOCOL ) . removeParameter ( CATEGORY_KEY )
. build ( ) ;
}
private List < URL > getExportedURLs ( DubboMetadataService dubboMetadataService ,
URL subscribedURL ) {
String serviceInterface = subscribedURL . getServiceInterface ( ) ;
String group = subscribedURL . getParameter ( GROUP_KEY ) ;
String version = subscribedURL . getParameter ( VERSION_KEY ) ;
// The subscribed protocol may be null
String subscribedProtocol = subscribedURL . getParameter ( PROTOCOL_KEY ) ;
String exportedURLsJSON = dubboMetadataService . getExportedURLs ( serviceInterface ,
group , version ) ;
return jsonUtils . toURLs ( exportedURLsJSON ) . stream ( )
. filter ( exportedURL - > subscribedProtocol = = null
| | subscribedProtocol . equalsIgnoreCase ( exportedURL . getProtocol ( ) ) )
. collect ( Collectors . toList ( ) ) ;
}
private void subscribeDubboMetadataServiceURLs ( URL subscribedURL ,
NotifyListener listener ) {
subscribeDubboMetadataServiceURLs ( subscribedURL , listener ,
getServiceName ( subscribedURL ) ) ;
// Sync subscription
if ( containsProviderCategory ( subscribedURL ) ) {
registerServiceInstancesChangedListener ( subscribedURL ,
new ServiceInstanceChangeListener ( ) {
@Override
public int getOrder ( ) {
return Ordered . LOWEST_PRECEDENCE - 1 ;
}
@Override
public void onApplicationEvent (
ServiceInstancesChangedEvent event ) {
String sourceServiceName = event . getServiceName ( ) ;
List < ServiceInstance > serviceInstances = event
. getServiceInstances ( ) ;
String serviceName = getServiceName ( subscribedURL ) ;
if ( Objects . equals ( sourceServiceName , serviceName ) ) {
logger . debug (
"handle serviceInstanceChange of metadata service, serviceName = {}, subscribeUrl={}" ,
event . getServiceName ( ) ,
subscribedURL . getServiceKey ( ) ) ;
// only update serviceInstances of the specified
// serviceName
subscribeDubboMetadataServiceURLs ( subscribedURL , listener ,
sourceServiceName , serviceInstances ) ;
}
}
@Override
public String toString ( ) {
return "ServiceInstancesChangedEventListener:"
+ subscribedURL . getServiceKey ( ) ;
}
} ) ;
}
}
// the group of DubboMetadataService is current application name
private String getServiceName ( URL subscribedURL ) {
return subscribedURL . getParameter ( GROUP_KEY ) ;
}
private void subscribeDubboMetadataServiceURLs ( URL subscribedURL ,
NotifyListener listener , String serviceName ,
List < ServiceInstance > serviceInstances ) {
String serviceInterface = subscribedURL . getServiceInterface ( ) ;
String version = subscribedURL . getParameter ( VERSION_KEY ) ;
String protocol = subscribedURL . getParameter ( PROTOCOL_KEY ) ;
List < URL > urls = dubboMetadataUtils . getDubboMetadataServiceURLs ( serviceInstances ,
serviceInterface , version , protocol ) ;
notifyAllSubscribedURLs ( subscribedURL , urls , listener ) ;
}
private void subscribeDubboMetadataServiceURLs ( URL subscribedURL ,
NotifyListener listener , String serviceName ) {
List < ServiceInstance > serviceInstances = getServiceInstances ( serviceName ) ;
subscribeDubboMetadataServiceURLs ( subscribedURL , listener , serviceName ,
serviceInstances ) ;
}
private boolean containsProviderCategory ( URL subscribedURL ) {
String category = subscribedURL . getParameter ( CATEGORY_KEY ) ;
return category == null ? false : category . contains ( PROVIDER ) ;
return category ! = null & & category . contains ( PROVIDER ) ;
}
@Override
@ -603,16 +498,36 @@ public class DubboCloudRegistry extends FailbackRegistry {
return ADMIN_PROTOCOL . equals ( url . getProtocol ( ) ) ;
}
p ublic Map < URL , NotifyListener > getUrlNotifyListenerMap ( ) {
return urlNotifyListenerMap ;
p rotected boolean isDubboMetadataServiceURL ( URL url ) {
return DUBBO_METADATA_SERVICE_CLASS_NAME. equals ( url . getServiceInterface ( ) ) ;
}
p ublic Map < String , ReSubscribeMetadataJob > getReConnectJobMap ( ) {
return reConnectJobMap ;
p rotected boolean isConsumerServiceURL ( URL url ) {
return CONSUMER. equals ( url . getProtocol ( ) ) ;
}
protected boolean isDubboMetadataServiceURL ( URL url ) {
return DUBBO_METADATA_SERVICE_CLASS_NAME . equals ( url . getServiceInterface ( ) ) ;
public List < ServiceInstance > getServiceInstances ( Map < String , Set < String > > providers ) {
List < ServiceInstance > instances = new ArrayList < > ( ) ;
providers . forEach ( ( appName , revisions ) - > {
Map < String , List < ServiceInstance > > revisionMap = serviceRevisionInstanceMap
. get ( appName ) ;
if ( revisionMap = = null ) {
return ;
}
for ( String revision : revisions ) {
List < ServiceInstance > list = revisionMap . get ( revision ) ;
if ( list ! = null ) {
instances . addAll ( list ) ;
}
}
} ) ;
return instances ;
}
public Map < String , Map < String , List < ServiceInstance > > > getServiceRevisionInstanceMap ( ) {
return serviceRevisionInstanceMap ;
}
}