Merge 2.2.x to 2022.

pull/2400/head
Freeman Lau 3 years ago
parent 42f03eb1c8
commit 3451906c2b

@ -90,7 +90,6 @@
<curator.version>4.0.1</curator.version>
<!-- Apache RocketMQ -->
<rocketmq.starter.version>2.0.4</rocketmq.starter.version>
<rocketmq.version>4.6.1</rocketmq.version>
<!-- Maven Plugin Versions -->
@ -180,11 +179,6 @@
</exclusions>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.rocketmq</groupId>-->
<!-- <artifactId>rocketmq-spring-boot-starter</artifactId>-->
<!-- <version>${rocketmq.starter.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>

@ -222,7 +222,7 @@ spring.cloud.sentinel.datasource.ds1.file.rule-type=flow
#spring.cloud.sentinel.datasource.ds1.file.converter-class=org.springframework.cloud.alibaba.cloud.examples.JsonFlowRuleListConverter
#spring.cloud.sentinel.datasource.ds1.file.rule-type=flow
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=localhost:8848
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=127.0.0.1:8848
spring.cloud.sentinel.datasource.ds2.nacos.data-id=sentinel
spring.cloud.sentinel.datasource.ds2.nacos.group-id=DEFAULT_GROUP
spring.cloud.sentinel.datasource.ds2.nacos.data-type=json

@ -89,7 +89,7 @@ spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848
server-addr: 127.0.0.1:8848
gateway:
discovery:
locator:

@ -225,7 +225,7 @@ spring.cloud.sentinel.datasource.ds1.file.rule-type=flow
#spring.cloud.sentinel.datasource.ds1.file.converter-class=JsonFlowRuleListConverter
#spring.cloud.sentinel.datasource.ds1.file.rule-type=flow
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=localhost:8848
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=127.0.0.1:8848
spring.cloud.sentinel.datasource.ds2.nacos.data-id=sentinel
spring.cloud.sentinel.datasource.ds2.nacos.group-id=DEFAULT_GROUP
spring.cloud.sentinel.datasource.ds2.nacos.data-type=json

@ -24,12 +24,6 @@
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

@ -46,7 +46,7 @@ spring:
# 2.4.0 新增配置 spring.config.import
config:
import:
- optional:nacos:localhost:8848
- optional:nacos:127.0.0.1:8848
```
3. 在 nacos 创建 test.yml

@ -7,9 +7,12 @@ spring:
nacos:
config:
group: DEFAULT_GROUP
server-addr: localhost:8848
server-addr: 127.0.0.1:8848
config:
import:
- optional:nacos:test.yml
- optional:nacos:test01.yml?group=group_02
- optional:nacos:test02.yml?group=group_03&refreshEnabled=false
logging:
level:
com.alibaba.nacos: debug

@ -77,9 +77,10 @@ public class ProviderApplication {
@GetMapping("/divide")
public String divide(@RequestParam Integer a, @RequestParam Integer b) {
if(b == 0) {
if (b == 0) {
return String.valueOf(0);
} else {
}
else {
return String.valueOf(a / b);
}
}

@ -1,11 +1,12 @@
server.port=18082
spring.application.name=service-provider
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
spring.cloud.nacos.discovery.enabled=true
#spring.cloud.nacos.discovery.instance-enabled=true
spring.cloud.nacos.username=nacos
spring.cloud.nacos.password=nacos
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
server.port=18082
spring.application.name=service-provider
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
spring.cloud.nacos.discovery.enabled=true
#spring.cloud.nacos.discovery.instance-enabled=true
spring.cloud.nacos.username=nacos
spring.cloud.nacos.password=nacos
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

@ -6,7 +6,7 @@ spring:
username: nacos
password: nacos
discovery:
server-addr: localhost:8848
server-addr: 127.0.0.1:8848
config:
discovery:
enabled: true

@ -9,7 +9,7 @@ spring:
username: nacos
password: nacos
discovery:
server-addr: localhost:8848
server-addr: 127.0.0.1:8848
config:
server:
git:

@ -1,6 +1,6 @@
spring.application.name=account-service
server.port=18084
spring.cloud.nacos.discovery.server-addr=localhost:8848
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
spring.datasource.name="accountDataSource"
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource

@ -1,6 +1,6 @@
server.port=18081
spring.application.name=business-service
spring.cloud.nacos.discovery.server-addr=localhost:8848
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
# The following configuration can be omitted.
#feign.hystrix.enabled=true

@ -1,6 +1,6 @@
spring.application.name=order-service
server.port=18083
spring.cloud.nacos.discovery.server-addr=localhost:8848
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
spring.datasource.name="orderDataSource"
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource

@ -1,6 +1,6 @@
spring.application.name=storage-service
server.port=18082
spring.cloud.nacos.discovery.server-addr=localhost:8848
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
spring.datasource.name="storageDataSource"
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource

@ -6,6 +6,6 @@ spring:
cloud:
nacos:
config:
server-addr: localhost:8848
server-addr: 127.0.0.1:8848
name: sentinel-circuitbreaker-rules.yml
file-extension: yml

@ -206,7 +206,7 @@ Sentinel starter 整合了目前存在的几类 ReadableDataSource。只需要
spring.cloud.sentinel.datasource.ds1.file.file=classpath: degraderule.json
spring.cloud.sentinel.datasource.ds1.file.data-type=json
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=localhost:8848
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=127.0.0.1:8848
spring.cloud.sentinel.datasource.ds2.nacos.dataId=sentinel
spring.cloud.sentinel.datasource.ds2.nacos.groupId=DEFAULT_GROUP
spring.cloud.sentinel.datasource.ds2.nacos.data-type=json

@ -181,7 +181,7 @@ If you want to define `FileRefreshableDataSource` and `NacosDataSource`, see the
spring.cloud.sentinel.datasource.ds1.file.file=classpath: degraderule.json
spring.cloud.sentinel.datasource.ds1.file.data-type=json
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=localhost:8848
spring.cloud.sentinel.datasource.ds2.nacos.server-addr=127.0.0.1:8848
spring.cloud.sentinel.datasource.ds2.nacos.dataId=sentinel
spring.cloud.sentinel.datasource.ds2.nacos.groupId=DEFAULT_GROUP
spring.cloud.sentinel.datasource.ds2.nacos.data-type=json

@ -61,7 +61,7 @@ public class NacosDataSourceProperties extends AbstractDataSourceProperties {
if (StringUtils.isEmpty(serverAddr)) {
serverAddr = this.getEnv().getProperty(
"spring.cloud.sentinel.datasource.nacos.server-addr",
"localhost:8848");
"127.0.0.1:8848");
}
}

@ -37,7 +37,7 @@ public class NacosDataSourceFactoryBeanTests {
private String groupId = "DEFAULT_GROUP";
private String serverAddr = "localhost:8848";
private String serverAddr = "127.0.0.1:8848";
private String contextPath = "/my-nacos";

@ -44,15 +44,15 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import static com.alibaba.cloud.circuitbreaker.sentinel.ReactiveSentinelCircuitBreakerIntegrationTest.Application;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* @author Ryan Baxter
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT,
classes = ReactiveSentinelCircuitBreakerIntegrationTest.Application.class,
properties = { "spring.cloud.discovery.client.health-indicator.enabled=false" })
@SpringBootTest(webEnvironment = RANDOM_PORT, classes = Application.class, properties = {
"spring.cloud.discovery.client.health-indicator.enabled=false" })
@DirtiesContext
public class ReactiveSentinelCircuitBreakerIntegrationTest {
@ -60,7 +60,7 @@ public class ReactiveSentinelCircuitBreakerIntegrationTest {
private int port = 0;
@Autowired
private ReactiveSentinelCircuitBreakerIntegrationTest.Application.DemoControllerService service;
private Application.DemoControllerService service;
@Before
public void setup() {

@ -107,7 +107,7 @@ public class NacosConfigProperties {
.resolvePlaceholders("${spring.cloud.nacos.config.server-addr:}");
if (StringUtils.isEmpty(serverAddr)) {
serverAddr = environment.resolvePlaceholders(
"${spring.cloud.nacos.server-addr:localhost:8848}");
"${spring.cloud.nacos.server-addr:127.0.0.1:8848}");
}
this.setServerAddr(serverAddr);
}
@ -442,19 +442,19 @@ public class NacosConfigProperties {
* @return string
*/
@Deprecated
@DeprecatedConfigurationProperty(
reason = "replaced to NacosConfigProperties#sharedConfigs and not use it at the same time.",
replacement = PREFIX + ".shared-configs[x]")
@DeprecatedConfigurationProperty(reason = "replaced to NacosConfigProperties#sharedConfigs and not use it at the same time.", replacement = PREFIX
+ ".shared-configs[x]")
public String getSharedDataids() {
return null == getSharedConfigs() ? null : getSharedConfigs().stream()
.map(Config::getDataId).collect(Collectors.joining(COMMAS));
return null == getSharedConfigs() ? null
: getSharedConfigs().stream().map(Config::getDataId)
.collect(Collectors.joining(COMMAS));
}
/**
* recommend to use {@link NacosConfigProperties#sharedConfigs} and not use it at the
* same time .
* @param sharedDataids the dataids for configurable multiple shared configurations ,
* multiple separated by commas .
* multiple separated by commas .
*/
@Deprecated
public void setSharedDataids(String sharedDataids) {
@ -472,9 +472,8 @@ public class NacosConfigProperties {
* @return string
*/
@Deprecated
@DeprecatedConfigurationProperty(
reason = "replaced to NacosConfigProperties#sharedConfigs and not use it at the same time.",
replacement = PREFIX + ".shared-configs[x].refresh")
@DeprecatedConfigurationProperty(reason = "replaced to NacosConfigProperties#sharedConfigs and not use it at the same time.", replacement = PREFIX
+ ".shared-configs[x].refresh")
public String getRefreshableDataids() {
return null == getSharedConfigs() ? null
: getSharedConfigs().stream().filter(Config::isRefresh)
@ -520,9 +519,8 @@ public class NacosConfigProperties {
* @return extensionConfigs
*/
@Deprecated
@DeprecatedConfigurationProperty(
reason = "replaced to NacosConfigProperties#extensionConfigs and not use it at the same time .",
replacement = PREFIX + ".extension-configs[x]")
@DeprecatedConfigurationProperty(reason = "replaced to NacosConfigProperties#extensionConfigs and not use it at the same time .", replacement = PREFIX
+ ".extension-configs[x]")
public List<Config> getExtConfig() {
return this.getExtensionConfigs();
}
@ -578,7 +576,8 @@ public class NacosConfigProperties {
int index = endpoint.indexOf(":");
properties.put(ENDPOINT, endpoint.substring(0, index));
properties.put(ENDPOINT_PORT, endpoint.substring(index + 1));
} else {
}
else {
properties.put(ENDPOINT, endpoint);
}
@ -617,11 +616,10 @@ public class NacosConfigProperties {
+ ", enableRemoteSyncConfig=" + enableRemoteSyncConfig + ", endpoint='"
+ endpoint + '\'' + ", namespace='" + namespace + '\'' + ", accessKey='"
+ accessKey + '\'' + ", secretKey='" + secretKey + '\''
+ ", ramRoleName='" + ramRoleName + '\''
+ ", contextPath='" + contextPath + '\'' + ", clusterName='" + clusterName
+ '\'' + ", name='" + name + '\'' + '\'' + ", shares=" + sharedConfigs
+ ", extensions=" + extensionConfigs + ", refreshEnabled="
+ refreshEnabled + '}';
+ ", ramRoleName='" + ramRoleName + '\'' + ", contextPath='" + contextPath
+ '\'' + ", clusterName='" + clusterName + '\'' + ", name='" + name + '\''
+ '\'' + ", shares=" + sharedConfigs + ", extensions=" + extensionConfigs
+ ", refreshEnabled=" + refreshEnabled + '}';
}
public static class Config {

@ -83,31 +83,35 @@ public class NacosPropertySource extends MapPropertySource {
}
}
Map<String, Object> sourceMap = new LinkedHashMap<>();
List<PropertySource<?>> otherTypePropertySources = new ArrayList<>();
for (PropertySource<?> propertySource : propertySources) {
if (propertySource == null) {
continue;
}
if (propertySource instanceof MapPropertySource) {
// If the Nacos configuration file uses "---" to separate property name,
// propertySources will be multiple documents, and every document is a map.
// see org.springframework.boot.env.YamlPropertySourceLoader#load
MapPropertySource mapPropertySource = (MapPropertySource) propertySource;
Map<String, Object> source = mapPropertySource.getSource();
sourceMap.putAll(source);
} else {
otherTypePropertySources.add(propertySource);
}
}
// Other property sources which is not instanceof MapPropertySource will be put as it is,
// and the internal elements cannot be directly retrieved,
// so the user needs to implement the retrieval logic by himself
if (!otherTypePropertySources.isEmpty()) {
sourceMap.put(String.join(NacosConfigProperties.COMMAS, dataId, group), otherTypePropertySources);
}
return sourceMap;
Map<String, Object> sourceMap = new LinkedHashMap<>();
List<PropertySource<?>> otherTypePropertySources = new ArrayList<>();
for (PropertySource<?> propertySource : propertySources) {
if (propertySource == null) {
continue;
}
if (propertySource instanceof MapPropertySource) {
// If the Nacos configuration file uses "---" to separate property name,
// propertySources will be multiple documents, and every document is a
// map.
// see org.springframework.boot.env.YamlPropertySourceLoader#load
MapPropertySource mapPropertySource = (MapPropertySource) propertySource;
Map<String, Object> source = mapPropertySource.getSource();
sourceMap.putAll(source);
}
else {
otherTypePropertySources.add(propertySource);
}
}
// Other property sources which is not instanceof MapPropertySource will be put as
// it is,
// and the internal elements cannot be directly retrieved,
// so the user needs to implement the retrieval logic by himself
if (!otherTypePropertySources.isEmpty()) {
sourceMap.put(String.join(NacosConfigProperties.COMMAS, dataId, group),
otherTypePropertySources);
}
return sourceMap;
}
public String getGroup() {

@ -3,7 +3,7 @@
{
"name": "spring.cloud.nacos.server-addr",
"type": "java.lang.String",
"defaultValue": "localhost:8848",
"defaultValue": "127.0.0.1:8848",
"description": "nacos server address."
},
{

@ -38,7 +38,7 @@ public class NacosConfigAutoConfigurationTest {
assertThat(BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
NacosConfigProperties.class).length).isEqualTo(1);
assertThat(context.getBean(NacosConfigProperties.class).getServerAddr())
.isEqualTo("localhost:8848");
.isEqualTo("127.0.0.1:8848");
context.close();
}

@ -28,7 +28,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* @author <a href="mailto:lyuzb@lyuzb.com">lyuzb</a>
@ -36,9 +35,9 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironmen
@RunWith(SpringRunner.class)
@SpringBootTest(
classes = NacosConfigPropertiesServerAddressBothLevelTests.TestConfig.class,
properties = { "spring.cloud.nacos.config.server-addr=321,321,321,321:8848",
"spring.cloud.nacos.server-addr=123.123.123.123:8848" },
webEnvironment = RANDOM_PORT)
properties = { "spring.cloud.nacos.config.server-addr=321.321.321.321:8848",
"spring.cloud.nacos.server-addr=123.123.123.123:8848" }
)
public class NacosConfigPropertiesServerAddressBothLevelTests {
@Autowired
@ -46,7 +45,7 @@ public class NacosConfigPropertiesServerAddressBothLevelTests {
@Test
public void testGetServerAddr() {
assertThat(properties.getServerAddr()).isEqualTo("321,321,321,321:8848");
assertThat(properties.getServerAddr()).isEqualTo("321.321.321.321:8848");
}
@Configuration

@ -555,7 +555,8 @@ public class NacosDiscoveryProperties {
watchDelay, logName, service, weight, clusterName, group,
namingLoadCacheAtStart, metadata, registerEnabled, ip, networkInterface,
port, secure, accessKey, secretKey, heartBeatInterval, heartBeatTimeout,
ipDeleteTimeout, instanceEnabled, ephemeral, failureToleranceEnabled, failFast);
ipDeleteTimeout, instanceEnabled, ephemeral, failureToleranceEnabled,
failFast);
}
@Override
@ -574,7 +575,7 @@ public class NacosDiscoveryProperties {
+ heartBeatInterval + ", heartBeatTimeout=" + heartBeatTimeout
+ ", ipDeleteTimeout=" + ipDeleteTimeout + ", instanceEnabled="
+ instanceEnabled + ", ephemeral=" + ephemeral
+ ", failureToleranceEnabled=" + failureToleranceEnabled + '}';
+ ", failureToleranceEnabled=" + failureToleranceEnabled + '}'
+ ", ipDeleteTimeout=" + ipDeleteTimeout + ", failFast=" + failFast + '}';
}
@ -585,7 +586,7 @@ public class NacosDiscoveryProperties {
.resolvePlaceholders("${spring.cloud.nacos.discovery.server-addr:}");
if (StringUtils.isEmpty(serverAddr)) {
serverAddr = env.resolvePlaceholders(
"${spring.cloud.nacos.server-addr:localhost:8848}");
"${spring.cloud.nacos.server-addr:127.0.0.1:8848}");
}
this.setServerAddr(serverAddr);
}

@ -2,7 +2,7 @@
{
"name": "spring.cloud.nacos.server-addr",
"type": "java.lang.String",
"defaultValue": "localhost:8848",
"defaultValue": "127.0.0.1:8848",
"description": "nacos server address."
},
{

@ -29,18 +29,17 @@ import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationC
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
import static com.alibaba.cloud.nacos.NacosDiscoveryPropertiesServerAddressBothLevelTests.TestConfig;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* @author <a href="mailto:lyuzb@lyuzb.com">lyuzb</a>
*/
@RunWith(SpringRunner.class)
@SpringBootTest(
classes = NacosDiscoveryPropertiesServerAddressBothLevelTests.TestConfig.class,
properties = { "spring.cloud.nacos.discovery.server-addr=321.321.321.321:8848",
"spring.cloud.nacos.server-addr=123.123.123.123:8848" },
webEnvironment = RANDOM_PORT)
@SpringBootTest(classes = TestConfig.class, properties = {
"spring.application.name=app",
"spring.cloud.nacos.discovery.server-addr=321.321.321.321:8848",
"spring.cloud.nacos.server-addr=123.123.123.123:8848" })
public class NacosDiscoveryPropertiesServerAddressBothLevelTests {
@Autowired

@ -29,19 +29,17 @@ import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationC
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
import static com.alibaba.cloud.nacos.NacosDiscoveryPropertiesServerAddressTopLevelTests.TestConfig;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* @author <a href="mailto:lyuzb@lyuzb.com">lyuzb</a>
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(
classes = NacosDiscoveryPropertiesServerAddressTopLevelTests.TestConfig.class,
properties = { "spring.cloud.nacos.server-addr=123.123.123.123:8848" },
webEnvironment = RANDOM_PORT)
@SpringBootTest(classes = TestConfig.class, properties = {
"spring.application.name=app",
"spring.cloud.nacos.server-addr=123.123.123.123:8848" })
public class NacosDiscoveryPropertiesServerAddressTopLevelTests {
@Autowired

@ -1,82 +0,0 @@
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.nacos.ribbon;
import com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration;
import com.netflix.loadbalancer.ConfigurationBasedServerList;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ZoneAwareLoadBalancer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.commons.util.UtilAutoConfiguration;
import org.springframework.cloud.netflix.archaius.ArchaiusAutoConfiguration;
import org.springframework.cloud.netflix.ribbon.RibbonClients;
import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author liujunjie
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = NacosRibbonClientPropertyOverrideTests.TestConfiguration.class,
properties = { "spring.cloud.nacos.server-addr=127.0.0.1:8848",
"spring.cloud.nacos.username=nacos", "spring.cloud.nacos.password=nacos",
"spring.cloud.nacos.discovery.port=18080",
"spring.cloud.nacos.discovery.service=remoteApp",
"localApp.ribbon.NIWSServerListClassName="
+ "com.netflix.loadbalancer.ConfigurationBasedServerList",
"localApp.ribbon.listOfServers=127.0.0.1:19090",
"localApp.ribbon.ServerListRefreshInterval=15000" })
public class NacosRibbonClientPropertyOverrideTests {
@Autowired
private SpringClientFactory factory;
@Test
public void serverListOverridesToTest() {
ConfigurationBasedServerList.class
.cast(getLoadBalancer("localApp").getServerListImpl());
}
@Test
public void serverListRemoteTest() {
NacosServerList.class.cast(getLoadBalancer("remoteApp").getServerListImpl());
}
@SuppressWarnings("unchecked")
private ZoneAwareLoadBalancer<Server> getLoadBalancer(String name) {
return (ZoneAwareLoadBalancer<Server>) this.factory.getLoadBalancer(name);
}
@Configuration
@RibbonClients
@EnableAutoConfiguration
@ImportAutoConfiguration({ UtilAutoConfiguration.class,
PropertyPlaceholderAutoConfiguration.class, ArchaiusAutoConfiguration.class,
RibbonNacosAutoConfiguration.class, NacosDiscoveryClientConfiguration.class })
protected static class TestConfiguration {
}
}

@ -25,8 +25,6 @@ import feign.Contract;
import feign.Feign;
import feign.InvocationHandlerFactory;
import feign.Target;
import feign.hystrix.FallbackFactory;
import feign.hystrix.HystrixFeign;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
@ -81,41 +79,41 @@ public final class SentinelFeign {
@Override
public InvocationHandler create(Target target,
Map<Method, MethodHandler> dispatch) {
// using reflect get fallback and fallbackFactory properties from
// FeignClientFactoryBean because FeignClientFactoryBean is a package
// level class, we can not use it in our package
Object feignClientFactoryBean = SentinelTargeterAspect
.getFeignClientFactoryBean();
if (feignClientFactoryBean != null) {
Class fallback = (Class) getFieldValue(feignClientFactoryBean,
"fallback");
Class fallbackFactory = (Class) getFieldValue(
feignClientFactoryBean, "fallbackFactory");
String beanName = (String) getFieldValue(feignClientFactoryBean,
"contextId");
if (!StringUtils.hasText(beanName)) {
beanName = (String) getFieldValue(feignClientFactoryBean,
"name");
}
Object fallbackInstance;
FallbackFactory fallbackFactoryInstance;
// check fallback and fallbackFactory properties
if (void.class != fallback) {
fallbackInstance = getFromContext(beanName, "fallback",
fallback, target.type());
return new SentinelInvocationHandler(target, dispatch,
new FallbackFactory.Default(fallbackInstance));
}
if (void.class != fallbackFactory) {
fallbackFactoryInstance = (FallbackFactory) getFromContext(
beanName, "fallbackFactory", fallbackFactory,
FallbackFactory.class);
return new SentinelInvocationHandler(target, dispatch,
fallbackFactoryInstance);
}
GenericApplicationContext gctx = (GenericApplicationContext) Builder.this.applicationContext;
BeanDefinition def = gctx.getBeanDefinition(target.type().getName());
/*
* Due to the change of the initialization sequence,
* BeanFactory.getBean will cause a circular dependency. So
* FeignClientFactoryBean can only be obtained from BeanDefinition
*/
FeignClientFactoryBean feignClientFactoryBean = (FeignClientFactoryBean) def
.getAttribute("feignClientsRegistrarFactoryBean");
Class fallback = feignClientFactoryBean.getFallback();
Class fallbackFactory = feignClientFactoryBean.getFallbackFactory();
String beanName = feignClientFactoryBean.getContextId();
if (!StringUtils.hasText(beanName)) {
beanName = (String) getFieldValue(feignClientFactoryBean, "name");
}
Object fallbackInstance;
FallbackFactory fallbackFactoryInstance;
// check fallback and fallbackFactory properties
if (void.class != fallback) {
fallbackInstance = getFromContext(beanName, "fallback", fallback,
target.type());
return new SentinelInvocationHandler(target, dispatch,
new FallbackFactory.Default(fallbackInstance));
}
if (void.class != fallbackFactory) {
fallbackFactoryInstance = (FallbackFactory) getFromContext(
beanName, "fallbackFactory", fallbackFactory,
FallbackFactory.class);
return new SentinelInvocationHandler(target, dispatch,
fallbackFactoryInstance);
}
return new SentinelInvocationHandler(target, dispatch);
}

@ -41,11 +41,4 @@ public class SentinelFeignAutoConfiguration {
return SentinelFeign.builder();
}
@Bean
@ConditionalOnProperty(name = "feign.sentinel.enabled")
@ConditionalOnClass(name = "org.springframework.cloud.openfeign.Targeter")
public SentinelTargeterAspect sentinelTargeterAspect() {
return new SentinelTargeterAspect();
}
}

@ -1,51 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.sentinel.feign;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
/**
* Record FeignClientFactoryBean to threadlocal, so that SentinelFeign can get it when
* creating SentinelInvocationHandler.
*
* @see com.alibaba.cloud.sentinel.feign.SentinelFeign.Builder
* @author <a href="mailto:chenxilzx1@gmail.com">theonefx</a>
*/
@Aspect
public class SentinelTargeterAspect {
private static final ThreadLocal<Object> FEIGN_CLIENT_FACTORY_BEAN = new ThreadLocal<>();
public static Object getFeignClientFactoryBean() {
return FEIGN_CLIENT_FACTORY_BEAN.get();
}
@Around("execution(* org.springframework.cloud.openfeign.Targeter.target(..))")
public Object process(ProceedingJoinPoint pjp) throws Throwable {
Object factory = pjp.getArgs()[0];
try {
FEIGN_CLIENT_FACTORY_BEAN.set(factory);
return pjp.proceed();
}
finally {
FEIGN_CLIENT_FACTORY_BEAN.remove();
}
}
}

@ -80,12 +80,11 @@ import static org.springframework.util.ObjectUtils.isEmpty;
@Configuration(proxyBeanMethods = false)
@Import({ DubboServiceRegistrationEventPublishingAspect.class,
DubboBootstrapStartCommandLineRunner.class })
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter(name = { EUREKA_CLIENT_AUTO_CONFIGURATION_CLASS_NAME,
CONSUL_AUTO_SERVICE_AUTO_CONFIGURATION_CLASS_NAME,
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" },
value = { DubboMetadataAutoConfiguration.class })
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" }, value = {
DubboMetadataAutoConfiguration.class })
public class DubboServiceRegistrationAutoConfiguration {
/**
@ -304,13 +303,10 @@ public class DubboServiceRegistrationAutoConfiguration {
return;
}
NewService newService = consulRegistration.getService();
if (consulDiscoveryProperties.isTagsAsMetadata()) {
for (Map.Entry<String, String> entry : serviceMetadata.entrySet()) {
attAsTag(newService.getTags(), entry.getKey(), entry.getValue());
}
}
else {
newService.getMeta().putAll(serviceMetadata);
// properties `tagsAsMetadata` in tagsAsMetadata is deprecated, and default
// value is true.
for (Map.Entry<String, String> entry : serviceMetadata.entrySet()) {
attAsTag(newService.getTags(), entry.getKey(), entry.getValue());
}
}

@ -167,7 +167,6 @@ public class GenearalServiceSubscribeHandler extends AbstractServiceSubscribeHan
* Clone the subscribed URLs based on the template URLs.
* @param serviceInstances the list of
* {@link org.springframework.cloud.client.ServiceInstance service instances}
* @return
*/
List<URL> cloneExportedURLs(List<ServiceInstance> serviceInstances) {

@ -55,7 +55,6 @@ public class ExtendedBindingHandlerMappingsProviderConfiguration {
/**
* if you want to customize a bean, please use this BeanName {@code RocketMQMessageConverter.DEFAULT_NAME}.
* @return
*/
@Bean(RocketMQMessageConverter.DEFAULT_NAME)
@ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })

@ -1,302 +0,0 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq.integration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
private final static Logger log = LoggerFactory
.getLogger(RocketMQMessageHandler.class);
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
private MessageChannel sendFailureChannel;
private final RocketMQTemplate rocketMQTemplate;
private RocketMQHeaderMapper headerMapper;
private final Boolean transactional;
private final String destination;
private final String groupName;
private final InstrumentationManager instrumentationManager;
private boolean sync = false;
private volatile boolean running = false;
private ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
private MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor;
public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination,
String groupName, Boolean transactional,
InstrumentationManager instrumentationManager,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
MessageConverterConfigurer.PartitioningInterceptor partitioningInterceptor) {
this.rocketMQTemplate = rocketMQTemplate;
this.destination = destination;
this.groupName = groupName;
this.transactional = transactional;
this.instrumentationManager = instrumentationManager;
this.producerProperties = producerProperties;
this.partitioningInterceptor = partitioningInterceptor;
}
@Override
public void start() {
if (!transactional) {
instrumentationManager
.addHealthInstrumentation(new Instrumentation(destination));
try {
rocketMQTemplate.afterPropertiesSet();
instrumentationManager.getHealthInstrumentation(destination)
.markStartedSuccessfully();
}
catch (Exception e) {
instrumentationManager.getHealthInstrumentation(destination)
.markStartFailed(e);
log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
throw new MessagingException(MessageBuilder.withPayload(
"RocketMQTemplate startup failed, Caused by " + e.getMessage())
.build(), e);
}
}
if (producerProperties.isPartitioned()) {
try {
List<MessageQueue> messageQueues = rocketMQTemplate.getProducer()
.fetchPublishMessageQueues(destination);
if (producerProperties.getPartitionCount() != messageQueues.size()) {
logger.info(String.format(
"The partition count of topic '%s' will change from '%s' to '%s'",
destination, producerProperties.getPartitionCount(),
messageQueues.size()));
producerProperties.setPartitionCount(messageQueues.size());
partitioningInterceptor
.setPartitionCount(producerProperties.getPartitionCount());
}
}
catch (MQClientException e) {
logger.error(e, "fetch publish message queues fail");
}
}
running = true;
}
@Override
public void stop() {
if (!transactional) {
rocketMQTemplate.destroy();
}
running = false;
}
@Override
public boolean isRunning() {
return running;
}
@Override
protected void handleMessageInternal(
org.springframework.messaging.Message<?> message) {
try {
// issue 737 fix
Map<String, String> jsonHeaders = headerMapper
.fromHeaders(message.getHeaders());
message = org.springframework.messaging.support.MessageBuilder
.fromMessage(message).copyHeaders(jsonHeaders).build();
final StringBuilder topicWithTags = new StringBuilder(destination);
String tags = Optional
.ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
.toString();
if (!StringUtils.isEmpty(tags)) {
topicWithTags.append(":").append(tags);
}
SendResult sendRes = null;
if (transactional) {
sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
topicWithTags.toString(), message, message.getHeaders()
.get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
}
else {
int delayLevel = 0;
try {
Object delayLevelObj = message.getHeaders()
.getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
if (delayLevelObj instanceof Number) {
delayLevel = ((Number) delayLevelObj).intValue();
}
else if (delayLevelObj instanceof String) {
delayLevel = Integer.parseInt((String) delayLevelObj);
}
}
catch (Exception e) {
// ignore
}
boolean needSelectQueue = message.getHeaders()
.containsKey(BinderHeaders.PARTITION_HEADER);
if (sync) {
if (needSelectQueue) {
sendRes = rocketMQTemplate.syncSendOrderly(
topicWithTags.toString(), message, "",
rocketMQTemplate.getProducer().getSendMsgTimeout());
}
else {
sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
message,
rocketMQTemplate.getProducer().getSendMsgTimeout(),
delayLevel);
}
log.debug("sync send to topic " + topicWithTags + " " + sendRes);
}
else {
Message<?> finalMessage = message;
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.debug("async send to topic " + topicWithTags + " "
+ sendResult);
}
@Override
public void onException(Throwable e) {
log.error("RocketMQ Message hasn't been sent. Caused by "
+ e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(
RocketMQMessageHandler.this.errorMessageStrategy
.buildErrorMessage(new MessagingException(
finalMessage, e), null));
}
}
};
if (needSelectQueue) {
rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
message, "", sendCallback,
rocketMQTemplate.getProducer().getSendMsgTimeout());
}
else {
rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
sendCallback);
}
}
}
if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
if (getSendFailureChannel() != null) {
this.getSendFailureChannel().send(message);
}
else {
throw new MessagingException(message,
new MQClientException("message hasn't been sent", null));
}
}
}
catch (Exception e) {
log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(this.errorMessageStrategy
.buildErrorMessage(new MessagingException(message, e), null));
}
else {
throw new MessagingException(message, e);
}
}
}
/**
* Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent
* to this channel with a payload of a {@link MessagingException} with the failed
* message and cause.
* @param sendFailureChannel the failure channel.
* @since 0.2.2
*/
public void setSendFailureChannel(MessageChannel sendFailureChannel) {
this.sendFailureChannel = sendFailureChannel;
}
/**
* Set the error message strategy implementation to use when sending error messages
* after send failures. Cannot be null.
* @param errorMessageStrategy the implementation.
* @since 0.2.2
*/
public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
this.errorMessageStrategy = errorMessageStrategy;
}
public MessageChannel getSendFailureChannel() {
return sendFailureChannel;
}
public void setSync(boolean sync) {
this.sync = sync;
}
public RocketMQHeaderMapper getHeaderMapper() {
return headerMapper;
}
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}
}

@ -125,7 +125,7 @@ public final class RocketMQConsumerFactory {
null == rpcHook && consumerProperties.getVipChannelEnabled());
consumer.setInstanceName(
RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup()));
if(null != allocateMessageQueueStrategy) {
if (null != allocateMessageQueueStrategy) {
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueStrategy);
}
consumer.setNamesrvAddr(consumerProperties.getNameServer());

@ -142,9 +142,9 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport
* The actual execution of a user-defined input consumption service method.
* @param messageExtList rocket mq message list
* @param failSupplier {@link ConsumeConcurrentlyStatus} or
* {@link ConsumeOrderlyStatus}
* {@link ConsumeOrderlyStatus}
* @param sucSupplier {@link ConsumeConcurrentlyStatus} or
* {@link ConsumeOrderlyStatus}
* {@link ConsumeOrderlyStatus}
* @param <R> object
* @return R
*/
@ -195,7 +195,8 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport
throw new MessagingException(MessageBuilder.withPayload(
"DefaultMQPushConsumer init failed, Caused by " + e.getMessage())
.build(), e);
}finally {
}
finally {
InstrumentationManager.addHealthInstrumentation(instrumentation);
}
}

@ -46,7 +46,7 @@ public class RocketMQAckCallback implements AcknowledgmentCallback {
private final MessageQueue messageQueue;
public RocketMQAckCallback(DefaultLitePullConsumer consumer,
MessageQueue messageQueue,MessageExt messageExt) {
MessageQueue messageQueue, MessageExt messageExt) {
this.messageExt = messageExt;
this.consumer = consumer;
this.messageQueue = messageQueue;

@ -91,11 +91,12 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
// this.consumer.setPullBatchSize(1);
this.consumer.subscribe(topic, messageSelector);
this.consumer.setAutoCommit(false);
//register TopicMessageQueueChangeListener for messageQueuesForTopic
consumer.registerTopicMessageQueueChangeListener(topic, messageQueuesForTopic::put);
// register TopicMessageQueueChangeListener for messageQueuesForTopic
consumer.registerTopicMessageQueueChangeListener(topic,
messageQueuesForTopic::put);
this.consumer.start();
//Initialize messageQueuesForTopic immediately
messageQueuesForTopic.put(topic,consumer.fetchMessageQueues(topic));
// Initialize messageQueuesForTopic immediately
messageQueuesForTopic.put(topic, consumer.fetchMessageQueues(topic));
instrumentation.markStartedSuccessfully();
}
catch (MQClientException e) {
@ -108,9 +109,9 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
this.running = true;
}
private MessageQueue acquireCurrentMessageQueue(String topic,int queueId) {
Collection<MessageQueue> messageQueueSet = messageQueuesForTopic.get(topic);
if(CollectionUtils.isEmpty(messageQueueSet)){
private MessageQueue acquireCurrentMessageQueue(String topic, int queueId) {
Collection<MessageQueue> messageQueueSet = messageQueuesForTopic.get(topic);
if (CollectionUtils.isEmpty(messageQueueSet)) {
return null;
}
for (MessageQueue messageQueue : messageQueueSet) {
@ -151,7 +152,8 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
if (null == messageExt) {
return null;
}
MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(),messageExt.getQueueId());
MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(),
messageExt.getQueueId());
if (messageQueue == null) {
throw new IllegalArgumentException(
"The message queue is not in assigned list");
@ -160,7 +162,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
.convertMessage2Spring(messageExt);
return MessageBuilder.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
new RocketMQAckCallback(this.consumer,messageQueue, messageExt))
new RocketMQAckCallback(this.consumer, messageQueue, messageExt))
.build();
}

@ -100,8 +100,10 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler
// Use the default if the partition is on and no customization is available.
this.messageQueueSelector = RocketMQBeanContainerCache.getBean(
mqProducerProperties.getMessageQueueSelector(),
MessageQueueSelector.class, extendedProducerProperties.isPartitioned()
? new PartitionMessageQueueSelector() : null);
MessageQueueSelector.class,
extendedProducerProperties.isPartitioned()
? new PartitionMessageQueueSelector()
: null);
}
@Override
@ -169,23 +171,25 @@ public class RocketMQProducerMessageHandler extends AbstractMessageHandler
}
((TransactionMQProducer) defaultMQProducer)
.setTransactionListener(transactionListener);
if(log.isDebugEnabled()){
log.debug("send transaction message ->{}" , mqMessage);
if (log.isDebugEnabled()) {
log.debug("send transaction message ->{}", mqMessage);
}
sendResult = defaultMQProducer.sendMessageInTransaction(mqMessage,
message.getHeaders().get(RocketMQConst.USER_TRANSACTIONAL_ARGS));
}
else {
if(log.isDebugEnabled()){
log.debug("send message ->{}" , mqMessage);
if (log.isDebugEnabled()) {
log.debug("send message ->{}", mqMessage);
}
sendResult = this.send(mqMessage, this.messageQueueSelector,
message.getHeaders(), message);
}
log.info("the message has sent,message={},sendResult={}",mqMessage,sendResult);
log.info("the message has sent,message={},sendResult={}", mqMessage,
sendResult);
if (sendResult == null
|| !SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.error("message send fail.SendStatus is not OK.the message={}",mqMessage);
log.error("message send fail.SendStatus is not OK.the message={}",
mqMessage);
this.doFail(message, new MessagingException(
"message send fail.SendStatus is not OK."));
}

Loading…
Cancel
Save