diff --git a/pom.xml b/pom.xml index 26cc9b9d7..55dd872aa 100644 --- a/pom.xml +++ b/pom.xml @@ -80,7 +80,7 @@ - 2.2.8.RELEASE + 2.2.9-SNAPSHOT Hoxton.SR12 diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index d471f7cac..059d85a87 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -18,7 +18,7 @@ Spring Cloud Alibaba Dependencies - 2.2.8.RELEASE + 2.2.9-SNAPSHOT 1.8.4 1.5.1 2.1.0 diff --git a/spring-cloud-alibaba-examples/nacos-example/nacos-discovery-example/nacos-discovery-consumer-sclb-example/src/main/java/com/alibaba/cloud/examples/ConsumerSCLBApplication.java b/spring-cloud-alibaba-examples/nacos-example/nacos-discovery-example/nacos-discovery-consumer-sclb-example/src/main/java/com/alibaba/cloud/examples/ConsumerSCLBApplication.java index 9cb57f9b5..f157897bf 100644 --- a/spring-cloud-alibaba-examples/nacos-example/nacos-discovery-example/nacos-discovery-consumer-sclb-example/src/main/java/com/alibaba/cloud/examples/ConsumerSCLBApplication.java +++ b/spring-cloud-alibaba-examples/nacos-example/nacos-discovery-example/nacos-discovery-consumer-sclb-example/src/main/java/com/alibaba/cloud/examples/ConsumerSCLBApplication.java @@ -105,7 +105,7 @@ public class ConsumerSCLBApplication { @Override public Mono> choose(Request request) { - log.info("random spring cloud loadbalacer active -.-"); + log.info("random spring cloud loadbalancer active -.-"); ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider .getIfAvailable(NoopServiceInstanceListSupplier::new); return supplier.get().next().map(this::getInstanceResponse); diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index 93c6ef14d..3fe4aa790 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -40,6 +40,7 @@ rocketmq-example/rocketmq-sql-consume-example rocketmq-example/rocketmq-example-common rocketmq-example/rocketmq-tx-example + rocketmq-example/rocketmq-retrieable-consume-example spring-cloud-bus-rocketmq-example spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-nacos-example diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md index 53340b0b5..11c1407d9 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md +++ b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md @@ -26,7 +26,7 @@ Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间 下图是 Spring Cloud Stream 的架构设计。 -![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png) +![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/images/SCSt-with-binder.png) @@ -811,6 +811,92 @@ public class RocketMQTxApplication { } ``` +## 最大重试消费示例 + +- 重试消费消息:根据配置的重新消费的次数,服务端会根据客户端消费是否成功,进行重新推送消息。 + +### 创建Topic + +```sh +sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t retrieable +``` + +### 示例代码 + +**application.yml** + +```yaml +server: + port: 28089 +spring: + application: + name: rocketmq-retrieable-consume-example + cloud: + stream: + function: + definition: consumer; + rocketmq: + binder: + name-server: localhost:9876 + bindings: + producer-out-0: + producer: + group: output_1 + consumer-in-0: + consumer: + ## According to the configured number of `max-reconsume-times`, + ## the server will re-push the message according to whether the client's consumption is successful or not + push: + max-reconsume-times: 3 + bindings: + producer-out-0: + destination: retrieable + consumer-in-0: + destination: retrieable + group: retrieable-consumer + +logging: + level: + org.springframework.context.support: debug + +``` + +**code** + +```java +@SpringBootApplication +public class RocketMQRetrieableConsumeApplication { + + private static final Logger log = LoggerFactory + .getLogger(RocketMQRetrieableConsumeApplication.class); + + @Autowired + private StreamBridge streamBridge; + + public static void main(String[] args) { + SpringApplication.run(RocketMQRetrieableConsumeApplication.class, args); + } + + @Bean + public ApplicationRunner producer() { + return args -> { + Map headers = new HashMap<>(); + Message msg = new GenericMessage( + new SimpleMsg("Hello RocketMQ For Retrieable ."), headers); + streamBridge.send("producer-out-0", msg); + }; + } + + @Bean + public Consumer> consumer() { + return msg -> { + // Mock Exception in consumer function. + throw new RuntimeException("mock exception."); + }; + } +} +``` + ## Endpoint 信息查看 Spring Boot 应用支持通过 Endpoint 来暴露相关信息,RocketMQ Stream Starter 也支持这一点。 diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme.md b/spring-cloud-alibaba-examples/rocketmq-example/readme.md index dfd2680e0..f1f3c26d8 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/readme.md +++ b/spring-cloud-alibaba-examples/rocketmq-example/readme.md @@ -22,7 +22,7 @@ Binding is Bridge between the external messaging systems and application provide This is a overview of Spring Cloud Stream. -![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png) +![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/images/SCSt-with-binder.png) ## Preparation @@ -807,6 +807,92 @@ public class RocketMQTxApplication { } ``` +## Maximum Retry Consumption Example + +- Retry consumption message: According to the configured number of re-consumption, the server will re-push the message according to whether the client's consumption is successful or not. + +### Create topic + +```sh +sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t retrieable +``` + +### Example code + +**application.yml** + +```yaml +server: + port: 28089 +spring: + application: + name: rocketmq-retrieable-consume-example + cloud: + stream: + function: + definition: consumer; + rocketmq: + binder: + name-server: localhost:9876 + bindings: + producer-out-0: + producer: + group: output_1 + consumer-in-0: + consumer: + ## According to the configured number of `max-reconsume-times`, + ## the server will re-push the message according to whether the client's consumption is successful or not + push: + max-reconsume-times: 3 + bindings: + producer-out-0: + destination: retrieable + consumer-in-0: + destination: retrieable + group: retrieable-consumer + +logging: + level: + org.springframework.context.support: debug + +``` + +**code** + +```java +@SpringBootApplication +public class RocketMQRetrieableConsumeApplication { + + private static final Logger log = LoggerFactory + .getLogger(RocketMQRetrieableConsumeApplication.class); + + @Autowired + private StreamBridge streamBridge; + + public static void main(String[] args) { + SpringApplication.run(RocketMQRetrieableConsumeApplication.class, args); + } + + @Bean + public ApplicationRunner producer() { + return args -> { + Map headers = new HashMap<>(); + Message msg = new GenericMessage( + new SimpleMsg("Hello RocketMQ For Retrieable ."), headers); + streamBridge.send("producer-out-0", msg); + }; + } + + @Bean + public Consumer> consumer() { + return msg -> { + // Mock Exception in consumer function. + throw new RuntimeException("mock exception."); + }; + } +} +``` + ## Endpoint Add dependency `spring-cloud-starter-stream-rocketmq` to your pom.xml file, and configure your endpoint security strategy. diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/pom.xml new file mode 100644 index 000000000..d54b928ce --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/pom.xml @@ -0,0 +1,57 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../pom.xml + + 4.0.0 + + + rocketmq-retrieable-consume-example + Spring Cloud Starter Stream Alibaba RocketMQ Retrieable Consume Example + Example demonstrating how to use rocketmq to produce, and retrieable consume. + jar + + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-json + + + com.alibaba.cloud + rocketmq-example-common + ${revision} + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/java/com/alibaba/cloud/examples/retrieable/RocketMQRetrieableConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/java/com/alibaba/cloud/examples/retrieable/RocketMQRetrieableConsumeApplication.java new file mode 100644 index 000000000..4ed32553a --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/java/com/alibaba/cloud/examples/retrieable/RocketMQRetrieableConsumeApplication.java @@ -0,0 +1,71 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.retrieable; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import com.alibaba.cloud.examples.common.SimpleMsg; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +/** + * RocketMQ Retrieable Consume Example . + * + * @author Palmer.Xu + */ +@SpringBootApplication +public class RocketMQRetrieableConsumeApplication { + + private static final Logger log = LoggerFactory + .getLogger(RocketMQRetrieableConsumeApplication.class); + + @Autowired + private StreamBridge streamBridge; + + public static void main(String[] args) { + SpringApplication.run(RocketMQRetrieableConsumeApplication.class, args); + } + + @Bean + public ApplicationRunner producer() { + return args -> { + Map headers = new HashMap<>(); + Message msg = new GenericMessage( + new SimpleMsg("Hello RocketMQ For Retrieable ."), headers); + streamBridge.send("producer-out-0", msg); + }; + } + + @Bean + public Consumer> consumer() { + return msg -> { + throw new RuntimeException("mock exception."); + }; + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/resources/application.yml new file mode 100644 index 000000000..29b9d0187 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/resources/application.yml @@ -0,0 +1,32 @@ +server: + port: 28089 +spring: + application: + name: rocketmq-retrieable-consume-example + cloud: + stream: + function: + definition: consumer; + rocketmq: + binder: + name-server: localhost:9876 + bindings: + producer-out-0: + producer: + group: output_1 + consumer-in-0: + consumer: + ## According to the configured number of `max-reconsume-times`, + ## the server will re-push the message according to whether the client's consumption is successful or not + push: + max-reconsume-times: 3 + bindings: + producer-out-0: + destination: retrieable + consumer-in-0: + destination: retrieable + group: retrieable-consumer + +logging: + level: + org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-tx-example/src/main/java/com/alibaba/cloud/examples/tx/TransactionListenerImpl.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-tx-example/src/main/java/com/alibaba/cloud/examples/tx/TransactionListenerImpl.java index 9697f2242..6c67a1b47 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-tx-example/src/main/java/com/alibaba/cloud/examples/tx/TransactionListenerImpl.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-tx-example/src/main/java/com/alibaba/cloud/examples/tx/TransactionListenerImpl.java @@ -30,7 +30,7 @@ import org.springframework.stereotype.Component; public class TransactionListenerImpl implements TransactionListener { /** - * Excute local transaction. + * Execute local transaction. * @param msg messages * @param arg message args * @return Transaction state diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosDiscoveryProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosDiscoveryProperties.java index 4d5b1bc87..bca0be7a8 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosDiscoveryProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosDiscoveryProperties.java @@ -17,7 +17,6 @@ package com.alibaba.cloud.nacos; import java.net.Inet4Address; - import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; @@ -168,7 +167,7 @@ public class NacosDiscoveryProperties { /** * choose IPv4 or IPv6,if you don't set it will choose IPv4. */ - private String ipType = "IPv4" ; + private String ipType = "IPv4"; /** * The port your want to register for your service instance, needn't to set it if the diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosServiceInstance.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosServiceInstance.java index 1db224499..2c087e8d4 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosServiceInstance.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosServiceInstance.java @@ -18,6 +18,7 @@ package com.alibaba.cloud.nacos; import java.net.URI; import java.util.Map; +import java.util.Objects; import org.springframework.cloud.client.DefaultServiceInstance; import org.springframework.cloud.client.ServiceInstance; @@ -87,4 +88,26 @@ public class NacosServiceInstance implements ServiceInstance { this.metadata = metadata; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NacosServiceInstance that = (NacosServiceInstance) o; + + return Objects.equals(this.serviceId, that.serviceId) + && Objects.equals(this.host, that.host) + && this.port == that.port + && this.secure == that.secure + && Objects.equals(this.metadata, that.metadata); + } + + @Override + public int hashCode() { + return (serviceId == null) ? 31 : (serviceId.hashCode() + 31); + } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/rest/SeataRestTemplateAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/rest/SeataRestTemplateAutoConfiguration.java index b9b587568..c8430e9fa 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/rest/SeataRestTemplateAutoConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/rest/SeataRestTemplateAutoConfiguration.java @@ -16,17 +16,8 @@ package com.alibaba.cloud.seata.rest; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import javax.annotation.PostConstruct; - -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.http.client.ClientHttpRequestInterceptor; -import org.springframework.web.client.RestTemplate; /** * @author xiaojing @@ -35,32 +26,14 @@ import org.springframework.web.client.RestTemplate; @Configuration(proxyBeanMethods = false) public class SeataRestTemplateAutoConfiguration { - @Autowired(required = false) - private Collection restTemplates; - - @Autowired - private SeataRestTemplateInterceptor seataRestTemplateInterceptor; - - @PostConstruct - public void init() { - if (this.restTemplates != null) { - for (RestTemplate restTemplate : restTemplates) { - List interceptors = new ArrayList( - restTemplate.getInterceptors()); - interceptors.add(this.seataRestTemplateInterceptor); - restTemplate.setInterceptors(interceptors); - } - } + @Bean + public SeataRestTemplateInterceptor seataRestTemplateInterceptor() { + return new SeataRestTemplateInterceptor(); } - @Configuration(proxyBeanMethods = false) - static class SeataRestTemplateInterceptorConfiguration { - - @Bean - public SeataRestTemplateInterceptor seataRestTemplateInterceptor() { - return new SeataRestTemplateInterceptor(); - } - + @Bean + public SeataRestTemplateInterceptorAfterPropertiesSet seataRestTemplateInterceptorConfiguration() { + return new SeataRestTemplateInterceptorAfterPropertiesSet(); } } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/rest/SeataRestTemplateInterceptorAfterPropertiesSet.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/rest/SeataRestTemplateInterceptorAfterPropertiesSet.java new file mode 100644 index 000000000..efef0bd64 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-seata/src/main/java/com/alibaba/cloud/seata/rest/SeataRestTemplateInterceptorAfterPropertiesSet.java @@ -0,0 +1,51 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.seata.rest; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.client.ClientHttpRequestInterceptor; +import org.springframework.web.client.RestTemplate; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * @author ZhangZhi + */ +public class SeataRestTemplateInterceptorAfterPropertiesSet implements InitializingBean { + + @Autowired(required = false) + private Collection restTemplates; + + @Autowired + private SeataRestTemplateInterceptor seataRestTemplateInterceptor; + + @Override + public void afterPropertiesSet() { + if (this.restTemplates != null) { + for (RestTemplate restTemplate : restTemplates) { + List interceptors = new ArrayList<>( + restTemplate.getInterceptors()); + interceptors.add(this.seataRestTemplateInterceptor); + restTemplate.setInterceptors(interceptors); + } + } + } + +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-sentinel/src/main/java/com/alibaba/cloud/sentinel/custom/SentinelAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-sentinel/src/main/java/com/alibaba/cloud/sentinel/custom/SentinelAutoConfiguration.java index 5d80b7350..f51796826 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-sentinel/src/main/java/com/alibaba/cloud/sentinel/custom/SentinelAutoConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-sentinel/src/main/java/com/alibaba/cloud/sentinel/custom/SentinelAutoConfiguration.java @@ -147,7 +147,7 @@ public class SentinelAutoConfiguration { @ConditionalOnClass(name = "org.springframework.web.client.RestTemplate") @ConditionalOnProperty(name = "resttemplate.sentinel.enabled", havingValue = "true", matchIfMissing = true) - public SentinelBeanPostProcessor sentinelBeanPostProcessor( + public static SentinelBeanPostProcessor sentinelBeanPostProcessor( ApplicationContext applicationContext) { return new SentinelBeanPostProcessor(applicationContext); } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-sentinel/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-sentinel/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 97dd52e72..be24b764d 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-sentinel/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-sentinel/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -92,7 +92,7 @@ { "name": "spring.cloud.sentinel.servlet.block-page", "type": "java.lang.String", - "description": "recommoned use spring.cloud.sentinel.block-page." + "description": "recommend use spring.cloud.sentinel.block-page." }, { "name": "spring.cloud.sentinel.flow.coldFactor", diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java index 13c2cd415..074ee1aa5 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java @@ -90,6 +90,8 @@ public final class RocketMQConsumerFactory { consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency()); consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency()); consumer.setUnitName(consumerProperties.getUnitName()); + consumer.setMaxReconsumeTimes( + consumerProperties.getPush().getMaxReconsumeTimes()); return consumer; }