Merge remote-tracking branch 'upstream/2.2.x' into merge_2.2.x_2022
# Conflicts: # README-zh.md # pom.xml # spring-cloud-alibaba-dependencies/pom.xml # spring-cloud-alibaba-docs/src/main/asciidoc-zh/nacos-discovery.adoc # spring-cloud-alibaba-docs/src/main/asciidoc/nacos-discovery.adoc # spring-cloud-alibaba-examples/nacos-example/nacos-discovery-example/nacos-discovery-provider-example/src/main/java/com/alibaba/cloud/examples/ProviderApplication.java # spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-config/src/main/java/com/alibaba/cloud/nacos/client/NacosPropertySource.java # spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-config/src/main/resources/META-INF/spring.factories # spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosDiscoveryProperties.java # spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/ribbon/NacosRibbonClientPropertyOverrideTests.java # spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-sentinel/src/main/java/com/alibaba/cloud/sentinel/feign/SentinelFeign.java # spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.javapull/2400/head
commit
42f03eb1c8
@ -1,10 +1,11 @@
|
||||
server.port=18082
|
||||
spring.application.name=service-provider
|
||||
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
|
||||
#spring.cloud.nacos.discovery.instance-enabled=true
|
||||
|
||||
spring.cloud.nacos.username=nacos
|
||||
spring.cloud.nacos.password=nacos
|
||||
|
||||
management.endpoints.web.exposure.include=*
|
||||
management.endpoint.health.show-details=always
|
||||
server.port=18082
|
||||
spring.application.name=service-provider
|
||||
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
|
||||
spring.cloud.nacos.discovery.enabled=true
|
||||
#spring.cloud.nacos.discovery.instance-enabled=true
|
||||
|
||||
spring.cloud.nacos.username=nacos
|
||||
spring.cloud.nacos.password=nacos
|
||||
|
||||
management.endpoints.web.exposure.include=*
|
||||
management.endpoint.health.show-details=always
|
||||
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.nacos.logging;
|
||||
|
||||
import com.alibaba.nacos.client.logging.NacosLogging;
|
||||
|
||||
import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.context.event.GenericApplicationListener;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.ResolvableType;
|
||||
|
||||
/**
|
||||
* Reload nacos log configuration file, after
|
||||
* {@link org.springframework.boot.context.logging.LoggingApplicationListener}.
|
||||
*
|
||||
* @author mai.jh
|
||||
*/
|
||||
public class NacosLoggingListener implements GenericApplicationListener {
|
||||
|
||||
@Override
|
||||
public boolean supportsEventType(ResolvableType resolvableType) {
|
||||
Class<?> type = resolvableType.getRawClass();
|
||||
if (type != null) {
|
||||
return ApplicationEnvironmentPreparedEvent.class.isAssignableFrom(type);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationEvent applicationEvent) {
|
||||
NacosLogging.getInstance().loadConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return Ordered.HIGHEST_PRECEDENCE + 21;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.nacos.discovery.logging;
|
||||
|
||||
import com.alibaba.nacos.client.logging.NacosLogging;
|
||||
|
||||
import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.context.event.GenericApplicationListener;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.ResolvableType;
|
||||
|
||||
/**
|
||||
* Reload nacos log configuration file, after
|
||||
* {@link org.springframework.boot.context.logging.LoggingApplicationListener}.
|
||||
*
|
||||
* @author mai.jh
|
||||
*/
|
||||
public class NacosLoggingListener implements GenericApplicationListener {
|
||||
|
||||
@Override
|
||||
public boolean supportsEventType(ResolvableType resolvableType) {
|
||||
Class<?> type = resolvableType.getRawClass();
|
||||
if (type != null) {
|
||||
return ApplicationEnvironmentPreparedEvent.class.isAssignableFrom(type);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ApplicationEvent applicationEvent) {
|
||||
NacosLogging.getInstance().loadConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return Ordered.HIGHEST_PRECEDENCE + 21;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright 2013-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.nacos.ribbon;
|
||||
|
||||
import com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration;
|
||||
import com.netflix.loadbalancer.ConfigurationBasedServerList;
|
||||
import com.netflix.loadbalancer.Server;
|
||||
import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.commons.util.UtilAutoConfiguration;
|
||||
import org.springframework.cloud.netflix.archaius.ArchaiusAutoConfiguration;
|
||||
import org.springframework.cloud.netflix.ribbon.RibbonClients;
|
||||
import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
/**
|
||||
* @author liujunjie
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = NacosRibbonClientPropertyOverrideTests.TestConfiguration.class,
|
||||
properties = { "spring.cloud.nacos.server-addr=127.0.0.1:8848",
|
||||
"spring.cloud.nacos.username=nacos", "spring.cloud.nacos.password=nacos",
|
||||
"spring.cloud.nacos.discovery.port=18080",
|
||||
"spring.cloud.nacos.discovery.service=remoteApp",
|
||||
"localApp.ribbon.NIWSServerListClassName="
|
||||
+ "com.netflix.loadbalancer.ConfigurationBasedServerList",
|
||||
"localApp.ribbon.listOfServers=127.0.0.1:19090",
|
||||
"localApp.ribbon.ServerListRefreshInterval=15000" })
|
||||
public class NacosRibbonClientPropertyOverrideTests {
|
||||
|
||||
@Autowired
|
||||
private SpringClientFactory factory;
|
||||
|
||||
@Test
|
||||
public void serverListOverridesToTest() {
|
||||
ConfigurationBasedServerList.class
|
||||
.cast(getLoadBalancer("localApp").getServerListImpl());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void serverListRemoteTest() {
|
||||
NacosServerList.class.cast(getLoadBalancer("remoteApp").getServerListImpl());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private ZoneAwareLoadBalancer<Server> getLoadBalancer(String name) {
|
||||
return (ZoneAwareLoadBalancer<Server>) this.factory.getLoadBalancer(name);
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@RibbonClients
|
||||
@EnableAutoConfiguration
|
||||
@ImportAutoConfiguration({ UtilAutoConfiguration.class,
|
||||
PropertyPlaceholderAutoConfiguration.class, ArchaiusAutoConfiguration.class,
|
||||
RibbonNacosAutoConfiguration.class, NacosDiscoveryClientConfiguration.class })
|
||||
protected static class TestConfiguration {
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.sentinel;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import com.alibaba.csp.sentinel.adapter.spring.webmvc.SentinelWebInterceptor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
|
||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
|
||||
|
||||
/**
|
||||
* @author: chao.wu
|
||||
*/
|
||||
public class SentinelWebMvcConfigurer implements WebMvcConfigurer {
|
||||
|
||||
private static final Logger log = LoggerFactory
|
||||
.getLogger(SentinelWebMvcConfigurer.class);
|
||||
|
||||
@Autowired
|
||||
private SentinelProperties sentinelProperties;
|
||||
|
||||
@Autowired
|
||||
private Optional<SentinelWebInterceptor> sentinelWebInterceptorOptional;
|
||||
|
||||
@Override
|
||||
public void addInterceptors(InterceptorRegistry registry) {
|
||||
if (!sentinelWebInterceptorOptional.isPresent()) {
|
||||
return;
|
||||
}
|
||||
SentinelProperties.Filter filterConfig = sentinelProperties.getFilter();
|
||||
registry.addInterceptor(sentinelWebInterceptorOptional.get())
|
||||
.order(filterConfig.getOrder())
|
||||
.addPathPatterns(filterConfig.getUrlPatterns());
|
||||
log.info(
|
||||
"[Sentinel Starter] register SentinelWebInterceptor with urlPatterns: {}.",
|
||||
filterConfig.getUrlPatterns());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.sentinel.feign;
|
||||
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
|
||||
/**
|
||||
* Record FeignClientFactoryBean to threadlocal, so that SentinelFeign can get it when
|
||||
* creating SentinelInvocationHandler.
|
||||
*
|
||||
* @see com.alibaba.cloud.sentinel.feign.SentinelFeign.Builder
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
@Aspect
|
||||
public class SentinelTargeterAspect {
|
||||
|
||||
private static final ThreadLocal<Object> FEIGN_CLIENT_FACTORY_BEAN = new ThreadLocal<>();
|
||||
|
||||
public static Object getFeignClientFactoryBean() {
|
||||
return FEIGN_CLIENT_FACTORY_BEAN.get();
|
||||
}
|
||||
|
||||
@Around("execution(* org.springframework.cloud.openfeign.Targeter.target(..))")
|
||||
public Object process(ProceedingJoinPoint pjp) throws Throwable {
|
||||
Object factory = pjp.getArgs()[0];
|
||||
try {
|
||||
FEIGN_CLIENT_FACTORY_BEAN.set(factory);
|
||||
return pjp.proceed();
|
||||
}
|
||||
finally {
|
||||
FEIGN_CLIENT_FACTORY_BEAN.remove();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.sidecar;
|
||||
|
||||
/**
|
||||
* @author yuhuangbin
|
||||
*/
|
||||
public interface CustomHealthCheckHandler {
|
||||
|
||||
void handler(String applicationName, SidecarInstanceInfo sidecarInstanceInfo);
|
||||
|
||||
}
|
@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.metadata;
|
||||
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.dubbo.common.logger.Logger;
|
||||
import org.apache.dubbo.common.logger.LoggerFactory;
|
||||
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
|
||||
import static com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
|
||||
/**
|
||||
* Copy from org.apache.dubbo.metadata.RevisionResolver.
|
||||
*
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public final class RevisionResolver {
|
||||
|
||||
/**
|
||||
* The param key in url.
|
||||
*/
|
||||
public static final String SCA_REVSION_KEY = "sca_revision";
|
||||
|
||||
private static final String EMPTY_REVISION = "0";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RevisionResolver.class);
|
||||
|
||||
private static final char[] hexDigits = { '0', '1', '2', '3', '4', '5', '6', '7', '8',
|
||||
'9', 'A', 'B', 'C', 'D', 'E', 'F' };
|
||||
|
||||
private static MessageDigest mdInst;
|
||||
|
||||
static {
|
||||
try {
|
||||
mdInst = MessageDigest.getInstance("MD5");
|
||||
}
|
||||
catch (NoSuchAlgorithmException e) {
|
||||
logger.error("Failed to calculate metadata revision", e);
|
||||
}
|
||||
}
|
||||
|
||||
private RevisionResolver() {
|
||||
|
||||
}
|
||||
|
||||
public static String getEmptyRevision() {
|
||||
return EMPTY_REVISION;
|
||||
}
|
||||
|
||||
public static String calRevision(String metadata) {
|
||||
mdInst.update(metadata.getBytes(UTF_8));
|
||||
byte[] md5 = mdInst.digest();
|
||||
|
||||
int j = md5.length;
|
||||
char[] str = new char[j * 2];
|
||||
int k = 0;
|
||||
for (byte byte0 : md5) {
|
||||
str[k++] = hexDigits[byte0 >>> 4 & 0xf];
|
||||
str[k++] = hexDigits[byte0 & 0xf];
|
||||
}
|
||||
return new String(str);
|
||||
}
|
||||
|
||||
public static String getRevision(ServiceInstance instance) {
|
||||
Map<String, String> metadata = instance.getMetadata();
|
||||
String revision = metadata.get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME);
|
||||
|
||||
if (revision == null) {
|
||||
revision = RevisionResolver.getEmptyRevision();
|
||||
}
|
||||
return revision;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,232 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.metadata;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.common.compiler.support.ClassUtils;
|
||||
import org.apache.dubbo.common.utils.StringUtils;
|
||||
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPARATOR;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
|
||||
|
||||
/**
|
||||
* Copy from org.apache.dubbo.metadata.MetadataInfo.ServiceInfo.
|
||||
*
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class ServiceInfo implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -258557978718735302L;
|
||||
|
||||
private String name;
|
||||
|
||||
private String group;
|
||||
|
||||
private String version;
|
||||
|
||||
private String protocol;
|
||||
|
||||
private String path; // most of the time, path is the same with the interface name.
|
||||
|
||||
private Map<String, String> params;
|
||||
|
||||
// params configured on consumer side,
|
||||
private transient Map<String, String> consumerParams;
|
||||
|
||||
// service + group + version
|
||||
private transient String serviceKey;
|
||||
|
||||
// service + group + version + protocol
|
||||
private transient String matchKey;
|
||||
|
||||
private transient URL url;
|
||||
|
||||
private static final Set<String> IGNORE_KEYS = new HashSet<>();
|
||||
static {
|
||||
IGNORE_KEYS.add(TIMESTAMP_KEY);
|
||||
IGNORE_KEYS.add(PID_KEY);
|
||||
IGNORE_KEYS.add(INTERFACE_KEY);
|
||||
IGNORE_KEYS.add(METHODS_KEY);
|
||||
}
|
||||
|
||||
public ServiceInfo(URL url) {
|
||||
this(url.getServiceInterface(), url.getParameter(GROUP_KEY),
|
||||
url.getParameter(VERSION_KEY), url.getProtocol(), url.getPath(), null);
|
||||
|
||||
this.url = url;
|
||||
Map<String, String> params = new TreeMap<>();
|
||||
url.getParameters().forEach((k, v) -> {
|
||||
if (IGNORE_KEYS.contains(k)) {
|
||||
return;
|
||||
}
|
||||
params.put(k, v);
|
||||
});
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
public ServiceInfo(String name, String group, String version, String protocol,
|
||||
String path, Map<String, String> params) {
|
||||
this.name = name;
|
||||
this.group = group;
|
||||
this.version = version;
|
||||
this.protocol = protocol;
|
||||
this.path = path;
|
||||
this.params = params == null ? new HashMap<>() : params;
|
||||
|
||||
this.serviceKey = URL.buildKey(name, group, version);
|
||||
this.matchKey = buildMatchKey();
|
||||
}
|
||||
|
||||
public String getMatchKey() {
|
||||
if (matchKey != null) {
|
||||
return matchKey;
|
||||
}
|
||||
buildMatchKey();
|
||||
return matchKey;
|
||||
}
|
||||
|
||||
private String buildMatchKey() {
|
||||
matchKey = getServiceKey();
|
||||
if (StringUtils.isNotEmpty(protocol)) {
|
||||
matchKey = getServiceKey() + GROUP_CHAR_SEPARATOR + protocol;
|
||||
}
|
||||
return matchKey;
|
||||
}
|
||||
|
||||
public String getServiceKey() {
|
||||
if (serviceKey != null) {
|
||||
return serviceKey;
|
||||
}
|
||||
this.serviceKey = URL.buildKey(name, group, version);
|
||||
return serviceKey;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getGroup() {
|
||||
return group;
|
||||
}
|
||||
|
||||
public void setGroup(String group) {
|
||||
this.group = group;
|
||||
}
|
||||
|
||||
public String getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public void setVersion(String version) {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public void setPath(String path) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
public Map<String, String> getParams() {
|
||||
if (params == null) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
public void setParams(Map<String, String> params) {
|
||||
this.params = params;
|
||||
}
|
||||
|
||||
public String getParameter(String key) {
|
||||
if (consumerParams != null) {
|
||||
String value = consumerParams.get(key);
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return params.get(key);
|
||||
}
|
||||
|
||||
public String toDescString() {
|
||||
return this.getMatchKey() + getMethodSignaturesString() + getParams();
|
||||
}
|
||||
|
||||
private String getMethodSignaturesString() {
|
||||
SortedSet<String> methodStrings = new TreeSet<>();
|
||||
|
||||
Method[] methods = ClassUtils.forName(name).getMethods();
|
||||
for (Method method : methods) {
|
||||
methodStrings.add(method.toString());
|
||||
}
|
||||
return methodStrings.toString();
|
||||
}
|
||||
|
||||
public URL getUrl() {
|
||||
return url;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (!(obj instanceof ServiceInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ServiceInfo serviceInfo = (ServiceInfo) obj;
|
||||
return this.getMatchKey().equals(serviceInfo.getMatchKey())
|
||||
&& this.getParams().equals(serviceInfo.getParams());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(getMatchKey(), getParams());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "service{" + "name='" + name + "'," + "group='" + group + "',"
|
||||
+ "version='" + version + "'," + "protocol='" + protocol + "',"
|
||||
+ "params=" + params + "," + "consumerParams=" + consumerParams + "}";
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.registry.NotifyListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.dubbo.common.URLBuilder.from;
|
||||
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
|
||||
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
|
||||
import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public abstract class AbstractServiceSubscribeHandler {
|
||||
|
||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
protected final URL url;
|
||||
|
||||
protected final NotifyListener listener;
|
||||
|
||||
protected final DubboCloudRegistry registry;
|
||||
|
||||
public AbstractServiceSubscribeHandler(URL url, NotifyListener listener,
|
||||
DubboCloudRegistry registry) {
|
||||
this.url = url;
|
||||
this.listener = listener;
|
||||
this.registry = registry;
|
||||
}
|
||||
|
||||
protected void notifyAllSubscribedURLs(URL url, List<URL> subscribedURLs,
|
||||
NotifyListener listener) {
|
||||
|
||||
if (isEmpty(subscribedURLs)) {
|
||||
// Add the EMPTY_PROTOCOL URL
|
||||
listener.notify(Collections.singletonList(emptyURL(url)));
|
||||
// if (isDubboMetadataServiceURL(url)) {
|
||||
// if meta service change, and serviceInstances is zero, will clean up
|
||||
// information about this client
|
||||
// String serviceName = url.getParameter(GROUP_KEY);
|
||||
// repository.removeMetadataAndInitializedService(serviceName, url);
|
||||
// }
|
||||
}
|
||||
else {
|
||||
// Notify all
|
||||
listener.notify(subscribedURLs);
|
||||
}
|
||||
}
|
||||
|
||||
private URL emptyURL(URL url) {
|
||||
// issue : When the last service provider is closed, the client still periodically
|
||||
// connects to the last provider.n
|
||||
// fix https://github.com/alibaba/spring-cloud-alibaba/issues/1259
|
||||
return from(url).setProtocol(EMPTY_PROTOCOL).removeParameter(CATEGORY_KEY)
|
||||
.build();
|
||||
}
|
||||
|
||||
private final AtomicBoolean inited = new AtomicBoolean(false);
|
||||
|
||||
public void init() {
|
||||
if (inited.compareAndSet(false, true)) {
|
||||
doInit();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void doInit();
|
||||
|
||||
}
|
@ -0,0 +1,280 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import com.alibaba.cloud.dubbo.metadata.RevisionResolver;
|
||||
import com.alibaba.cloud.dubbo.metadata.repository.DubboServiceMetadataRepository;
|
||||
import com.alibaba.cloud.dubbo.service.DubboMetadataService;
|
||||
import com.alibaba.cloud.dubbo.service.DubboMetadataServiceProxy;
|
||||
import com.alibaba.cloud.dubbo.util.JSONUtils;
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.common.URLBuilder;
|
||||
import org.apache.dubbo.registry.NotifyListener;
|
||||
import org.apache.dubbo.rpc.RpcContext;
|
||||
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
|
||||
import static com.alibaba.cloud.dubbo.metadata.RevisionResolver.SCA_REVSION_KEY;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.apache.dubbo.common.URLBuilder.from;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class GenearalServiceSubscribeHandler extends AbstractServiceSubscribeHandler {
|
||||
|
||||
/**
|
||||
* the provider which can provide service of the url. {appName, [revisions]}
|
||||
*/
|
||||
private final Map<String, Set<String>> providers = new HashMap<>();
|
||||
|
||||
private final Map<String, URL> urlTemplateMap = new HashMap<>();
|
||||
|
||||
private final JSONUtils jsonUtils;
|
||||
|
||||
private final DubboServiceMetadataRepository repository;
|
||||
|
||||
private final DubboMetadataServiceProxy dubboMetadataConfigServiceProxy;
|
||||
|
||||
public GenearalServiceSubscribeHandler(URL url, NotifyListener listener,
|
||||
DubboCloudRegistry registry, DubboServiceMetadataRepository repository,
|
||||
JSONUtils jsonUtils,
|
||||
DubboMetadataServiceProxy dubboMetadataConfigServiceProxy) {
|
||||
super(url, listener, registry);
|
||||
this.repository = repository;
|
||||
this.jsonUtils = jsonUtils;
|
||||
this.dubboMetadataConfigServiceProxy = dubboMetadataConfigServiceProxy;
|
||||
}
|
||||
|
||||
public boolean relatedWith(String appName, String revision) {
|
||||
Set<String> list = providers.get(appName);
|
||||
if (list != null && list.size() > 0) {
|
||||
if (list.contains(revision)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void removeAppNameWithRevision(String appName, String revision) {
|
||||
Set<String> list = providers.get(appName);
|
||||
if (list != null) {
|
||||
list.remove(revision);
|
||||
if (list.size() == 0) {
|
||||
providers.remove(appName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void addAppNameWithRevision(String appName, String revision) {
|
||||
Set<String> set = providers.computeIfAbsent(appName, k -> new HashSet<>());
|
||||
set.add(revision);
|
||||
}
|
||||
|
||||
public synchronized void doInit() {
|
||||
logger.debug("Subscription interface {}, GenearalServiceSubscribeHandler init",
|
||||
url.getServiceKey());
|
||||
Map<String, Map<String, List<ServiceInstance>>> map = registry
|
||||
.getServiceRevisionInstanceMap();
|
||||
for (Map.Entry<String, Map<String, List<ServiceInstance>>> entry : map
|
||||
.entrySet()) {
|
||||
String appName = entry.getKey();
|
||||
Map<String, List<ServiceInstance>> revisionMap = entry.getValue();
|
||||
|
||||
for (Map.Entry<String, List<ServiceInstance>> revisionEntity : revisionMap
|
||||
.entrySet()) {
|
||||
String revision = revisionEntity.getKey();
|
||||
List<ServiceInstance> instances = revisionEntity.getValue();
|
||||
init(appName, revision, instances);
|
||||
}
|
||||
}
|
||||
refresh();
|
||||
}
|
||||
|
||||
public void init(String appName, String revision,
|
||||
List<ServiceInstance> instanceList) {
|
||||
List<URL> urls = getTemplateExportedURLs(url, revision, instanceList);
|
||||
if (urls != null && urls.size() > 0) {
|
||||
addAppNameWithRevision(appName, revision);
|
||||
setUrlTemplate(appName, revision, urls);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void refresh() {
|
||||
List<URL> urls = getProviderURLs();
|
||||
notifyAllSubscribedURLs(url, urls, listener);
|
||||
}
|
||||
|
||||
private List<URL> getProviderURLs() {
|
||||
List<ServiceInstance> instances = registry.getServiceInstances(providers);
|
||||
|
||||
logger.debug("Subscription interfece {}, providers {}, total {}",
|
||||
url.getServiceKey(), providers, instances.size());
|
||||
|
||||
if (instances.size() == 0) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return cloneExportedURLs(instances);
|
||||
}
|
||||
|
||||
void setUrlTemplate(String appName, String revision, List<URL> urls) {
|
||||
if (urls == null || urls.size() == 0) {
|
||||
return;
|
||||
}
|
||||
String key = getAppRevisionKey(appName, revision);
|
||||
if (urlTemplateMap.containsKey(key)) {
|
||||
return;
|
||||
}
|
||||
urlTemplateMap.put(key, urls.get(0));
|
||||
}
|
||||
|
||||
private String getAppRevisionKey(String appName, String revision) {
|
||||
return appName + "@" + revision;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clone the subscribed URLs based on the template URLs.
|
||||
* @param serviceInstances the list of
|
||||
* {@link org.springframework.cloud.client.ServiceInstance service instances}
|
||||
* @return
|
||||
*/
|
||||
List<URL> cloneExportedURLs(List<ServiceInstance> serviceInstances) {
|
||||
|
||||
List<URL> urlsCloneTo = new ArrayList<>();
|
||||
serviceInstances.forEach(serviceInstance -> {
|
||||
|
||||
String host = serviceInstance.getHost();
|
||||
String appName = serviceInstance.getServiceId();
|
||||
String revision = RevisionResolver.getRevision(serviceInstance);
|
||||
|
||||
URL template = urlTemplateMap.get(getAppRevisionKey(appName, revision));
|
||||
|
||||
Stream.of(template)
|
||||
.map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
|
||||
.map(templateURL -> templateURL.removeParameter(PID_KEY))
|
||||
.map(templateURL -> {
|
||||
String protocol = templateURL.getProtocol();
|
||||
Integer port = repository.getDubboProtocolPort(serviceInstance,
|
||||
protocol);
|
||||
|
||||
// reserve tag
|
||||
String tag = null;
|
||||
List<URL> urls = jsonUtils.toURLs(serviceInstance.getMetadata()
|
||||
.get("dubbo.metadata-service.urls"));
|
||||
if (urls != null && urls.size() > 0) {
|
||||
Map<String, String> parameters = urls.get(0).getParameters();
|
||||
tag = parameters.get("dubbo.tag");
|
||||
}
|
||||
|
||||
if (Objects.equals(templateURL.getHost(), host)
|
||||
&& Objects.equals(templateURL.getPort(), port)) { // use
|
||||
// templateURL
|
||||
// if
|
||||
// equals
|
||||
return templateURL;
|
||||
}
|
||||
|
||||
if (port == null) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"The protocol[{}] port of Dubbo service instance[host : {}] "
|
||||
+ "can't be resolved",
|
||||
protocol, host);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
URLBuilder clonedURLBuilder = from(templateURL) // remove the
|
||||
// parameters from
|
||||
// the template
|
||||
// URL
|
||||
.setHost(host) // reset the host
|
||||
.setPort(port) // reset the port
|
||||
.addParameter("dubbo.tag", tag); // reset the tag
|
||||
|
||||
return clonedURLBuilder.build();
|
||||
}
|
||||
|
||||
}).filter(Objects::nonNull).forEach(urlsCloneTo::add);
|
||||
});
|
||||
return urlsCloneTo;
|
||||
}
|
||||
|
||||
private List<URL> getTemplateExportedURLs(URL subscribedURL, String revision,
|
||||
List<ServiceInstance> serviceInstances) {
|
||||
|
||||
DubboMetadataService dubboMetadataService = getProxy(serviceInstances);
|
||||
|
||||
List<URL> templateExportedURLs = emptyList();
|
||||
|
||||
if (dubboMetadataService != null) {
|
||||
templateExportedURLs = getExportedURLs(dubboMetadataService, revision,
|
||||
subscribedURL);
|
||||
}
|
||||
else {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(
|
||||
"The metadata of Dubbo service[key : {}] still can't be found, it could effect the further "
|
||||
+ "Dubbo service invocation",
|
||||
subscribedURL.getServiceKey());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return templateExportedURLs;
|
||||
}
|
||||
|
||||
private DubboMetadataService getProxy(List<ServiceInstance> serviceInstances) {
|
||||
return dubboMetadataConfigServiceProxy.getProxy(serviceInstances);
|
||||
}
|
||||
|
||||
private List<URL> getExportedURLs(DubboMetadataService dubboMetadataService,
|
||||
String revision, URL subscribedURL) {
|
||||
String serviceInterface = subscribedURL.getServiceInterface();
|
||||
String group = subscribedURL.getParameter(GROUP_KEY);
|
||||
String version = subscribedURL.getParameter(VERSION_KEY);
|
||||
|
||||
RpcContext.getContext().setAttachment(SCA_REVSION_KEY, revision);
|
||||
String exportedURLsJSON = dubboMetadataService.getExportedURLs(serviceInterface,
|
||||
group, version);
|
||||
|
||||
// The subscribed protocol may be null
|
||||
String subscribedProtocol = subscribedURL.getParameter(PROTOCOL_KEY);
|
||||
return jsonUtils.toURLs(exportedURLsJSON).stream()
|
||||
.filter(exportedURL -> subscribedProtocol == null
|
||||
|| subscribedProtocol.equalsIgnoreCase(exportedURL.getProtocol()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.cloud.dubbo.util.DubboMetadataUtils;
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.registry.NotifyListener;
|
||||
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class MetadataServiceSubscribeHandler extends AbstractServiceSubscribeHandler {
|
||||
|
||||
private final String appName;
|
||||
|
||||
private final DubboMetadataUtils dubboMetadataUtils;
|
||||
|
||||
public MetadataServiceSubscribeHandler(String appName, URL url,
|
||||
NotifyListener listener, DubboCloudRegistry registry,
|
||||
DubboMetadataUtils dubboMetadataUtils) {
|
||||
super(url, listener, registry);
|
||||
this.appName = appName;
|
||||
this.dubboMetadataUtils = dubboMetadataUtils;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doInit() {
|
||||
logger.debug("Subscription app {} MetadataService handler init", appName);
|
||||
List<ServiceInstance> serviceInstances = registry.getServiceInstances(appName);
|
||||
subscribeDubboMetadataServiceURLs(url, listener, serviceInstances);
|
||||
}
|
||||
|
||||
public void refresh(List<ServiceInstance> serviceInstances) {
|
||||
logger.debug("Subscription app {}, instance changed, new size = {}", appName,
|
||||
serviceInstances.size());
|
||||
subscribeDubboMetadataServiceURLs(url, listener, serviceInstances);
|
||||
}
|
||||
|
||||
private void subscribeDubboMetadataServiceURLs(URL subscribedURL,
|
||||
NotifyListener listener, List<ServiceInstance> serviceInstances) {
|
||||
|
||||
logger.debug("Subscription app {}, service instance changed to size {}", appName,
|
||||
serviceInstances.size());
|
||||
|
||||
String serviceInterface = subscribedURL.getServiceInterface();
|
||||
String version = subscribedURL.getParameter(VERSION_KEY);
|
||||
String protocol = subscribedURL.getParameter(PROTOCOL_KEY);
|
||||
|
||||
List<URL> urls = dubboMetadataUtils.getDubboMetadataServiceURLs(serviceInstances,
|
||||
serviceInterface, version, protocol);
|
||||
|
||||
notifyAllSubscribedURLs(subscribedURL, urls, listener);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,125 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.alibaba.cloud.dubbo.env.DubboCloudProperties;
|
||||
import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class ReSubscribeManager {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(ReSubscribeManager.class);
|
||||
|
||||
private final Map<String, ReSubscribeMetadataJob> reConnectJobMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final ScheduledThreadPoolExecutor reConnectPool = new ScheduledThreadPoolExecutor(
|
||||
5);
|
||||
|
||||
private final DubboCloudRegistry registry;
|
||||
|
||||
private final DubboCloudProperties properties;
|
||||
|
||||
public ReSubscribeManager(DubboCloudRegistry registry) {
|
||||
this.registry = registry;
|
||||
this.properties = registry.getBean(DubboCloudProperties.class);
|
||||
|
||||
reConnectPool.setKeepAliveTime(10, TimeUnit.MINUTES);
|
||||
reConnectPool.allowCoreThreadTimeOut(true);
|
||||
}
|
||||
|
||||
public void onRefreshSuccess(ServiceInstancesChangedEvent event) {
|
||||
reConnectJobMap.remove(event.getServiceName());
|
||||
}
|
||||
|
||||
public void onRefreshFail(ServiceInstancesChangedEvent event) {
|
||||
String serviceName = event.getServiceName();
|
||||
|
||||
int count = 1;
|
||||
|
||||
if (event instanceof FakeServiceInstancesChangedEvent) {
|
||||
count = ((FakeServiceInstancesChangedEvent) event).getCount() + 1;
|
||||
}
|
||||
|
||||
if (count >= properties.getMaxReSubscribeMetadataTimes()) {
|
||||
logger.error(
|
||||
"reSubscribe failed too many times, serviceName = {}, count = {}",
|
||||
serviceName, count);
|
||||
return;
|
||||
}
|
||||
|
||||
ReSubscribeMetadataJob job = new ReSubscribeMetadataJob(serviceName, count);
|
||||
reConnectPool.schedule(job, properties.getReSubscribeMetadataIntervial(),
|
||||
TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private final class ReSubscribeMetadataJob implements Runnable {
|
||||
|
||||
private final String serviceName;
|
||||
|
||||
private final int errorCounts;
|
||||
|
||||
private ReSubscribeMetadataJob(String serviceName, int errorCounts) {
|
||||
this.errorCounts = errorCounts;
|
||||
this.serviceName = serviceName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (!reConnectJobMap.containsKey(serviceName)
|
||||
|| reConnectJobMap.get(serviceName) != this) {
|
||||
return;
|
||||
}
|
||||
List<ServiceInstance> list = registry.getServiceInstances(serviceName);
|
||||
FakeServiceInstancesChangedEvent event = new FakeServiceInstancesChangedEvent(
|
||||
serviceName, list, errorCounts);
|
||||
registry.onApplicationEvent(event);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static final class FakeServiceInstancesChangedEvent
|
||||
extends ServiceInstancesChangedEvent {
|
||||
|
||||
private static final long serialVersionUID = -2832478604601472915L;
|
||||
|
||||
private final int count;
|
||||
|
||||
private FakeServiceInstancesChangedEvent(String serviceName,
|
||||
List<ServiceInstance> serviceInstances, int count) {
|
||||
super(serviceName, serviceInstances);
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public int getCount() {
|
||||
return count;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.registry;
|
||||
|
||||
import com.alibaba.cloud.dubbo.registry.event.ServiceInstancesChangedEvent;
|
||||
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.core.Ordered;
|
||||
|
||||
/**
|
||||
* The interface of ServiceInstanceChange event Listener.
|
||||
*
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
* @see ServiceInstancesChangedEvent
|
||||
* @see Ordered
|
||||
* @see ApplicationListener
|
||||
*/
|
||||
public interface ServiceInstanceChangeListener
|
||||
extends ApplicationListener<ServiceInstancesChangedEvent>, Ordered {
|
||||
|
||||
}
|
@ -0,0 +1,75 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.dubbo.service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.cloud.commons.lang.StringUtils;
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.rpc.Invocation;
|
||||
import org.apache.dubbo.rpc.Invoker;
|
||||
import org.apache.dubbo.rpc.RpcException;
|
||||
import org.apache.dubbo.rpc.cluster.Router;
|
||||
import org.apache.dubbo.rpc.cluster.RouterFactory;
|
||||
import org.apache.dubbo.rpc.cluster.router.AbstractRouter;
|
||||
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import static com.alibaba.cloud.dubbo.metadata.RevisionResolver.SCA_REVSION_KEY;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
|
||||
*/
|
||||
public class MetadataServiceRevisionRouterFactory implements RouterFactory {
|
||||
|
||||
@Override
|
||||
public Router getRouter(URL url) {
|
||||
return new AbstractRouter() {
|
||||
@Override
|
||||
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url,
|
||||
Invocation invocation) throws RpcException {
|
||||
if (CollectionUtils.isEmpty(invokers)) {
|
||||
return invokers;
|
||||
}
|
||||
|
||||
if (!DubboMetadataService.class.getName()
|
||||
.equalsIgnoreCase(url.getServiceInterface())) {
|
||||
return invokers;
|
||||
}
|
||||
|
||||
String revision = invocation.getAttachment(SCA_REVSION_KEY);
|
||||
|
||||
if (StringUtils.isEmpty(revision)) {
|
||||
return invokers;
|
||||
}
|
||||
|
||||
List<Invoker<T>> list = new ArrayList<>(invokers.size());
|
||||
|
||||
for (Invoker<T> invoker : invokers) {
|
||||
if (StringUtils.equals(revision,
|
||||
invoker.getUrl().getParameter(SCA_REVSION_KEY))) {
|
||||
list.add(invoker);
|
||||
}
|
||||
}
|
||||
|
||||
return list;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1 @@
|
||||
revisionRouter=com.alibaba.cloud.dubbo.service.MetadataServiceRevisionRouterFactory
|
@ -1,51 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
* @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a>
|
||||
*/
|
||||
public final class RocketMQBinderConstants {
|
||||
|
||||
/**
|
||||
* Header key for RocketMQ Transactional Args.
|
||||
*/
|
||||
public static final String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
|
||||
|
||||
/**
|
||||
* Default NameServer value.
|
||||
*/
|
||||
public static final String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
|
||||
|
||||
/**
|
||||
* Default group for SCS RocketMQ Binder.
|
||||
*/
|
||||
public static final String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
|
||||
|
||||
/**
|
||||
* RocketMQ re-consume times.
|
||||
*/
|
||||
public static final String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
|
||||
|
||||
private RocketMQBinderConstants() {
|
||||
throw new AssertionError("Must not instantiate constant utility class");
|
||||
}
|
||||
|
||||
}
|
@ -1,89 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public final class RocketMQBinderUtils {
|
||||
|
||||
private RocketMQBinderUtils() {
|
||||
|
||||
}
|
||||
|
||||
public static RocketMQBinderConfigurationProperties mergeProperties(
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
||||
RocketMQProperties rocketMQProperties) {
|
||||
RocketMQBinderConfigurationProperties result = new RocketMQBinderConfigurationProperties();
|
||||
if (StringUtils.isEmpty(rocketMQProperties.getNameServer())) {
|
||||
result.setNameServer(rocketBinderConfigurationProperties.getNameServer());
|
||||
}
|
||||
else {
|
||||
result.setNameServer(
|
||||
Arrays.asList(rocketMQProperties.getNameServer().split(";")));
|
||||
}
|
||||
if (rocketMQProperties.getProducer() == null
|
||||
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) {
|
||||
result.setAccessKey(rocketBinderConfigurationProperties.getAccessKey());
|
||||
}
|
||||
else {
|
||||
result.setAccessKey(rocketMQProperties.getProducer().getAccessKey());
|
||||
}
|
||||
if (rocketMQProperties.getProducer() == null
|
||||
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getSecretKey())) {
|
||||
result.setSecretKey(rocketBinderConfigurationProperties.getSecretKey());
|
||||
}
|
||||
else {
|
||||
result.setSecretKey(rocketMQProperties.getProducer().getSecretKey());
|
||||
}
|
||||
if (rocketMQProperties.getProducer() == null || StringUtils
|
||||
.isEmpty(rocketMQProperties.getProducer().getCustomizedTraceTopic())) {
|
||||
result.setCustomizedTraceTopic(
|
||||
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
|
||||
}
|
||||
else {
|
||||
result.setCustomizedTraceTopic(
|
||||
rocketMQProperties.getProducer().getCustomizedTraceTopic());
|
||||
}
|
||||
if (rocketMQProperties.getProducer() != null
|
||||
&& rocketMQProperties.getProducer().isEnableMsgTrace()) {
|
||||
result.setEnableMsgTrace(Boolean.TRUE);
|
||||
}
|
||||
else {
|
||||
result.setEnableMsgTrace(
|
||||
rocketBinderConfigurationProperties.isEnableMsgTrace());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public static String getNameServerStr(List<String> nameServerList) {
|
||||
if (CollectionUtils.isEmpty(nameServerList)) {
|
||||
return null;
|
||||
}
|
||||
return String.join(";", nameServerList);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQConfigBeanPostProcessor;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
|
||||
import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProvider;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.converter.CompositeMessageConverter;
|
||||
|
||||
@Configuration
|
||||
public class ExtendedBindingHandlerMappingsProviderConfiguration {
|
||||
|
||||
@Bean
|
||||
public MappingsProvider rocketExtendedPropertiesDefaultMappingsProvider() {
|
||||
return () -> {
|
||||
Map<ConfigurationPropertyName, ConfigurationPropertyName> mappings = new HashMap<>();
|
||||
mappings.put(
|
||||
ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.bindings"),
|
||||
ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.default"));
|
||||
mappings.put(
|
||||
ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.streams"),
|
||||
ConfigurationPropertyName
|
||||
.of("spring.cloud.stream.rocketmq.streams.default"));
|
||||
return mappings;
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RocketMQConfigBeanPostProcessor rocketMQConfigBeanPostProcessor() {
|
||||
return new RocketMQConfigBeanPostProcessor();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* if you want to customize a bean, please use this BeanName {@code RocketMQMessageConverter.DEFAULT_NAME}.
|
||||
* @return
|
||||
*/
|
||||
@Bean(RocketMQMessageConverter.DEFAULT_NAME)
|
||||
@ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })
|
||||
public CompositeMessageConverter rocketMQMessageConverter() {
|
||||
return new RocketMQMessageConverter().getMessageConverter();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* issue:https://github.com/alibaba/spring-cloud-alibaba/issues/1681 .
|
||||
*
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@EnableConfigurationProperties({ RocketMQExtendedBindingProperties.class,
|
||||
RocketMQBinderConfigurationProperties.class })
|
||||
public class RocketMQBinderAutoConfiguration {
|
||||
|
||||
@Autowired
|
||||
private RocketMQExtendedBindingProperties extendedBindingProperties;
|
||||
|
||||
@Autowired
|
||||
private RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
@Bean
|
||||
@ConditionalOnEnabledHealthIndicator("rocketmq")
|
||||
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
|
||||
public RocketMQBinderHealthIndicator rocketMQBinderHealthIndicator() {
|
||||
return new RocketMQBinderHealthIndicator();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RocketMQTopicProvisioner rocketMQTopicProvisioner() {
|
||||
return new RocketMQTopicProvisioner();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RocketMQMessageChannelBinder rocketMQMessageChannelBinder(
|
||||
RocketMQTopicProvisioner provisioningProvider) {
|
||||
return new RocketMQMessageChannelBinder(rocketBinderConfigurationProperties,
|
||||
extendedBindingProperties, provisioningProvider);
|
||||
}
|
||||
|
||||
}
|
@ -1,81 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
|
||||
/**
|
||||
* @author Timur Valiev
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@Import({ RocketMQAutoConfiguration.class,
|
||||
RocketMQBinderHealthIndicatorAutoConfiguration.class })
|
||||
@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class,
|
||||
RocketMQExtendedBindingProperties.class })
|
||||
public class RocketMQBinderAutoConfiguration {
|
||||
|
||||
private final RocketMQExtendedBindingProperties extendedBindingProperties;
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
@Autowired(required = false)
|
||||
private RocketMQProperties rocketMQProperties = new RocketMQProperties();
|
||||
|
||||
@Autowired
|
||||
public RocketMQBinderAutoConfiguration(
|
||||
RocketMQExtendedBindingProperties extendedBindingProperties,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
|
||||
this.extendedBindingProperties = extendedBindingProperties;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RocketMQTopicProvisioner provisioningProvider() {
|
||||
return new RocketMQTopicProvisioner();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RocketMQMessageChannelBinder rocketMessageChannelBinder(
|
||||
RocketMQTopicProvisioner provisioningProvider,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
|
||||
provisioningProvider, extendedBindingProperties,
|
||||
rocketBinderConfigurationProperties, rocketMQProperties,
|
||||
instrumentationManager);
|
||||
binder.setExtendedBindingProperties(extendedBindingProperties);
|
||||
return binder;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public InstrumentationManager instrumentationManager() {
|
||||
return new InstrumentationManager();
|
||||
}
|
||||
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
|
||||
|
||||
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
|
||||
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ConditionalOnClass(Endpoint.class)
|
||||
public class RocketMQBinderHealthIndicatorAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnEnabledHealthIndicator("rocketmq")
|
||||
public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() {
|
||||
return new RocketMQBinderHealthIndicator();
|
||||
}
|
||||
|
||||
}
|
@ -1,102 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.config;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
|
||||
import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
|
||||
import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
|
||||
import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@AutoConfigureAfter(RocketMQAutoConfiguration.class)
|
||||
@ConditionalOnMissingBean(DefaultMQProducer.class)
|
||||
public class RocketMQComponent4BinderAutoConfiguration {
|
||||
|
||||
private final Environment environment;
|
||||
|
||||
public RocketMQComponent4BinderAutoConfiguration(Environment environment) {
|
||||
this.environment = environment;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(DefaultMQProducer.class)
|
||||
public DefaultMQProducer defaultMQProducer() {
|
||||
DefaultMQProducer producer;
|
||||
String configNameServer = environment.resolveRequiredPlaceholders(
|
||||
"${spring.cloud.stream.rocketmq.binder.name-server:${rocketmq.producer.name-server:}}");
|
||||
String ak = environment.resolveRequiredPlaceholders(
|
||||
"${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}");
|
||||
String sk = environment.resolveRequiredPlaceholders(
|
||||
"${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}");
|
||||
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
|
||||
producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP,
|
||||
new AclClientRPCHook(new SessionCredentials(ak, sk)));
|
||||
producer.setVipChannelEnabled(false);
|
||||
}
|
||||
else {
|
||||
producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP);
|
||||
}
|
||||
if (StringUtils.isEmpty(configNameServer)) {
|
||||
configNameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
|
||||
}
|
||||
producer.setNamesrvAddr(configNameServer);
|
||||
return producer;
|
||||
}
|
||||
|
||||
@Bean(destroyMethod = "destroy")
|
||||
@ConditionalOnMissingBean
|
||||
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
|
||||
ObjectMapper objectMapper) {
|
||||
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
|
||||
rocketMQTemplate.setProducer(mqProducer);
|
||||
rocketMQTemplate.setObjectMapper(objectMapper);
|
||||
return rocketMQTemplate;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnBean(RocketMQTemplate.class)
|
||||
@ConditionalOnMissingBean(TransactionHandlerRegistry.class)
|
||||
public TransactionHandlerRegistry transactionHandlerRegistry(
|
||||
RocketMQTemplate template) {
|
||||
return new TransactionHandlerRegistry(template);
|
||||
}
|
||||
|
||||
@Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME)
|
||||
@ConditionalOnBean(TransactionHandlerRegistry.class)
|
||||
public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(
|
||||
TransactionHandlerRegistry transactionHandlerRegistry) {
|
||||
return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,100 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.constant;
|
||||
|
||||
import org.apache.rocketmq.common.message.MessageConst;
|
||||
|
||||
/**
|
||||
* @author zkzlx
|
||||
*/
|
||||
public class RocketMQConst extends MessageConst {
|
||||
|
||||
/**
|
||||
* Default NameServer value.
|
||||
*/
|
||||
public static final String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
|
||||
|
||||
/**
|
||||
* Default group for SCS RocketMQ Binder.
|
||||
*/
|
||||
public static final String DEFAULT_GROUP = "binder_default_group_name";
|
||||
|
||||
/**
|
||||
* user args for SCS RocketMQ Binder.
|
||||
*/
|
||||
public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS";
|
||||
|
||||
/**
|
||||
* It is mainly provided for conversion between rocketMq-message and Spring-message,
|
||||
* and parameters are passed through HEADERS.
|
||||
*/
|
||||
public static class Headers {
|
||||
|
||||
/**
|
||||
* keys for SCS RocketMQ Headers.
|
||||
*/
|
||||
public static final String KEYS = MessageConst.PROPERTY_KEYS;
|
||||
|
||||
/**
|
||||
* tags for SCS RocketMQ Headers.
|
||||
*/
|
||||
public static final String TAGS = MessageConst.PROPERTY_TAGS;
|
||||
|
||||
/**
|
||||
* topic for SCS RocketMQ Headers.
|
||||
*/
|
||||
public static final String TOPIC = "MQ_TOPIC";
|
||||
|
||||
/**
|
||||
* The ID of the message.
|
||||
*/
|
||||
public static final String MESSAGE_ID = "MQ_MESSAGE_ID";
|
||||
|
||||
/**
|
||||
* The timestamp that the message producer invokes the message sending API.
|
||||
*/
|
||||
public static final String BORN_TIMESTAMP = "MQ_BORN_TIMESTAMP";
|
||||
|
||||
/**
|
||||
* The IP and port number of the message producer.
|
||||
*/
|
||||
public static final String BORN_HOST = "MQ_BORN_HOST";
|
||||
|
||||
/**
|
||||
* Message flag, MQ is not processed and is available for use by applications.
|
||||
*/
|
||||
public static final String FLAG = "MQ_FLAG";
|
||||
|
||||
/**
|
||||
* Message consumption queue ID.
|
||||
*/
|
||||
public static final String QUEUE_ID = "MQ_QUEUE_ID";
|
||||
|
||||
/**
|
||||
* Message system Flag, such as whether or not to compress, whether or not to
|
||||
* transactional messages.
|
||||
*/
|
||||
public static final String SYS_FLAG = "MQ_SYS_FLAG";
|
||||
|
||||
/**
|
||||
* The transaction ID of the transaction message.
|
||||
*/
|
||||
public static final String TRANSACTION_ID = "MQ_TRANSACTION_ID";
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,465 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.consuming;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
|
||||
import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
||||
import org.apache.rocketmq.acl.common.SessionCredentials;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.UtilAll;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.apache.rocketmq.remoting.RPCHook;
|
||||
import org.apache.rocketmq.spring.annotation.ConsumeMode;
|
||||
import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||
import org.apache.rocketmq.spring.annotation.SelectorType;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
|
||||
import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
|
||||
import org.apache.rocketmq.spring.support.RocketMQUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
|
||||
|
||||
/**
|
||||
* A class that Listen on rocketmq message.
|
||||
* <p>
|
||||
* this class will delegate {@link RocketMQListener} to handle message
|
||||
*
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
* @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a>
|
||||
* @see RocketMQListener
|
||||
*/
|
||||
public class RocketMQListenerBindingContainer
|
||||
implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
|
||||
|
||||
private final static Logger log = LoggerFactory
|
||||
.getLogger(RocketMQListenerBindingContainer.class);
|
||||
|
||||
private long suspendCurrentQueueTimeMillis = 1000;
|
||||
|
||||
/**
|
||||
* Message consume retry strategy<br>
|
||||
* -1,no retry,put into DLQ directly<br>
|
||||
* 0,broker control retry frequency<br>
|
||||
* >0,client control retry frequency.
|
||||
*/
|
||||
private int delayLevelWhenNextConsume = 0;
|
||||
|
||||
private List<String> nameServer;
|
||||
|
||||
private String consumerGroup;
|
||||
|
||||
private String topic;
|
||||
|
||||
private int consumeThreadMax = 64;
|
||||
|
||||
private String charset = "UTF-8";
|
||||
|
||||
private RocketMQListener rocketMQListener;
|
||||
|
||||
private RocketMQHeaderMapper headerMapper;
|
||||
|
||||
private DefaultMQPushConsumer consumer;
|
||||
|
||||
private boolean running;
|
||||
|
||||
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
|
||||
|
||||
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
|
||||
|
||||
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
|
||||
|
||||
// The following properties came from RocketMQConsumerProperties.
|
||||
private ConsumeMode consumeMode;
|
||||
|
||||
private SelectorType selectorType;
|
||||
|
||||
private String selectorExpression;
|
||||
|
||||
private MessageModel messageModel;
|
||||
|
||||
public RocketMQListenerBindingContainer(
|
||||
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
|
||||
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
|
||||
RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
|
||||
this.rocketMQConsumerProperties = rocketMQConsumerProperties;
|
||||
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
|
||||
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
|
||||
this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
|
||||
? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY;
|
||||
if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
|
||||
this.selectorType = SelectorType.TAG;
|
||||
this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
|
||||
}
|
||||
else {
|
||||
this.selectorType = SelectorType.SQL92;
|
||||
this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
|
||||
}
|
||||
this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
|
||||
? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setupMessageListener(RocketMQListener<?> rocketMQListener) {
|
||||
this.rocketMQListener = rocketMQListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
this.setRunning(false);
|
||||
if (Objects.nonNull(consumer)) {
|
||||
consumer.shutdown();
|
||||
}
|
||||
log.info("container destroyed, {}", this.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
initRocketMQPushConsumer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
stop();
|
||||
callback.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
if (this.isRunning()) {
|
||||
throw new IllegalStateException(
|
||||
"container already running. " + this.toString());
|
||||
}
|
||||
|
||||
try {
|
||||
consumer.start();
|
||||
}
|
||||
catch (MQClientException e) {
|
||||
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
|
||||
}
|
||||
this.setRunning(true);
|
||||
|
||||
log.info("running container: {}", this.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (this.isRunning()) {
|
||||
if (Objects.nonNull(consumer)) {
|
||||
consumer.shutdown();
|
||||
}
|
||||
setRunning(false);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
private void setRunning(boolean running) {
|
||||
this.running = running;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
private void initRocketMQPushConsumer() throws MQClientException {
|
||||
Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
|
||||
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
|
||||
Assert.notNull(nameServer, "Property 'nameServer' is required");
|
||||
Assert.notNull(topic, "Property 'topic' is required");
|
||||
|
||||
String ak = rocketBinderConfigurationProperties.getAccessKey();
|
||||
String sk = rocketBinderConfigurationProperties.getSecretKey();
|
||||
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
|
||||
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
|
||||
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
|
||||
new AllocateMessageQueueAveragely(),
|
||||
rocketBinderConfigurationProperties.isEnableMsgTrace(),
|
||||
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
|
||||
consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
|
||||
topic + "|" + UtilAll.getPid()));
|
||||
consumer.setVipChannelEnabled(false);
|
||||
}
|
||||
else {
|
||||
consumer = new DefaultMQPushConsumer(consumerGroup,
|
||||
rocketBinderConfigurationProperties.isEnableMsgTrace(),
|
||||
rocketBinderConfigurationProperties.getCustomizedTraceTopic());
|
||||
}
|
||||
|
||||
consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer));
|
||||
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
|
||||
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
|
||||
|
||||
switch (messageModel) {
|
||||
case BROADCASTING:
|
||||
consumer.setMessageModel(
|
||||
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
|
||||
break;
|
||||
case CLUSTERING:
|
||||
consumer.setMessageModel(
|
||||
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
|
||||
}
|
||||
|
||||
switch (selectorType) {
|
||||
case TAG:
|
||||
consumer.subscribe(topic, selectorExpression);
|
||||
break;
|
||||
case SQL92:
|
||||
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
|
||||
}
|
||||
|
||||
switch (consumeMode) {
|
||||
case ORDERLY:
|
||||
consumer.setMessageListener(new DefaultMessageListenerOrderly());
|
||||
break;
|
||||
case CONCURRENTLY:
|
||||
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
|
||||
}
|
||||
|
||||
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
|
||||
((RocketMQPushConsumerLifecycleListener) rocketMQListener)
|
||||
.prepareStart(consumer);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
|
||||
+ '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
|
||||
+ ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
|
||||
+ ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
|
||||
+ messageModel + '}';
|
||||
}
|
||||
|
||||
public long getSuspendCurrentQueueTimeMillis() {
|
||||
return suspendCurrentQueueTimeMillis;
|
||||
}
|
||||
|
||||
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
|
||||
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
|
||||
}
|
||||
|
||||
public int getDelayLevelWhenNextConsume() {
|
||||
return delayLevelWhenNextConsume;
|
||||
}
|
||||
|
||||
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
|
||||
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
|
||||
}
|
||||
|
||||
public List<String> getNameServer() {
|
||||
return nameServer;
|
||||
}
|
||||
|
||||
public void setNameServer(List<String> nameServer) {
|
||||
this.nameServer = nameServer;
|
||||
}
|
||||
|
||||
public String getConsumerGroup() {
|
||||
return consumerGroup;
|
||||
}
|
||||
|
||||
public void setConsumerGroup(String consumerGroup) {
|
||||
this.consumerGroup = consumerGroup;
|
||||
}
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public int getConsumeThreadMax() {
|
||||
return consumeThreadMax;
|
||||
}
|
||||
|
||||
public void setConsumeThreadMax(int consumeThreadMax) {
|
||||
this.consumeThreadMax = consumeThreadMax;
|
||||
}
|
||||
|
||||
public String getCharset() {
|
||||
return charset;
|
||||
}
|
||||
|
||||
public void setCharset(String charset) {
|
||||
this.charset = charset;
|
||||
}
|
||||
|
||||
public RocketMQListener getRocketMQListener() {
|
||||
return rocketMQListener;
|
||||
}
|
||||
|
||||
public void setRocketMQListener(RocketMQListener rocketMQListener) {
|
||||
this.rocketMQListener = rocketMQListener;
|
||||
}
|
||||
|
||||
public DefaultMQPushConsumer getConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
public void setConsumer(DefaultMQPushConsumer consumer) {
|
||||
this.consumer = consumer;
|
||||
}
|
||||
|
||||
public ExtendedConsumerProperties<RocketMQConsumerProperties> getRocketMQConsumerProperties() {
|
||||
return rocketMQConsumerProperties;
|
||||
}
|
||||
|
||||
public ConsumeMode getConsumeMode() {
|
||||
return consumeMode;
|
||||
}
|
||||
|
||||
public SelectorType getSelectorType() {
|
||||
return selectorType;
|
||||
}
|
||||
|
||||
public String getSelectorExpression() {
|
||||
return selectorExpression;
|
||||
}
|
||||
|
||||
public MessageModel getMessageModel() {
|
||||
return messageModel;
|
||||
}
|
||||
|
||||
public RocketMQHeaderMapper getHeaderMapper() {
|
||||
return headerMapper;
|
||||
}
|
||||
|
||||
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
|
||||
this.headerMapper = headerMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert rocketmq {@link MessageExt} to Spring {@link Message}.
|
||||
* @param messageExt the rocketmq message
|
||||
* @return the converted Spring {@link Message}
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Message convertToSpringMessage(MessageExt messageExt) {
|
||||
|
||||
// add reconsume-times header to messageExt
|
||||
int reconsumeTimes = messageExt.getReconsumeTimes();
|
||||
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
|
||||
String.valueOf(reconsumeTimes));
|
||||
Message message = RocketMQUtil.convertToSpringMessage(messageExt);
|
||||
return MessageBuilder.fromMessage(message)
|
||||
.copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
|
||||
}
|
||||
|
||||
public class DefaultMessageListenerConcurrently
|
||||
implements MessageListenerConcurrently {
|
||||
|
||||
@SuppressWarnings({ "unchecked", "Duplicates" })
|
||||
@Override
|
||||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
|
||||
ConsumeConcurrentlyContext context) {
|
||||
for (MessageExt messageExt : msgs) {
|
||||
log.debug("received msg: {}", messageExt);
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
rocketMQListener.onMessage(convertToSpringMessage(messageExt));
|
||||
long costTime = System.currentTimeMillis() - now;
|
||||
log.debug("consume {} message key:[{}] cost: {} ms",
|
||||
messageExt.getMsgId(), messageExt.getKeys(), costTime);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn("consume message failed. messageExt:{}", messageExt, e);
|
||||
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||
}
|
||||
}
|
||||
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
|
||||
|
||||
@SuppressWarnings({ "unchecked", "Duplicates" })
|
||||
@Override
|
||||
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
|
||||
ConsumeOrderlyContext context) {
|
||||
for (MessageExt messageExt : msgs) {
|
||||
log.debug("received msg: {}", messageExt);
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
rocketMQListener.onMessage(convertToSpringMessage(messageExt));
|
||||
long costTime = System.currentTimeMillis() - now;
|
||||
log.info("consume {} message key:[{}] cost: {} ms",
|
||||
messageExt.getMsgId(), messageExt.getKeys(), costTime);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn("consume message failed. messageExt:{}", messageExt, e);
|
||||
context.setSuspendCurrentQueueTimeMillis(
|
||||
suspendCurrentQueueTimeMillis);
|
||||
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
|
||||
}
|
||||
}
|
||||
|
||||
return ConsumeOrderlyStatus.SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,62 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.consuming;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.rocketmq.common.message.MessageQueue;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQMessageQueueChooser {
|
||||
|
||||
private volatile int queueIndex = 0;
|
||||
|
||||
private volatile List<MessageQueue> messageQueues;
|
||||
|
||||
public MessageQueue choose() {
|
||||
return messageQueues.get(queueIndex);
|
||||
}
|
||||
|
||||
public int requeue() {
|
||||
if (queueIndex - 1 < 0) {
|
||||
this.queueIndex = messageQueues.size() - 1;
|
||||
}
|
||||
else {
|
||||
this.queueIndex = this.queueIndex - 1;
|
||||
}
|
||||
return this.queueIndex;
|
||||
}
|
||||
|
||||
public void increment() {
|
||||
this.queueIndex = (this.queueIndex + 1) % messageQueues.size();
|
||||
}
|
||||
|
||||
public void reset(Set<MessageQueue> queueSet) {
|
||||
this.messageQueues = null;
|
||||
this.messageQueues = new ArrayList<>(queueSet);
|
||||
this.queueIndex = 0;
|
||||
}
|
||||
|
||||
public List<MessageQueue> getMessageQueues() {
|
||||
return messageQueues;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.convert;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.messaging.converter.ByteArrayMessageConverter;
|
||||
import org.springframework.messaging.converter.CompositeMessageConverter;
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.converter.StringMessageConverter;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
* The default message converter of rocketMq,its bean name is {@link #DEFAULT_NAME} .
|
||||
*
|
||||
* @author zkzlx
|
||||
*/
|
||||
public class RocketMQMessageConverter {
|
||||
|
||||
/**
|
||||
* if you want to customize a bean, please use the BeanName.
|
||||
*/
|
||||
public static final String DEFAULT_NAME = "com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter";
|
||||
|
||||
private static final boolean JACKSON_PRESENT;
|
||||
|
||||
private static final boolean FASTJSON_PRESENT;
|
||||
|
||||
static {
|
||||
ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
|
||||
JACKSON_PRESENT = ClassUtils
|
||||
.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader)
|
||||
&& ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator",
|
||||
classLoader);
|
||||
FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader)
|
||||
&& ClassUtils.isPresent(
|
||||
"com.alibaba.fastjson.support.config.FastJsonConfig",
|
||||
classLoader);
|
||||
}
|
||||
|
||||
private CompositeMessageConverter messageConverter;
|
||||
|
||||
public RocketMQMessageConverter() {
|
||||
List<MessageConverter> messageConverters = new ArrayList<>();
|
||||
ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
|
||||
byteArrayMessageConverter.setContentTypeResolver(null);
|
||||
messageConverters.add(byteArrayMessageConverter);
|
||||
messageConverters.add(new StringMessageConverter());
|
||||
if (JACKSON_PRESENT) {
|
||||
messageConverters.add(new MappingJackson2MessageConverter());
|
||||
}
|
||||
if (FASTJSON_PRESENT) {
|
||||
try {
|
||||
messageConverters.add((MessageConverter) ClassUtils.forName(
|
||||
"com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
|
||||
ClassUtils.getDefaultClassLoader()).newInstance());
|
||||
}
|
||||
catch (ClassNotFoundException | IllegalAccessException
|
||||
| InstantiationException ignored) {
|
||||
// ignore this exception
|
||||
}
|
||||
}
|
||||
messageConverter = new CompositeMessageConverter(messageConverters);
|
||||
}
|
||||
|
||||
public CompositeMessageConverter getMessageConverter() {
|
||||
return messageConverter;
|
||||
}
|
||||
|
||||
public void setMessageConverter(CompositeMessageConverter messageConverter) {
|
||||
this.messageConverter = messageConverter;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.custom;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
|
||||
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListener;
|
||||
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
|
||||
import org.apache.rocketmq.client.hook.SendMessageHook;
|
||||
import org.apache.rocketmq.client.producer.MessageQueueSelector;
|
||||
import org.apache.rocketmq.client.producer.SendCallback;
|
||||
import org.apache.rocketmq.client.producer.TransactionListener;
|
||||
|
||||
import org.springframework.messaging.converter.CompositeMessageConverter;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Gets the beans configured in the configuration file.
|
||||
*
|
||||
* @author junboXiang
|
||||
*/
|
||||
public final class RocketMQBeanContainerCache {
|
||||
|
||||
private RocketMQBeanContainerCache() {
|
||||
}
|
||||
|
||||
private static final Class<?>[] CLASSES = new Class[] {
|
||||
CompositeMessageConverter.class, AllocateMessageQueueStrategy.class,
|
||||
MessageQueueSelector.class, MessageListener.class, TransactionListener.class,
|
||||
SendCallback.class, CheckForbiddenHook.class, SendMessageHook.class,
|
||||
ErrorAcknowledgeHandler.class };
|
||||
|
||||
private static final Map<String, Object> BEANS_CACHE = new ConcurrentHashMap<>();
|
||||
|
||||
static void putBean(String beanName, Object beanObj) {
|
||||
BEANS_CACHE.put(beanName, beanObj);
|
||||
}
|
||||
|
||||
static Class<?>[] getClassAry() {
|
||||
return CLASSES;
|
||||
}
|
||||
|
||||
public static <T> T getBean(String beanName, Class<T> clazz) {
|
||||
return getBean(beanName, clazz, null);
|
||||
}
|
||||
|
||||
public static <T> T getBean(String beanName, Class<T> clazz, T defaultObj) {
|
||||
if (StringUtils.isEmpty(beanName)) {
|
||||
return defaultObj;
|
||||
}
|
||||
Object obj = BEANS_CACHE.get(beanName);
|
||||
if (null == obj) {
|
||||
return defaultObj;
|
||||
}
|
||||
if (clazz.isAssignableFrom(obj.getClass())) {
|
||||
return (T) obj;
|
||||
}
|
||||
return defaultObj;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.custom;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
|
||||
/**
|
||||
* find RocketMQ bean by annotations.
|
||||
*
|
||||
* @author junboXiang
|
||||
*
|
||||
*/
|
||||
public class RocketMQConfigBeanPostProcessor implements BeanPostProcessor {
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName)
|
||||
throws BeansException {
|
||||
Stream.of(RocketMQBeanContainerCache.getClassAry()).forEach(clazz -> {
|
||||
if (clazz.isAssignableFrom(bean.getClass())) {
|
||||
RocketMQBeanContainerCache.putBean(beanName, bean);
|
||||
}
|
||||
});
|
||||
return bean;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.extend;
|
||||
|
||||
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
/**
|
||||
* @author zkzlx
|
||||
*/
|
||||
public interface ErrorAcknowledgeHandler {
|
||||
|
||||
/**
|
||||
* Ack state handling, including receive, reject, and retry, when a consumption
|
||||
* exception occurs.
|
||||
* @param message message
|
||||
* @return see {@link Status}
|
||||
*/
|
||||
Status handler(Message<?> message);
|
||||
|
||||
}
|
@ -1,176 +0,0 @@
|
||||
/*
|
||||
* Copyright 2013-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.cloud.stream.binder.rocketmq.integration;
|
||||
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
|
||||
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.integration.endpoint.MessageProducerSupport;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.retry.RecoveryCallback;
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryListener;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
|
||||
*/
|
||||
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
|
||||
|
||||
private static final Logger log = LoggerFactory
|
||||
.getLogger(RocketMQInboundChannelAdapter.class);
|
||||
|
||||
private RetryTemplate retryTemplate;
|
||||
|
||||
private RecoveryCallback<? extends Object> recoveryCallback;
|
||||
|
||||
private RocketMQListenerBindingContainer rocketMQListenerContainer;
|
||||
|
||||
private final ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties;
|
||||
|
||||
private final InstrumentationManager instrumentationManager;
|
||||
|
||||
public RocketMQInboundChannelAdapter(
|
||||
RocketMQListenerBindingContainer rocketMQListenerContainer,
|
||||
ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties,
|
||||
InstrumentationManager instrumentationManager) {
|
||||
this.rocketMQListenerContainer = rocketMQListenerContainer;
|
||||
this.consumerProperties = consumerProperties;
|
||||
this.instrumentationManager = instrumentationManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onInit() {
|
||||
if (consumerProperties == null
|
||||
|| !consumerProperties.getExtension().getEnabled()) {
|
||||
return;
|
||||
}
|
||||
super.onInit();
|
||||
if (this.retryTemplate != null) {
|
||||
Assert.state(getErrorChannel() == null,
|
||||
"Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
|
||||
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
|
||||
+ "send an error message when retries are exhausted");
|
||||
}
|
||||
|
||||
BindingRocketMQListener listener = new BindingRocketMQListener();
|
||||
rocketMQListenerContainer.setRocketMQListener(listener);
|
||||
|
||||
if (retryTemplate != null) {
|
||||
this.retryTemplate.registerListener(listener);
|
||||
}
|
||||
|
||||
try {
|
||||
rocketMQListenerContainer.afterPropertiesSet();
|
||||
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
|
||||
throw new IllegalArgumentException(
|
||||
"rocketMQListenerContainer init error: " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
instrumentationManager.addHealthInstrumentation(
|
||||
new Instrumentation(rocketMQListenerContainer.getTopic()
|
||||
+ rocketMQListenerContainer.getConsumerGroup()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
if (consumerProperties == null
|
||||
|| !consumerProperties.getExtension().getEnabled()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
rocketMQListenerContainer.start();
|
||||
instrumentationManager
|
||||
.getHealthInstrumentation(rocketMQListenerContainer.getTopic()
|
||||
+ rocketMQListenerContainer.getConsumerGroup())
|
||||
.markStartedSuccessfully();
|
||||
}
|
||||
catch (Exception e) {
|
||||
instrumentationManager
|
||||
.getHealthInstrumentation(rocketMQListenerContainer.getTopic()
|
||||
+ rocketMQListenerContainer.getConsumerGroup())
|
||||
.markStartFailed(e);
|
||||
log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
|
||||
throw new MessagingException(MessageBuilder.withPayload(
|
||||
"RocketMQTemplate startup failed, Caused by " + e.getMessage())
|
||||
.build(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
rocketMQListenerContainer.stop();
|
||||
}
|
||||
|
||||
public void setRetryTemplate(RetryTemplate retryTemplate) {
|
||||
this.retryTemplate = retryTemplate;
|
||||
}
|
||||
|
||||
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
|
||||
this.recoveryCallback = recoveryCallback;
|
||||
}
|
||||
|
||||
protected class BindingRocketMQListener
|
||||
implements RocketMQListener<Message>, RetryListener {
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
|
||||
if (enableRetry) {
|
||||
RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
|
||||
RocketMQInboundChannelAdapter.this.sendMessage(message);
|
||||
return null;
|
||||
}, (RecoveryCallback<Object>) RocketMQInboundChannelAdapter.this.recoveryCallback);
|
||||
}
|
||||
else {
|
||||
RocketMQInboundChannelAdapter.this.sendMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, E extends Throwable> boolean open(RetryContext context,
|
||||
RetryCallback<T, E> callback) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, E extends Throwable> void close(RetryContext context,
|
||||
RetryCallback<T, E> callback, Throwable throwable) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, E extends Throwable> void onError(RetryContext context,
|
||||
RetryCallback<T, E> callback, Throwable throwable) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue