diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index 93c6ef14d..3fe4aa790 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -40,6 +40,7 @@ rocketmq-example/rocketmq-sql-consume-example rocketmq-example/rocketmq-example-common rocketmq-example/rocketmq-tx-example + rocketmq-example/rocketmq-retrieable-consume-example spring-cloud-bus-rocketmq-example spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-nacos-example diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md index 53340b0b5..b6af10afc 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md +++ b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md @@ -811,6 +811,92 @@ public class RocketMQTxApplication { } ``` +## 最大重试消费示例 + +- 重试消费消息:根据配置的重新消费的次数,服务端会根据客户端消费是否成功,进行重新推送消息。 + +### 创建Topic + +```sh +sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t retrieable +``` + +### 示例代码 + +**application.yml** + +```yaml +server: + port: 28089 +spring: + application: + name: rocketmq-retrieable-consume-example + cloud: + stream: + function: + definition: consumer; + rocketmq: + binder: + name-server: localhost:9876 + bindings: + producer-out-0: + producer: + group: output_1 + consumer-in-0: + consumer: + ## According to the configured number of `max-reconsume-times`, + ## the server will re-push the message according to whether the client's consumption is successful or not + push: + max-reconsume-times: 3 + bindings: + producer-out-0: + destination: retrieable + consumer-in-0: + destination: retrieable + group: retrieable-consumer + +logging: + level: + org.springframework.context.support: debug + +``` + +**code** + +```java +@SpringBootApplication +public class RocketMQRetrieableConsumeApplication { + + private static final Logger log = LoggerFactory + .getLogger(RocketMQRetrieableConsumeApplication.class); + + @Autowired + private StreamBridge streamBridge; + + public static void main(String[] args) { + SpringApplication.run(RocketMQRetrieableConsumeApplication.class, args); + } + + @Bean + public ApplicationRunner producer() { + return args -> { + Map headers = new HashMap<>(); + Message msg = new GenericMessage( + new SimpleMsg("Hello RocketMQ For Retrieable ."), headers); + streamBridge.send("producer-out-0", msg); + }; + } + + @Bean + public Consumer> consumer() { + return msg -> { + // Mock Exception in consumer function. + throw new RuntimeException("mock exception."); + }; + } +} +``` + ## Endpoint 信息查看 Spring Boot 应用支持通过 Endpoint 来暴露相关信息,RocketMQ Stream Starter 也支持这一点。 diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme.md b/spring-cloud-alibaba-examples/rocketmq-example/readme.md index dfd2680e0..aaaf45bf2 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/readme.md +++ b/spring-cloud-alibaba-examples/rocketmq-example/readme.md @@ -807,6 +807,92 @@ public class RocketMQTxApplication { } ``` +## Maximum Retry Consumption Example + +- Retry consumption message: According to the configured number of re-consumption, the server will re-push the message according to whether the client's consumption is successful or not. + +### Create topic + +```sh +sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t retrieable +``` + +### Example code + +**application.yml** + +```yaml +server: + port: 28089 +spring: + application: + name: rocketmq-retrieable-consume-example + cloud: + stream: + function: + definition: consumer; + rocketmq: + binder: + name-server: localhost:9876 + bindings: + producer-out-0: + producer: + group: output_1 + consumer-in-0: + consumer: + ## According to the configured number of `max-reconsume-times`, + ## the server will re-push the message according to whether the client's consumption is successful or not + push: + max-reconsume-times: 3 + bindings: + producer-out-0: + destination: retrieable + consumer-in-0: + destination: retrieable + group: retrieable-consumer + +logging: + level: + org.springframework.context.support: debug + +``` + +**code** + +```java +@SpringBootApplication +public class RocketMQRetrieableConsumeApplication { + + private static final Logger log = LoggerFactory + .getLogger(RocketMQRetrieableConsumeApplication.class); + + @Autowired + private StreamBridge streamBridge; + + public static void main(String[] args) { + SpringApplication.run(RocketMQRetrieableConsumeApplication.class, args); + } + + @Bean + public ApplicationRunner producer() { + return args -> { + Map headers = new HashMap<>(); + Message msg = new GenericMessage( + new SimpleMsg("Hello RocketMQ For Retrieable ."), headers); + streamBridge.send("producer-out-0", msg); + }; + } + + @Bean + public Consumer> consumer() { + return msg -> { + // Mock Exception in consumer function. + throw new RuntimeException("mock exception."); + }; + } +} +``` + ## Endpoint Add dependency `spring-cloud-starter-stream-rocketmq` to your pom.xml file, and configure your endpoint security strategy. diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/pom.xml new file mode 100644 index 000000000..d54b928ce --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/pom.xml @@ -0,0 +1,57 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../pom.xml + + 4.0.0 + + + rocketmq-retrieable-consume-example + Spring Cloud Starter Stream Alibaba RocketMQ Retrieable Consume Example + Example demonstrating how to use rocketmq to produce, and retrieable consume. + jar + + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-json + + + com.alibaba.cloud + rocketmq-example-common + ${revision} + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/java/com/alibaba/cloud/examples/retrieable/RocketMQRetrieableConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/java/com/alibaba/cloud/examples/retrieable/RocketMQRetrieableConsumeApplication.java new file mode 100644 index 000000000..4ed32553a --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/java/com/alibaba/cloud/examples/retrieable/RocketMQRetrieableConsumeApplication.java @@ -0,0 +1,71 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.retrieable; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import com.alibaba.cloud.examples.common.SimpleMsg; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +/** + * RocketMQ Retrieable Consume Example . + * + * @author Palmer.Xu + */ +@SpringBootApplication +public class RocketMQRetrieableConsumeApplication { + + private static final Logger log = LoggerFactory + .getLogger(RocketMQRetrieableConsumeApplication.class); + + @Autowired + private StreamBridge streamBridge; + + public static void main(String[] args) { + SpringApplication.run(RocketMQRetrieableConsumeApplication.class, args); + } + + @Bean + public ApplicationRunner producer() { + return args -> { + Map headers = new HashMap<>(); + Message msg = new GenericMessage( + new SimpleMsg("Hello RocketMQ For Retrieable ."), headers); + streamBridge.send("producer-out-0", msg); + }; + } + + @Bean + public Consumer> consumer() { + return msg -> { + throw new RuntimeException("mock exception."); + }; + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/resources/application.yml new file mode 100644 index 000000000..29b9d0187 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-retrieable-consume-example/src/main/resources/application.yml @@ -0,0 +1,32 @@ +server: + port: 28089 +spring: + application: + name: rocketmq-retrieable-consume-example + cloud: + stream: + function: + definition: consumer; + rocketmq: + binder: + name-server: localhost:9876 + bindings: + producer-out-0: + producer: + group: output_1 + consumer-in-0: + consumer: + ## According to the configured number of `max-reconsume-times`, + ## the server will re-push the message according to whether the client's consumption is successful or not + push: + max-reconsume-times: 3 + bindings: + producer-out-0: + destination: retrieable + consumer-in-0: + destination: retrieable + group: retrieable-consumer + +logging: + level: + org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java index 13c2cd415..074ee1aa5 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java @@ -90,6 +90,8 @@ public final class RocketMQConsumerFactory { consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency()); consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency()); consumer.setUnitName(consumerProperties.getUnitName()); + consumer.setMaxReconsumeTimes( + consumerProperties.getPush().getMaxReconsumeTimes()); return consumer; }