diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index b69903cb0..a5c21cc8a 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -27,7 +27,7 @@ 4.0.1 1.0.5 2.16.0 - 4.3.1 + 2.0.2 2.1.6 2.6.5 0.2.1.RELEASE @@ -107,8 +107,8 @@ org.apache.rocketmq - rocketmq-client - ${rocketmq.version} + rocketmq-spring-boot-starter + ${rocketmq.starter.version} diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc index 3724fd411..23776ee86 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc @@ -112,139 +112,165 @@ messageChannel.send(MessageBuilder.withPayload("simple msg").build()); 这段代码所有的消息类都是 `spring-messaging` 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。 -**Spring Cloud Stream 底层也是基于这段代码去做了各种抽象。** +**Spring Cloud Stream 底层基于这段代码去做了各种抽象。** -### Spring Cloud Alibaba RocketMQ Binder 实现原理 -.RocketMQ Binder处理流程 -image::https://cdn.nlark.com/lark/0/2018/png/64647/1543560843558-24525bf4-1d0e-4e10-be5f-bdde7127f6e6.png[] +### 如何使用 Spring Cloud Alibaba RocketMQ Binder ### +如果要在您的项目中引入 RocketMQ Binder,需要引入如下 maven 依赖: -RocketMQ Binder 的核心主要就是这3个类:`RocketMQMessageChannelBinder`,`RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。 +```xml + + org.springframework.cloud + spring-cloud-stream-binder-rocketmq + +``` -`RocketMQMessageChannelBinder` 是个标准的 Binder 实现,其内部构建 `RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。 +或者可以使用 Spring Cloud Stream RocketMQ Starter: -`RocketMQMessageHandler` 用于 RocketMQ `Producer` 的启动以及消息的发送,其内部会根据 `spring-messaging` 模块内 `org.springframework.messaging.Message` 消息类,去创建 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message`。 +```xml + + org.springframework.cloud + spring-cloud-starter-stream-rocketmq + +``` -在构造 `org.apache.rocketmq.common.message.Message` 的过程中会根据 `org.springframework.messaging.Message` 的 Header 构造成 `RocketMQMessageHeaderAccessor`。然后再根据 `RocketMQMessageHeaderAccessor` 中的一些属性,比如 tags、keys、flag等属性设置到 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message` 中。 +### Spring Cloud Alibaba RocketMQ Binder 实现 -`RocketMQInboundChannelAdapter` 用于 RocketMQ `Consumer` 的启动以及消息的接收。其内部还支持 https://github.com/spring-projects/spring-retry[spring-retry] 的使用。 +RocketMQ Binder 的实现依赖于 https://github.com/apache/rocketmq-spring[RocketMQ-Spring] 框架。 -在消费消息的时候可以从 Header 中获取 `Acknowledgement` 并进行一些设置。 +RocketMQ-Spring 框架是 RocketMQ 与 Spring Boot 的整合,RocketMQ Spring 主要提供了 3 个特性: -比如使用 `MessageListenerConcurrently` 进行异步消费的时候,可以设置延迟消费: +1. 使用 `RocketMQTemplate` 用来统一发送消息,包括同步、异步发送消息和事务消息 +2. `@RocketMQTransactionListener` 注解用来处理事务消息的监听和回查 +3. `@RocketMQMessageListener` 注解用来消费消息 -```java -@StreamListener("input") -public void receive(Message message) { - RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); - Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); - acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER); - acknowledgement.setConsumeConcurrentlyDelayLevel(1); -} -``` +RocketMQ Binder 的核心类 RocketMQMessageChannelBinder 实现了 Spring Cloud Stream 规范,内部构建会 `RocketMQInboundChannelAdapter` 和 `RocketMQMessageHandler`。 -比如使用 `MessageListenerOrderly` 进行顺序消费的时候,可以设置延迟消费: +`RocketMQMessageHandler` 会基于 Binding 配置构造 `RocketMQTemplate`,`RocketMQTemplate` 内部会把 `spring-messaging` 模块内 `org.springframework.messaging.Message` 消息类转换成 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message`,然后发送出去。 -```java -@StreamListener("input") -public void receive(Message message) { - RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); - Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); - acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT); - acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000); -} -``` +`RocketMQInboundChannelAdapter` 也会基于 Binding 配置构造 `RocketMQListenerBindingContainer`,`RocketMQListenerBindingContainer` 内部会启动 RocketMQ `Consumer` 接收消息。 -Provider端支持的配置: - -:frame: topbot -[width="60%",options="header"] -|==== -^|配置项 ^|含义 ^| 默认值 -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|是否启用producer|true -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|消息发送的最大字节数|0(大于0才会生效,RocketMQ 默认值为4M = 1024 * 1024 * 4) -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|是否启用 `TransactionMQProducer` 发送事务消息|false -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|事务消息对应的 `org.apache.rocketmq.client.producer.LocalTransactionExecuter` 接口实现类 全类名。 比如 `org.test.MyExecuter`| -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|事务消息对应的 `org.apache.rocketmq.client.producer.TransactionCheckListener` 接口实现类 全类名。 比如 `org.test.MyTransactionCheckListener`| -|==== - -Consumer端支持的配置: - -:frame: topbot -[width="60%",options="header"] -|==== -^|配置项 ^|含义| 默认值 -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|是否启用consumer|true -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer订阅只有包括这些tags的topic消息。多个标签之间使用 "\|\|" 分割(不填表示不进行tags的过滤,订阅所有消息)| -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer订阅满足sql要求的topic消息(如果同时配置了tags内容,sql的优先级更高)| -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|Consumer是否是广播模式|false -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|顺序消费 or 异步消费|false -|==== - -### Endpoint支持 - -在使用Endpoint特性之前需要在 Maven 中添加 `spring-boot-starter-actuator` 依赖,并在配置中允许 Endpoints 的访问。 - -* Spring Boot 1.x 中添加配置 `management.security.enabled=false`。暴露的 endpoint 路径为 `/rocketmq_binder` -* Spring Boot 2.x 中添加配置 `management.endpoints.web.exposure.include=*`。暴露的 endpoint 路径为 `/actuator/rocketmq-binder` - -Endpoint 会统计消息最后一次发送的数据,消息发送成功或失败的次数,消息消费成功或失败的次数等数据。 - -```json -{ - "runtime": { - "lastSend.timestamp": 1542786623915 - }, - "metrics": { - "scs-rocketmq.consumer.test-topic.totalConsumed": { - "count": 11 - }, - "scs-rocketmq.consumer.test-topic.totalConsumedFailures": { - "count": 0 - }, - "scs-rocketmq.producer.test-topic.totalSentFailures": { - "count": 0 - }, - "scs-rocketmq.consumer.test-topic.consumedPerSecond": { - "count": 11, - "fifteenMinuteRate": 0.012163847780107841, - "fiveMinuteRate": 0.03614605351360527, - "meanRate": 0.3493213353657594, - "oneMinuteRate": 0.17099243039490175 - }, - "scs-rocketmq.producer.test-topic.totalSent": { - "count": 5 - }, - "scs-rocketmq.producer.test-topic.sentPerSecond": { - "count": 5, - "fifteenMinuteRate": 0.005540151995103271, - "fiveMinuteRate": 0.01652854617838251, - "meanRate": 0.10697493212602836, - "oneMinuteRate": 0.07995558537067671 - }, - "scs-rocketmq.producer.test-topic.sentFailuresPerSecond": { - "count": 0, - "fifteenMinuteRate": 0.0, - "fiveMinuteRate": 0.0, - "meanRate": 0.0, - "oneMinuteRate": 0.0 - }, - "scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": { - "count": 0, - "fifteenMinuteRate": 0.0, - "fiveMinuteRate": 0.0, - "meanRate": 0.0, - "oneMinuteRate": 0.0 - } - } -} -``` +NOTE: 在使用 RocketMQ Binder 的同时也可以配置 rocketmq.** 用于触发 RocketMQ Spring 相关的 AutoConfiguration + +`RocketMQHeaders` 中定义了很多 Header 常量,在发送消息的时候可以设置到 Spring Message 的 Header 中,用于触发 RocketMQ 相关的 feature: -注意:要想查看统计数据需要在pom里加上 https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core[metrics-core依赖]。如若不加,endpoint 将会显示 warning 信息而不会显示统计信息: +```java +MessageBuilder builder = MessageBuilder.withPayload(msg) + .setHeader(RocketMQHeaders.TAGS, "binder") + .setHeader(RocketMQHeaders.MESSAGE_ID, "my-msg-id") + .setHeader("DELAY", "1"); +Message message = builder.build(); +output().send(message); +``` -```json -{ - "warning": "please add metrics-core dependency, we use it for metrics" -} -``` \ No newline at end of file +### 配置选项 + +#### RocketMQ Binder Properties + +spring.cloud.stream.rocketmq.binder.name-server:: +RocketMQ NameServer 地址。 ++ +Default: `127.0.0.1:9876`. +spring.cloud.stream.rocketmq.binder.access-key:: +阿里云账号 AccessKey。 ++ +Default: null. +spring.cloud.stream.rocketmq.binder.secret-key:: +阿里云账号 SecretKey。 ++ +Default: null. +spring.cloud.stream.rocketmq.binder.enable-msg-trace:: +是否为 Producer 和 Consumer 开启消息轨迹功能 ++ +Default: `true`. +spring.cloud.stream.rocketmq.binder.customized-trace-topic:: +消息轨迹开启后存储的 topic 名称。 ++ +Default: `RMQ_SYS_TRACE_TOPIC`. + + +#### RocketMQ Consumer Properties + +下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings..consumer.` 为前缀的 RocketMQ Consumer 相关的配置。 + +enable:: +是否启用 Consumer。 ++ +默认值: `true`. +tags:: +Consumer 基于 TAGS 订阅,多个 tag 以 `||` 分割。 ++ +默认值: empty. +sql:: +Consumer 基于 SQL 订阅。 ++ +默认值: empty. +broadcasting:: +Consumer 是否是广播消费模式。如果想让所有的订阅者都能接收到消息,可以使用广播模式。 ++ +默认值: `false`. +orderly:: +Consumer 是否同步消费消息模式。 ++ +默认值: `false`. +delayLevelWhenNextConsume:: +异步消费消息模式下消费失败重试策略: +* -1,不重复,直接放入死信队列 +* 0,broker 控制重试策略 +* >0,client 控制重试策略 ++ +默认值: `0`. +suspendCurrentQueueTimeMillis:: +同步消费消息模式下消费失败后再次消费的时间间隔。 ++ +默认值: `1000`. + +#### RocketMQ Provider Properties + +下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings..producer.` 为前缀的 RocketMQ Producer 相关的配置。 + +enable:: +是否启用 Producer。 ++ +默认值: `true`. +group:: +Producer group name。 ++ +默认值: empty. +maxMessageSize:: +消息发送的最大字节数。 ++ +默认值: `8249344`. +transactional:: +是否发送事务消息。 ++ +默认值: `false`. +sync:: +是否使用同步得方式发送消息。 ++ +默认值: `false`. +vipChannelEnabled:: +是否在 Vip Channel 上发送消息。 ++ +默认值: `true`. +sendMessageTimeout:: +发送消息的超时时间(毫秒)。 ++ +默认值: `3000`. +compressMessageBodyThreshold:: +消息体压缩阀值(当消息体超过 4k 的时候会被压缩)。 ++ +默认值: `4096`. +retryTimesWhenSendFailed:: +在同步发送消息的模式下,消息发送失败的重试次数。 ++ +默认值: `2`. +retryTimesWhenSendAsyncFailed:: +在异步发送消息的模式下,消息发送失败的重试次数。 ++ +默认值: `2`. +retryNextServer:: +消息发送失败的情况下是否重试其它的 broker。 ++ +默认值: `false`. diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc index 81e36a945..d8406138e 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc @@ -114,137 +114,162 @@ All the message types in this code are provided by the `spring-messaging`module. **The lower layer of Spring Cloud Stream also implements various code abstractions based on the previous code.** -### How Spring Cloud Alibaba RocketMQ Binder Works +### How to use Spring Cloud Alibaba RocketMQ Binder ### -.RocketMQ Binder Workflow -image::https://cdn.nlark.com/lark/0/2018/png/64647/1543560843558-24525bf4-1d0e-4e10-be5f-bdde7127f6e6.png[] +For using the Spring Cloud Alibaba RocketMQ Binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates: +```xml + + org.springframework.cloud + spring-cloud-stream-binder-rocketmq + +``` -The core of RocketMQ Binder are the 3 classes below: `RocketMQMessageChannelBinder`,`RocketMQInboundChannelAdapter` and `RocketMQMessageHandler`. +Alternatively, you can also use the Spring Cloud Stream RocketMQ Starter: -`RocketMQMessageChannelBinder` is a standard implementation of Binder. It contains `RocketMQInboundChannelAdapter` and `RocketMQMessageHandler` as its internal constructions. +```xml + + org.springframework.cloud + spring-cloud-starter-stream-rocketmq + +``` -`RocketMQMessageHandler` is used to start RocketMQ `Producer` and send messages. It creates message type of RocketMQ `org.apache.rocketmq.common.message.Message` based on the message type of `org.springframework.messaging.Message` in the `spring-messaging` module. +### How Spring Cloud Alibaba RocketMQ Binder Works -When constructing `org.apache.rocketmq.common.message.Message`, it constructs `RocketMQMessageHeaderAccessor` based on the header of `org.springframework.messaging.Message`. Then, based on some of the properties in `RocketMQMessageHeaderAccessor` , it sets some of message attributes such as tags, keys, and flag in `org.apache.rocketmq.common.message.Message` of RocketMQ. +The implementation of RocketMQ Binder depend on the https://github.com/apache/rocketmq-spring[RocketMQ-Spring] framework. -`RocketMQInboundChannelAdapter` is used to start RocketMQ `Consumer` and receive messages. It also support the usage of https://github.com/spring-projects/spring-retry[spring-retry]. +RocketMQ Spring framework is an integration of RocketMQ and Spring Boot. It provides three main features: -You can also obtain `Acknowledgement` from the Header and make some configurations. +1. `RocketMQTemplate`: Sending messages, including synchronous, asynchronous, and transactional messages. +2. `@RocketMQTransactionListener`: Listen and check for transaction messages. +3. `@RocketMQMessageListener`: Consume messages. -For example, you can set delayed message consumption when `MessageListenerConcurrently` is used for asynchronous message consumption: +`RocketMQMessageChannelBinder` is a standard implementation of Binder, it will build `RocketMQInboundChannelAdapter` and `RocketMQMessageHandler` internally. -```java -@StreamListener("input") -public void receive(Message message) { - RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); - Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); - acknowledgement.setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.RECONSUME_LATER); - acknowledgement.setConsumeConcurrentlyDelayLevel(1); -} -``` +`RocketMQMessageHandler` will construct `RocketMQTemplate` based on the Binding configuration. `RocketMQTemplate` will convert the `org.springframework.messaging.Message` message class of `spring-messaging` module to the RocketMQ message class `org.apache.rocketmq.common .message.Message` internally, then send it out. -You can also set delayed message consumption when `MessageListenerOrderly` is used for consuming ordered messages. +`RocketMQInboundChannelAdapter` will also construct `RocketMQListenerBindingContainer` based on the Binding configuration, and `RocketMQListenerBindingContainer` will start the RocketMQ `Consumer` to receive the messages. -```java -@StreamListener("input") -public void receive(Message message) { - RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message); - Acknowledgement acknowledgement = headerAccessor.getAcknowledgement(message); - acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT); - acknowledgement.setConsumeOrderlySuspendCurrentQueueTimeMill(5000); -} -``` +NOTE: RocketMQ Binder Application can also be used to configure rocketmq.** to trigger RocketMQ Spring related AutoConfiguration -Supported Configurations of Provider: - -:frame: topbot -[width="60%",options="header"] -|==== -^|Configuration ^|Description ^| Default Value -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.enabled`|Whether to use producer|true -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.max-message-size`|Maximum bytes of messages sent|0(Take effect only when it’s bigger than 0. The default value of RocketMQ is 4M = 1024 * 1024 * 4) -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transactional`|Whether to use `TransactionMQProducer` to send transaction messages|false -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.executer`|Full class name of the interface implementation class related to `org.apache.rocketmq.client.producer.LocalTransactionExecuter` For example, `org.test.MyExecuter`| -|`spring.cloud.stream.rocketmq.bindings.your-output-binding.producer.transaction-check-listener`|Full class name of the interface implementation class related to `org.apache.rocketmq.client.producer.TransactionCheckListener` For example, `org.test.MyTransactionCheckListener`| -|==== - -Supported Configurations of Consumer: - -:frame: topbot -[width="60%",options="header"] -|==== -^|Configuration ^|Description| Default Value -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.enabled`|Whether to use consumer|true -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.tags`|Consumer will only subscribe to messages with these tags Tags are separated by "\|\|" (If not specified, it means the consumer subscribes to all messages)| -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.sql`|Consumer subscribes to the messages as requested in the SQL(If tags are also specified, SQL has a higher priority than tags.)| -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.broadcasting`|If the consumer uses the broadcasting mode|false -|`spring.cloud.stream.rocketmq.bindings.your-input-binding.consumer.orderly`|Ordered message consumption or asychronous consumption|false -|==== - -### Endpoint Support - -Before you use the Endpoint feature, please add the `spring-boot-starter-actuator` dependency in Maven, and enable access of Endpoints in your configuration. - -* Add `management.security.enabled=false`in Spring Boot 1.x. The exposed endpoint path is `/rocketmq_binder` -* Add `management.endpoints.web.exposure.include=*`in Spring Boot 2.x. The exposed endpoint path is `/actuator/rocketmq-binder` - -Endpoint will collects data about the last message that is sent, the number of successes or failures of message sending, and the number of successes of failures of message consumption. - -```json -{ - "runtime": { - "lastSend.timestamp": 1542786623915 - }, - "metrics": { - "scs-rocketmq.consumer.test-topic.totalConsumed": { - "count": 11 - }, - "scs-rocketmq.consumer.test-topic.totalConsumedFailures": { - "count": 0 - }, - "scs-rocketmq.producer.test-topic.totalSentFailures": { - "count": 0 - }, - "scs-rocketmq.consumer.test-topic.consumedPerSecond": { - "count": 11, - "fifteenMinuteRate": 0.012163847780107841, - "fiveMinuteRate": 0.03614605351360527, - "meanRate": 0.3493213353657594, - "oneMinuteRate": 0.17099243039490175 - }, - "scs-rocketmq.producer.test-topic.totalSent": { - "count": 5 - }, - "scs-rocketmq.producer.test-topic.sentPerSecond": { - "count": 5, - "fifteenMinuteRate": 0.005540151995103271, - "fiveMinuteRate": 0.01652854617838251, - "meanRate": 0.10697493212602836, - "oneMinuteRate": 0.07995558537067671 - }, - "scs-rocketmq.producer.test-topic.sentFailuresPerSecond": { - "count": 0, - "fifteenMinuteRate": 0.0, - "fiveMinuteRate": 0.0, - "meanRate": 0.0, - "oneMinuteRate": 0.0 - }, - "scs-rocketmq.consumer.test-topic.consumedFailuresPerSecond": { - "count": 0, - "fifteenMinuteRate": 0.0, - "fiveMinuteRate": 0.0, - "meanRate": 0.0, - "oneMinuteRate": 0.0 - } - } -} -``` +The headers defined in `RocketMQHeaders`, which can be set to the header of spring message when sending a message to trigger the RocketMQ related feature: -Note: To view statistics, add the https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core[metrics-core dependency] in POM. If not added, the endpoint will return warning instead of statistics: +```java +MessageBuilder builder = MessageBuilder.withPayload(msg) + .setHeader(RocketMQHeaders.TAGS, "binder") + .setHeader(RocketMQHeaders.MESSAGE_ID, "my-msg-id") + .setHeader("DELAY", "1"); +Message message = builder.build(); +output().send(message); +``` -```json -{ - "warning": "please add metrics-core dependency, we use it for metrics" -} -``` \ No newline at end of file +### Configuration Options + +#### RocketMQ Binder Properties + +spring.cloud.stream.rocketmq.binder.name-server:: +The name server of RocketMQ Server. ++ +Default: `127.0.0.1:9876`. +spring.cloud.stream.rocketmq.binder.access-key:: +The AccessKey of Alibaba Cloud Account. ++ +Default: null. +spring.cloud.stream.rocketmq.binder.secret-key:: +The SecretKey of Alibaba Cloud Account. ++ +Default: null. +spring.cloud.stream.rocketmq.binder.enable-msg-trace:: +Enable Message Trace feature for all producers and consumers. ++ +Default: `true`. +spring.cloud.stream.rocketmq.binder.customized-trace-topic:: +The trace topic for message trace. ++ +Default: `RMQ_SYS_TRACE_TOPIC`. + + +#### RocketMQ Consumer Properties + +The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings..consumer.`. + +enable:: +Enable Consumer Binding. ++ +Default: `true`. +tags:: +Consumer subscription tags expression, tags split by `||`. ++ +Default: empty. +sql:: +Consumer subscription sql expression. ++ +Default: empty. +broadcasting:: +Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. ++ +Default: `false`. +orderly:: +Receiving message concurrently or orderly. ++ +Default: `false`. +delayLevelWhenNextConsume:: +Message consume retry strategy for concurrently consume: +* -1,no retry,put into DLQ directly +* 0,broker control retry frequency +* >0,client control retry frequency ++ +Default: `0`. +suspendCurrentQueueTimeMillis:: +Time interval of message consume retry for orderly consume. ++ +Default: `1000`. + +#### RocketMQ Provider Properties + +The following properties are available for RocketMQ producers only and must be prefixed with `spring.cloud.stream.rocketmq.bindings..producer.`. + +enable:: +Enable Producer Binding. ++ +Default: `true`. +group:: +Producer group name. ++ +Default: empty. +maxMessageSize:: +Maximum allowed message size in bytes. ++ +Default: `8249344`. +transactional:: +Send Transactional Message. ++ +Default: `false`. +sync:: +Send message in synchronous mode. ++ +Default: `false`. +vipChannelEnabled:: +Send message with vip channel. ++ +Default: `true`. +sendMessageTimeout:: +Millis of send message timeout. ++ +Default: `3000`. +compressMessageBodyThreshold:: +Compress message body threshold, namely, message body larger than 4k will be compressed on default. ++ +Default: `4096`. +retryTimesWhenSendFailed:: +Maximum number of retry to perform internally before claiming sending failure in synchronous mode. ++ +Default: `2`. +retryTimesWhenSendAsyncFailed:: +Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. ++ +Default: `2`. +retryNextServer:: +Indicate whether to retry another broker on sending failure internally. ++ +Default: `false`. \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index bedfeff25..96f455570 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -34,7 +34,8 @@ fescar-example/storage-service fescar-example/account-service acm-example/acm-local-example - rocketmq-example + rocketmq-example/rocketmq-consume-example + rocketmq-example/rocketmq-produce-example sms-example spring-cloud-bus-rocketmq-example schedulerx-example/schedulerx-simple-task-example diff --git a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/pom.xml similarity index 83% rename from spring-cloud-alibaba-examples/rocketmq-example/pom.xml rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/pom.xml index 84ef50291..cbdf8bca2 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/pom.xml @@ -6,14 +6,14 @@ org.springframework.cloud spring-cloud-alibaba-examples 0.2.2.BUILD-SNAPSHOT - ../pom.xml + ../../pom.xml 4.0.0 - rocketmq-example + rocketmq-consume-example jar - Example demonstrating how to use rocketmq + Example demonstrating how to use rocketmq consume @@ -32,11 +32,6 @@ spring-boot-starter-actuator - - io.dropwizard.metrics - metrics-core - - diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java similarity index 100% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java similarity index 69% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java index a486ebc05..35fc25a6b 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/ReceiveService.java @@ -25,14 +25,9 @@ public class ReceiveService { System.out.println("input3 receive: " + foo); } - @StreamListener("input1") - public void receiveInput1Again(String receiveMsg) { - System.out.println("input1 receive again: " + receiveMsg); + @StreamListener("input4") + public void receiveTransactionalMsg(String transactionMsg) { + System.out.println("input4 receive transaction msg: " + transactionMsg); } - @StreamListener("input4") - public void receiveTransactionalMsg(String transactionMsg) { - System.out.println("input4 receive transaction msg: " + transactionMsg); - } - } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java new file mode 100644 index 000000000..252049192 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQConsumerApplication.java @@ -0,0 +1,36 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQConsumerApplication.MySink; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.messaging.SubscribableChannel; + +/** + * @author Jim + */ +@SpringBootApplication +@EnableBinding({ MySink.class }) +public class RocketMQConsumerApplication { + + public interface MySink { + + @Input("input1") + SubscribableChannel input1(); + + @Input("input2") + SubscribableChannel input2(); + + @Input("input3") + SubscribableChannel input3(); + + @Input("input4") + SubscribableChannel input4(); + } + + public static void main(String[] args) { + SpringApplication.run(RocketMQConsumerApplication.class, args); + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties similarity index 53% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties index 3bfe511fa..ec27539c8 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/resources/application.properties +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-consume-example/src/main/resources/application.properties @@ -1,21 +1,9 @@ -spring.cloud.stream.default-binder=rocketmq - -spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 - -spring.cloud.stream.bindings.output1.destination=test-topic -spring.cloud.stream.bindings.output1.content-type=application/json - -spring.cloud.stream.bindings.output2.destination=TransactionTopic -spring.cloud.stream.bindings.output2.content-type=application/json -spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true -spring.cloud.stream.rocketmq.bindings.output2.producer.executer=org.springframework.cloud.alibaba.cloud.examples.MyTransactionExecuter -spring.cloud.stream.rocketmq.bindings.output2.producer.transaction-check-listener=org.springframework.cloud.alibaba.cloud.examples.MyTransactionCheckListener +spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain spring.cloud.stream.bindings.input1.group=test-group1 spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true -spring.cloud.stream.bindings.input1.consumer.maxAttempts=1 spring.cloud.stream.bindings.input2.destination=test-topic spring.cloud.stream.bindings.input2.content-type=text/plain @@ -30,15 +18,15 @@ spring.cloud.stream.bindings.input3.content-type=application/json spring.cloud.stream.bindings.input3.group=test-group3 spring.cloud.stream.rocketmq.bindings.input3.consumer.tags=tagObj spring.cloud.stream.bindings.input3.consumer.concurrency=20 -spring.cloud.stream.bindings.input3.consumer.maxAttempts=1 spring.cloud.stream.bindings.input4.destination=TransactionTopic spring.cloud.stream.bindings.input4.content-type=text/plain spring.cloud.stream.bindings.input4.group=transaction-group -spring.cloud.stream.bindings.input4.consumer.concurrency=210 +spring.cloud.stream.bindings.input4.consumer.concurrency=5 -spring.application.name=rocketmq-example +spring.application.name=rocketmq-consume-example -server.port=28081 +server.port=28082 -management.endpoints.web.exposure.include=* \ No newline at end of file +management.endpoints.web.exposure.include=* +management.endpoint.health.show-details=always \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml new file mode 100644 index 000000000..426a330c7 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/pom.xml @@ -0,0 +1,54 @@ + + + + + org.springframework.cloud + spring-cloud-alibaba-examples + 0.2.2.BUILD-SNAPSHOT + ../../pom.xml + + 4.0.0 + + + rocketmq-produce-example + jar + Example demonstrating how to use rocketmq produce + + + + + org.springframework.cloud + spring-cloud-starter-stream-rocketmq + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java new file mode 100644 index 000000000..e98b6a10a --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/Foo.java @@ -0,0 +1,39 @@ +package org.springframework.cloud.alibaba.cloud.examples; + +/** + * @author Jim + */ +public class Foo { + + private int id; + private String bar; + + public Foo() { + } + + public Foo(int id, String bar) { + this.id = id; + this.bar = bar; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getBar() { + return bar; + } + + public void setBar(String bar) { + this.bar = bar; + } + + @Override + public String toString() { + return "Foo{" + "id=" + id + ", bar='" + bar + '\'' + '}'; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java similarity index 66% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java index f736a0e5b..bd157be03 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/RocketMQProduceApplication.java @@ -4,36 +4,18 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySink; -import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQProduceApplication.MySource; import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.context.annotation.Bean; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.SubscribableChannel; /** * @author Jim */ @SpringBootApplication -@EnableBinding({ MySource.class, MySink.class }) -public class RocketMQApplication { - - public interface MySink { - - @Input("input1") - SubscribableChannel input1(); - - @Input("input2") - SubscribableChannel input2(); - - @Input("input3") - SubscribableChannel input3(); - - @Input("input4") - SubscribableChannel input4(); - } +@EnableBinding({ MySource.class }) +public class RocketMQProduceApplication { public interface MySource { @Output("output1") @@ -44,7 +26,7 @@ public class RocketMQApplication { } public static void main(String[] args) { - SpringApplication.run(RocketMQApplication.class, args); + SpringApplication.run(RocketMQProduceApplication.class, args); } @Bean @@ -86,13 +68,13 @@ public class RocketMQApplication { @Override public void run(String... args) throws Exception { // COMMIT_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg1", false); + senderService.sendTransactionalMsg("transactional-msg1", 1); // ROLLBACK_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg2", true); - // ROLLBACK_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg3", true); - // COMMIT_MESSAGE message - senderService.sendTransactionalMsg("transactional-msg4", false); + senderService.sendTransactionalMsg("transactional-msg2", 2); + // ROLLBACK_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg3", 3); + // COMMIT_MESSAGE message + senderService.sendTransactionalMsg("transactional-msg4", 4); } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java similarity index 86% rename from spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java index e84ada23b..b6d49cf8b 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/SenderService.java @@ -4,8 +4,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.alibaba.cloud.examples.RocketMQApplication.MySource; +import org.springframework.cloud.alibaba.cloud.examples.RocketMQProduceApplication.MySource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -40,12 +41,11 @@ public class SenderService { source.output1().send(message); } - public void sendTransactionalMsg(T msg, boolean error) throws Exception { + public void sendTransactionalMsg(T msg, int num) throws Exception { MessageBuilder builder = MessageBuilder.withPayload(msg) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON); - if (error) { - builder.setHeader("test", "1"); - } + builder.setHeader("test", String.valueOf(num)); + builder.setHeader(RocketMQHeaders.TAGS, "binder"); Message message = builder.build(); source.output2().send(message); } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java new file mode 100644 index 000000000..0bd48410f --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/TransactionListenerImpl.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.alibaba.cloud.examples; + +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; +import org.springframework.messaging.Message; + +/** + * @author Jim + */ +@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10) +public class TransactionListenerImpl implements RocketMQLocalTransactionListener { + @Override + public RocketMQLocalTransactionState executeLocalTransaction(Message msg, + Object arg) { + Object num = msg.getHeaders().get("test"); + + if ("1".equals(num)) { + System.out.println( + "executer: " + new String((byte[]) msg.getPayload()) + " unknown"); + return RocketMQLocalTransactionState.UNKNOWN; + } + else if ("2".equals(num)) { + System.out.println( + "executer: " + new String((byte[]) msg.getPayload()) + " rollback"); + return RocketMQLocalTransactionState.ROLLBACK; + } + System.out.println( + "executer: " + new String((byte[]) msg.getPayload()) + " commit"); + return RocketMQLocalTransactionState.COMMIT; + } + + @Override + public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { + System.out.println("check: " + new String((byte[]) msg.getPayload())); + return RocketMQLocalTransactionState.COMMIT; + } +} \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties new file mode 100644 index 000000000..beca964a4 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/resources/application.properties @@ -0,0 +1,20 @@ +logging.level.org.springframework.cloud.stream.binder.rocketmq=DEBUG + +spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 + +spring.cloud.stream.bindings.output1.destination=test-topic +spring.cloud.stream.bindings.output1.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group +spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true + +spring.cloud.stream.bindings.output2.destination=TransactionTopic +spring.cloud.stream.bindings.output2.content-type=application/json +spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true +spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup + +spring.application.name=rocketmq-produce-example + +server.port=28081 + +management.endpoints.web.exposure.include=* +management.endpoint.health.show-details=always \ No newline at end of file diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java deleted file mode 100644 index 65d72662a..000000000 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionCheckListener.java +++ /dev/null @@ -1,18 +0,0 @@ -package org.springframework.cloud.alibaba.cloud.examples; - -import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.client.producer.TransactionCheckListener; -import org.apache.rocketmq.common.message.MessageExt; - -/** - * @author Jim - */ -public class MyTransactionCheckListener implements TransactionCheckListener { - - @Override - public LocalTransactionState checkLocalTransactionState(MessageExt msg) { - System.out.println("TransactionCheckListener: " + new String(msg.getBody())); - return LocalTransactionState.COMMIT_MESSAGE; - } - -} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java b/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java deleted file mode 100644 index 752d4e5f3..000000000 --- a/spring-cloud-alibaba-examples/rocketmq-example/src/main/java/org/springframework/cloud/alibaba/cloud/examples/MyTransactionExecuter.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.springframework.cloud.alibaba.cloud.examples; - -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; -import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.common.message.Message; - -/** - * @author Jim - */ -public class MyTransactionExecuter implements LocalTransactionExecuter { - @Override - public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { - if ("1".equals(msg.getUserProperty("test"))) { - System.out.println(new String(msg.getBody()) + " rollback"); - return LocalTransactionState.ROLLBACK_MESSAGE; - } - System.out.println(new String(msg.getBody()) + " commit"); - return LocalTransactionState.COMMIT_MESSAGE; - } -} diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml index fad854178..a4ad9a3ec 100644 --- a/spring-cloud-stream-binder-rocketmq/pom.xml +++ b/spring-cloud-stream-binder-rocketmq/pom.xml @@ -21,54 +21,41 @@ spring-cloud-stream - - org.apache.rocketmq - rocketmq-client - - - - io.dropwizard.metrics - metrics-core - 4.0.3 - provided - true - - org.springframework.boot spring-boot-configuration-processor - provided true org.springframework.boot spring-boot - provided true org.springframework.boot spring-boot-autoconfigure - provided true org.springframework.boot spring-boot-actuator - provided true org.springframework.boot spring-boot-actuator-autoconfigure - provided true + + org.apache.rocketmq + rocketmq-spring-boot-starter + + org.springframework.boot spring-boot-starter-test diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index 66517074b..c57ce9b65 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -21,43 +21,16 @@ package org.springframework.cloud.stream.binder.rocketmq; */ public interface RocketMQBinderConstants { - String ENDPOINT_ID = "rocketmq-binder"; - /** * Header key */ - String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE"; - - String ROCKET_FLAG = "ROCKETMQ_FLAG"; - - String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT"; - - String ROCKET_TRANSACTIONAL_ARG = "ROCKETMQ_TRANSACTIONAL_ARG"; - - String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT"; + String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG"; /** - * Instrumentation + * Default value */ - String LASTSEND_TIMESTAMP = "lastSend.timestamp"; - - interface Metrics { - interface Producer { - String PREFIX = "scs-rocketmq.producer."; - String TOTAL_SENT = "totalSent"; - String TOTAL_SENT_FAILURES = "totalSentFailures"; - String SENT_PER_SECOND = "sentPerSecond"; - String SENT_FAILURES_PER_SECOND = "sentFailuresPerSecond"; - } + String DEFAULT_NAME_SERVER = "127.0.0.1:9876"; - interface Consumer { - String GROUP_PREFIX = "scs-rocketmq.consumerGroup."; - String PREFIX = "scs-rocketmq.consumer."; - String TOTAL_CONSUMED = "totalConsumed"; - String CONSUMED_PER_SECOND = "consumedPerSecond"; - String TOTAL_CONSUMED_FAILURES = "totalConsumedFailures"; - String CONSUMED_FAILURES_PER_SECOND = "consumedFailuresPerSecond"; - } - } + String DEFAULT_GROUP = "rocketmq_binder_default_group_name"; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java new file mode 100644 index 000000000..b50e799d2 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq; + +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.util.StringUtils; + +/** + * @author Jim + */ +public class RocketMQBinderUtils { + + public static RocketMQBinderConfigurationProperties mergeProperties( + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + RocketMQProperties rocketMQProperties) { + RocketMQBinderConfigurationProperties result = new RocketMQBinderConfigurationProperties(); + if (StringUtils.isEmpty(rocketMQProperties.getNameServer())) { + result.setNameServer(rocketBinderConfigurationProperties.getNameServer()); + } + else { + result.setNameServer(rocketMQProperties.getNameServer()); + } + if (rocketMQProperties.getProducer() == null + || StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) { + result.setAccessKey(rocketBinderConfigurationProperties.getAccessKey()); + } + else { + result.setAccessKey(rocketMQProperties.getProducer().getAccessKey()); + } + if (rocketMQProperties.getProducer() == null + || StringUtils.isEmpty(rocketMQProperties.getProducer().getSecretKey())) { + result.setSecretKey(rocketBinderConfigurationProperties.getSecretKey()); + } + else { + result.setSecretKey(rocketMQProperties.getProducer().getSecretKey()); + } + if (rocketMQProperties.getProducer() == null || StringUtils + .isEmpty(rocketMQProperties.getProducer().getCustomizedTraceTopic())) { + result.setCustomizedTraceTopic( + rocketBinderConfigurationProperties.getCustomizedTraceTopic()); + } + else { + result.setCustomizedTraceTopic( + rocketMQProperties.getProducer().getCustomizedTraceTopic()); + } + if (rocketMQProperties.getProducer() != null + && rocketMQProperties.getProducer().isEnableMsgTrace()) { + result.setEnableMsgTrace(Boolean.TRUE); + } + else { + result.setEnableMsgTrace( + rocketBinderConfigurationProperties.isEnableMsgTrace()); + } + return result; + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index b85df2a0f..152ccd319 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,17 +16,22 @@ package org.springframework.cloud.stream.binder.rocketmq; -import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; -import org.apache.rocketmq.client.producer.TransactionCheckListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; + +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQUtil; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; -import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; +import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; @@ -40,10 +45,11 @@ import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.core.MessageProducer; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.util.ClassUtils; +import org.springframework.util.StringUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; /** - * @author Timur Valiev * @author Jim */ public class RocketMQMessageChannelBinder extends @@ -51,22 +57,23 @@ public class RocketMQMessageChannelBinder extends implements ExtendedPropertiesBinder { - private static final Logger logger = LoggerFactory - .getLogger(RocketMQMessageChannelBinder.class); + private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties(); private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private final RocketMQProperties rocketMQProperties; private final InstrumentationManager instrumentationManager; - private final ConsumersManager consumersManager; - private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties(); + private Map topicInUse = new HashMap<>(); - public RocketMQMessageChannelBinder(ConsumersManager consumersManager, - RocketMQTopicProvisioner provisioningProvider, + public RocketMQMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider, + RocketMQExtendedBindingProperties extendedBindingProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + RocketMQProperties rocketMQProperties, InstrumentationManager instrumentationManager) { super(null, provisioningProvider); - this.consumersManager = consumersManager; + this.extendedBindingProperties = extendedBindingProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.rocketMQProperties = rocketMQProperties; this.instrumentationManager = instrumentationManager; } @@ -75,23 +82,73 @@ public class RocketMQMessageChannelBinder extends ExtendedProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( - destination.getName(), producerProperties, - rocketBinderConfigurationProperties, instrumentationManager); + + RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils + .mergeProperties(rocketBinderConfigurationProperties, + rocketMQProperties); + + RocketMQTemplate rocketMQTemplate; if (producerProperties.getExtension().getTransactional()) { - // transaction message check LocalTransactionExecuter - messageHandler.setLocalTransactionExecuter( - getClassConfiguration(destination.getName(), - producerProperties.getExtension().getExecuter(), - LocalTransactionExecuter.class)); - // transaction message check TransactionCheckListener - messageHandler.setTransactionCheckListener( - getClassConfiguration(destination.getName(), - producerProperties.getExtension() - .getTransactionCheckListener(), - TransactionCheckListener.class)); + Map rocketMQTemplates = getBeanFactory() + .getBeansOfType(RocketMQTemplate.class); + if (rocketMQTemplates.size() == 0) { + throw new IllegalStateException( + "there is no RocketMQTemplate in Spring BeanFactory"); + } + else if (rocketMQTemplates.size() > 1) { + throw new IllegalStateException( + "there is more than 1 RocketMQTemplates in Spring BeanFactory"); + } + rocketMQTemplate = rocketMQTemplates.values().iterator().next(); + } + else { + rocketMQTemplate = new RocketMQTemplate(); + rocketMQTemplate.setObjectMapper(this.getApplicationContext() + .getBeansOfType(ObjectMapper.class).values().iterator().next()); + DefaultMQProducer producer; + String ak = mergedProperties.getAccessKey(); + String sk = mergedProperties.getSecretKey(); + if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { + RPCHook rpcHook = new AclClientRPCHook( + new SessionCredentials(ak, sk)); + producer = new DefaultMQProducer( + producerProperties.getExtension().getGroup(), rpcHook, + mergedProperties.isEnableMsgTrace(), + mergedProperties.getCustomizedTraceTopic()); + producer.setVipChannelEnabled(false); + producer.setInstanceName( + RocketMQUtil.getInstanceName(rpcHook, destination.getName())); + } + else { + producer = new DefaultMQProducer( + producerProperties.getExtension().getGroup()); + producer.setVipChannelEnabled( + producerProperties.getExtension().getVipChannelEnabled()); + } + producer.setNamesrvAddr(mergedProperties.getNameServer()); + producer.setSendMsgTimeout( + producerProperties.getExtension().getSendMessageTimeout()); + producer.setRetryTimesWhenSendFailed( + producerProperties.getExtension().getRetryTimesWhenSendFailed()); + producer.setRetryTimesWhenSendAsyncFailed(producerProperties + .getExtension().getRetryTimesWhenSendAsyncFailed()); + producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension() + .getCompressMessageBodyThreshold()); + producer.setRetryAnotherBrokerWhenNotStoreOK( + producerProperties.getExtension().isRetryNextServer()); + producer.setMaxMessageSize( + producerProperties.getExtension().getMaxMessageSize()); + rocketMQTemplate.setProducer(producer); } + RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( + rocketMQTemplate, destination.getName(), + producerProperties.getExtension().getGroup(), + producerProperties.getExtension().getTransactional(), + instrumentationManager); + messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); + messageHandler.setSync(producerProperties.getExtension().getSync()); + if (errorChannel != null) { messageHandler.setSendFailureChannel(errorChannel); } @@ -113,9 +170,22 @@ public class RocketMQMessageChannelBinder extends "'group must be configured for channel " + destination.getName()); } + RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer( + consumerProperties, rocketBinderConfigurationProperties, this); + listenerContainer.setConsumerGroup(group); + listenerContainer.setTopic(destination.getName()); + listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency()); + listenerContainer.setSuspendCurrentQueueTimeMillis( + consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis()); + listenerContainer.setDelayLevelWhenNextConsume( + consumerProperties.getExtension().getDelayLevelWhenNextConsume()); + listenerContainer + .setNameServer(rocketBinderConfigurationProperties.getNameServer()); + RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( - consumersManager, consumerProperties, destination.getName(), group, - instrumentationManager); + listenerContainer, consumerProperties, instrumentationManager); + + topicInUse.put(destination.getName(), group); ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group, consumerProperties); @@ -143,6 +213,10 @@ public class RocketMQMessageChannelBinder extends return extendedBindingProperties.getExtendedProducerProperties(channelName); } + public Map getTopicInUse() { + return topicInUse; + } + @Override public String getDefaultsPrefix() { return extendedBindingProperties.getDefaultsPrefix(); @@ -153,47 +227,6 @@ public class RocketMQMessageChannelBinder extends return extendedBindingProperties.getExtendedPropertiesEntryClass(); } - private T getClassConfiguration(String destName, String className, - Class interfaceClass) { - if (StringUtils.isEmpty(className)) { - throw new RuntimeException("Binding for channel " + destName - + " using transactional message, should set " - + interfaceClass.getSimpleName() + " configuration" - + interfaceClass.getSimpleName() + " should be set, like " - + "'spring.cloud.stream.rocketmq.bindings.output.producer.xxx=TheFullClassNameOfYour" - + interfaceClass.getSimpleName() + "'"); - } - else if (StringUtils.isNotEmpty(className)) { - Class fieldClass; - // check class exists - try { - fieldClass = ClassUtils.forName(className, - RocketMQMessageChannelBinder.class.getClassLoader()); - } - catch (ClassNotFoundException e) { - throw new RuntimeException("Binding for channel " + destName - + " using transactional message, but " + className - + " class is not found"); - } - // check interface incompatible - if (!interfaceClass.isAssignableFrom(fieldClass)) { - throw new RuntimeException("Binding for channel " + destName - + " using transactional message, but " + className - + " is incompatible with " + interfaceClass.getSimpleName() - + " interface"); - } - try { - return (T) fieldClass.newInstance(); - } - catch (Exception e) { - throw new RuntimeException("Binding for channel " + destName - + " using transactional message, but " + className - + " instance error", e); - } - } - return null; - } - public void setExtendedBindingProperties( RocketMQExtendedBindingProperties extendedBindingProperties) { this.extendedBindingProperties = extendedBindingProperties; diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java deleted file mode 100644 index 7707c07ed..000000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq; - -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT; -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageExt; -import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement; -import org.springframework.integration.support.MutableMessage; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.MessageHeaderAccessor; - -/** - * @author Timur Valiev - * @author Jim - */ -public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor { - - public RocketMQMessageHeaderAccessor() { - super(); - } - - public RocketMQMessageHeaderAccessor(Message message) { - super(message); - } - - public Acknowledgement getAcknowledgement(Message message) { - return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class); - } - - public RocketMQMessageHeaderAccessor withAcknowledgment( - Acknowledgement acknowledgment) { - setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment); - return this; - } - - public String getTags() { - return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, ""); - } - - public RocketMQMessageHeaderAccessor withTags(String tag) { - setHeader(MessageConst.PROPERTY_TAGS, tag); - return this; - } - - public String getKeys() { - return (String) getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, ""); - } - - public RocketMQMessageHeaderAccessor withKeys(String keys) { - setHeader(MessageConst.PROPERTY_KEYS, keys); - return this; - } - - public MessageExt getRocketMessage() { - return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class); - } - - public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) { - setHeader(ORIGINAL_ROCKET_MESSAGE, message); - return this; - } - - public Integer getDelayTimeLevel() { - return (Integer) getMessageHeaders() - .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0); - } - - public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) { - setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel); - return this; - } - - public Integer getFlag() { - return (Integer) getMessageHeaders().getOrDefault(ROCKET_FLAG, 0); - } - - public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) { - setHeader(ROCKET_FLAG, delayTimeLevel); - return this; - } - - public Object getTransactionalArg() { - return getMessageHeaders().get(ROCKET_TRANSACTIONAL_ARG); - } - - public Object withTransactionalArg(Object arg) { - setHeader(ROCKET_TRANSACTIONAL_ARG, arg); - return this; - } - - public SendResult getSendResult() { - return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class); - } - - public static void putSendResult(MutableMessage message, SendResult sendResult) { - message.getHeaders().put(ROCKET_SEND_RESULT, sendResult); - } - - public Map getUserProperties() { - Map result = new HashMap<>(); - for (Map.Entry entry : this.toMap().entrySet()) { - if (entry.getValue() instanceof String - && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) - && !entry.getKey().equals(MessageHeaders.CONTENT_TYPE)) { - result.put(entry.getKey(), (String) entry.getValue()); - } - } - return result; - } -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java deleted file mode 100644 index 1a7dc3b15..000000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.actuator; - -import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID; - -import java.util.HashMap; -import java.util.Map; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.actuate.endpoint.annotation.Endpoint; -import org.springframework.boot.actuate.endpoint.annotation.ReadOperation; -import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; - -/** - * @author Timur Valiev - * @author Jim - */ -@Endpoint(id = ENDPOINT_ID) -public class RocketMQBinderEndpoint { - - @Autowired(required = false) - private InstrumentationManager instrumentationManager; - - @ReadOperation - public Map invoke() { - Map result = new HashMap<>(); - if (instrumentationManager != null) { - result.put("metrics", - instrumentationManager.getMetricRegistry().getMetrics()); - result.put("runtime", instrumentationManager.getRuntime()); - } - else { - result.put("warning", - "please add metrics-core dependency, we use it for metrics"); - } - return result; - } - -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java index 12ed17eb9..b7e310369 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java @@ -28,33 +28,25 @@ import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationM */ public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator { - @Autowired(required = false) + @Autowired private InstrumentationManager instrumentationManager; @Override protected void doHealthCheck(Health.Builder builder) throws Exception { - if (instrumentationManager != null) { - if (instrumentationManager.getHealthInstrumentations().stream() - .allMatch(Instrumentation::isUp)) { - builder.up(); - return; - } - if (instrumentationManager.getHealthInstrumentations().stream() - .allMatch(Instrumentation::isOutOfService)) { - builder.outOfService(); - return; - } - builder.down(); - instrumentationManager.getHealthInstrumentations().stream() - .filter(instrumentation -> !instrumentation.isStarted()) - .forEach(instrumentation1 -> builder - .withException(instrumentation1.getStartException())); + if (instrumentationManager.getHealthInstrumentations().stream() + .allMatch(Instrumentation::isUp)) { + builder.up(); + return; } - else { - builder.down(); - builder.withDetail("warning", - "please add metrics-core dependency, we use it for metrics"); + if (instrumentationManager.getHealthInstrumentations().stream() + .allMatch(Instrumentation::isOutOfService)) { + builder.outOfService(); + return; } - + builder.down(); + instrumentationManager.getHealthInstrumentations().stream() + .filter(instrumentation -> !instrumentation.isStarted()) + .forEach(instrumentation1 -> builder + .withException(instrumentation1.getStartException())); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java index ede8b259e..aa7ca366e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java @@ -16,23 +16,26 @@ package org.springframework.cloud.stream.binder.rocketmq.config; -import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; -import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; /** * @author Timur Valiev * @author Jim */ @Configuration +@Import({ RocketMQAutoConfiguration.class, + RocketMQBinderHealthIndicatorAutoConfiguration.class }) @EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class }) public class RocketMQBinderAutoConfiguration { @@ -42,7 +45,7 @@ public class RocketMQBinderAutoConfiguration { private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; @Autowired(required = false) - private InstrumentationManager instrumentationManager; + private RocketMQProperties rocketMQProperties = new RocketMQProperties(); @Autowired public RocketMQBinderAutoConfiguration( @@ -50,8 +53,6 @@ public class RocketMQBinderAutoConfiguration { RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { this.extendedBindingProperties = extendedBindingProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, - this.rocketBinderConfigurationProperties.getLogLevel()); } @Bean @@ -62,18 +63,18 @@ public class RocketMQBinderAutoConfiguration { @Bean public RocketMQMessageChannelBinder rocketMessageChannelBinder( RocketMQTopicProvisioner provisioningProvider, - ConsumersManager consumersManager) { + InstrumentationManager instrumentationManager) { RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder( - consumersManager, provisioningProvider, - rocketBinderConfigurationProperties, instrumentationManager); + provisioningProvider, extendedBindingProperties, + rocketBinderConfigurationProperties, rocketMQProperties, + instrumentationManager); binder.setExtendedBindingProperties(extendedBindingProperties); return binder; } @Bean - public ConsumersManager consumersManager() { - return new ConsumersManager(instrumentationManager, - rocketBinderConfigurationProperties); + public InstrumentationManager instrumentationManager() { + return new InstrumentationManager(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderHealthIndicatorAutoConfiguration.java similarity index 64% rename from spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java rename to spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderHealthIndicatorAutoConfiguration.java index f1fb5f1bb..bac4927e6 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderHealthIndicatorAutoConfiguration.java @@ -16,13 +16,9 @@ package org.springframework.cloud.stream.binder.rocketmq.config; -import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration; import org.springframework.boot.actuate.endpoint.annotation.Endpoint; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint; import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator; -import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -30,24 +26,12 @@ import org.springframework.context.annotation.Configuration; * @author Jim */ @Configuration -@AutoConfigureAfter(EndpointAutoConfiguration.class) @ConditionalOnClass(Endpoint.class) -public class RocketMQBinderEndpointAutoConfiguration { - - @Bean - public RocketMQBinderEndpoint rocketBinderEndpoint() { - return new RocketMQBinderEndpoint(); - } +public class RocketMQBinderHealthIndicatorAutoConfiguration { @Bean public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() { return new RocketMQBinderHealthIndicator(); } - @Bean - @ConditionalOnClass(name = "com.codahale.metrics.Counter") - public InstrumentationManager instrumentationManager() { - return new InstrumentationManager(); - } - } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java new file mode 100644 index 000000000..e481ac87f --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.config; + +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; +import org.apache.rocketmq.spring.config.RocketMQConfigUtils; +import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor; +import org.apache.rocketmq.spring.config.TransactionHandlerRegistry; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; +import org.springframework.util.StringUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * @author Jim + */ +@Configuration +@AutoConfigureAfter(RocketMQAutoConfiguration.class) +@ConditionalOnMissingBean(DefaultMQProducer.class) +public class RocketMQComponent4BinderAutoConfiguration { + + private final Environment environment; + + public RocketMQComponent4BinderAutoConfiguration(Environment environment) { + this.environment = environment; + } + + @Bean + @ConditionalOnMissingBean(DefaultMQProducer.class) + public DefaultMQProducer defaultMQProducer() { + DefaultMQProducer producer; + String configNameServer = environment.resolveRequiredPlaceholders( + "${spring.cloud.stream.rocketmq.binder.name-server:${rocketmq.producer.name-server:}}"); + String ak = environment.resolveRequiredPlaceholders( + "${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}"); + String sk = environment.resolveRequiredPlaceholders( + "${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}"); + if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { + producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP, + new AclClientRPCHook(new SessionCredentials(ak, sk))); + producer.setVipChannelEnabled(false); + } + else { + producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP); + } + producer.setNamesrvAddr(configNameServer); + return producer; + } + + @Bean(destroyMethod = "destroy") + @ConditionalOnMissingBean + public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, + ObjectMapper objectMapper) { + RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); + rocketMQTemplate.setProducer(mqProducer); + rocketMQTemplate.setObjectMapper(objectMapper); + return rocketMQTemplate; + } + + @Bean + @ConditionalOnBean(RocketMQTemplate.class) + @ConditionalOnMissingBean(TransactionHandlerRegistry.class) + public TransactionHandlerRegistry transactionHandlerRegistry( + RocketMQTemplate template) { + return new TransactionHandlerRegistry(template); + } + + @Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME) + @ConditionalOnBean(TransactionHandlerRegistry.class) + public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor( + TransactionHandlerRegistry transactionHandlerRegistry) { + return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry); + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java deleted file mode 100644 index 4b5d9e3fd..000000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.consuming; - -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; - -/** - * @author Timur Valiev - * @author Jim - */ -public class Acknowledgement { - - /** - * for {@link ConsumeConcurrentlyContext} using - */ - private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - /** - * Message consume retry strategy
- * -1,no retry,put into DLQ directly
- * 0,broker control retry frequency
- * >0,client control retry frequency - */ - private Integer consumeConcurrentlyDelayLevel = 0; - - /** - * for {@link ConsumeOrderlyContext} using - */ - private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS; - private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L; - - public Acknowledgement setConsumeConcurrentlyStatus( - ConsumeConcurrentlyStatus consumeConcurrentlyStatus) { - this.consumeConcurrentlyStatus = consumeConcurrentlyStatus; - return this; - } - - public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() { - return consumeConcurrentlyStatus; - } - - public ConsumeOrderlyStatus getConsumeOrderlyStatus() { - return consumeOrderlyStatus; - } - - public Acknowledgement setConsumeOrderlyStatus( - ConsumeOrderlyStatus consumeOrderlyStatus) { - this.consumeOrderlyStatus = consumeOrderlyStatus; - return this; - } - - public Integer getConsumeConcurrentlyDelayLevel() { - return consumeConcurrentlyDelayLevel; - } - - public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) { - this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel; - } - - public Long getConsumeOrderlySuspendCurrentQueueTimeMill() { - return consumeOrderlySuspendCurrentQueueTimeMill; - } - - public void setConsumeOrderlySuspendCurrentQueueTimeMill( - Long consumeOrderlySuspendCurrentQueueTimeMill) { - this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill; - } - - public static Acknowledgement buildOrderlyInstance() { - Acknowledgement acknowledgement = new Acknowledgement(); - acknowledgement.setConsumeOrderlyStatus(ConsumeOrderlyStatus.SUCCESS); - return acknowledgement; - } - - public static Acknowledgement buildConcurrentlyInstance() { - Acknowledgement acknowledgement = new Acknowledgement(); - acknowledgement - .setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS); - return acknowledgement; - } - -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java deleted file mode 100644 index 12fde2ba2..000000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.consuming; - -import java.util.AbstractMap; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; -import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation; -import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; -import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; - -/** - * @author Timur Valiev - * @author Jim - */ -public class ConsumersManager { - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private final Map consumerGroups = new HashMap<>(); - private final Map started = new HashMap<>(); - private final Map, ExtendedConsumerProperties> propertiesMap = new HashMap<>(); - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - - private InstrumentationManager instrumentationManager; - - public ConsumersManager(InstrumentationManager instrumentationManager, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) { - this.instrumentationManager = instrumentationManager; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; - } - - public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, - String topic, - ExtendedConsumerProperties consumerProperties) { - propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), - consumerProperties); - - Optional.ofNullable(instrumentationManager).ifPresent(manager -> { - ConsumerGroupInstrumentation instrumentation = manager - .getConsumerGroupInstrumentation(group); - instrumentationManager.addHealthInstrumentation(instrumentation); - }); - - if (consumerGroups.containsKey(group)) { - return consumerGroups.get(group); - } - - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); - consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - consumerGroups.put(group, consumer); - started.put(group, false); - consumer.setConsumeThreadMax(consumerProperties.getConcurrency()); - consumer.setConsumeThreadMin(consumerProperties.getConcurrency()); - if (consumerProperties.getExtension().getBroadcasting()) { - consumer.setMessageModel(MessageModel.BROADCASTING); - } - logger.info("RocketMQ consuming for SCS group {} created", group); - return consumer; - } - - public synchronized void startConsumers() throws MQClientException { - for (String group : getConsumerGroups()) { - start(group); - } - } - - public synchronized void startConsumer(String group) throws MQClientException { - start(group); - } - - public synchronized void stopConsumer(String group) { - stop(group); - } - - private void stop(String group) { - if (consumerGroups.get(group) != null) { - consumerGroups.get(group).shutdown(); - started.put(group, false); - } - } - - private synchronized void start(String group) throws MQClientException { - if (started.get(group)) { - return; - } - - ConsumerGroupInstrumentation groupInstrumentation = null; - if (Optional.ofNullable(instrumentationManager).isPresent()) { - groupInstrumentation = instrumentationManager - .getConsumerGroupInstrumentation(group); - instrumentationManager.addHealthInstrumentation(groupInstrumentation); - } - - try { - consumerGroups.get(group).start(); - started.put(group, true); - Optional.ofNullable(groupInstrumentation) - .ifPresent(g -> g.markStartedSuccessfully()); - } - catch (MQClientException e) { - Optional.ofNullable(groupInstrumentation) - .ifPresent(g -> g.markStartFailed(e)); - logger.error("RocketMQ Consumer hasn't been started. Caused by " - + e.getErrorMessage(), e); - throw e; - } - } - - public synchronized Set getConsumerGroups() { - return consumerGroups.keySet(); - } -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java new file mode 100644 index 000000000..35f29ae83 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -0,0 +1,419 @@ +/* + * Copyright (C) 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rocketmq.consuming; + +import java.util.List; +import java.util.Objects; + +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.spring.annotation.ConsumeMode; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; +import org.apache.rocketmq.spring.support.RocketMQListenerContainer; +import org.apache.rocketmq.spring.support.RocketMQUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.springframework.context.SmartLifecycle; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * @author Jim + */ +public class RocketMQListenerBindingContainer + implements InitializingBean, RocketMQListenerContainer, SmartLifecycle { + + private final static Logger log = LoggerFactory + .getLogger(RocketMQListenerBindingContainer.class); + + private long suspendCurrentQueueTimeMillis = 1000; + + /** + * Message consume retry strategy
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency. + */ + private int delayLevelWhenNextConsume = 0; + + private String nameServer; + + private String consumerGroup; + + private String topic; + + private int consumeThreadMax = 64; + + private String charset = "UTF-8"; + + private RocketMQListener rocketMQListener; + + private DefaultMQPushConsumer consumer; + + private boolean running; + + private final ExtendedConsumerProperties rocketMQConsumerProperties; + + private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder; + private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + + // The following properties came from RocketMQConsumerProperties. + private ConsumeMode consumeMode; + private SelectorType selectorType; + private String selectorExpression; + private MessageModel messageModel; + + public RocketMQListenerBindingContainer( + ExtendedConsumerProperties rocketMQConsumerProperties, + RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + RocketMQMessageChannelBinder rocketMQMessageChannelBinder) { + this.rocketMQConsumerProperties = rocketMQConsumerProperties; + this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder; + this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly() + ? ConsumeMode.ORDERLY + : ConsumeMode.CONCURRENTLY; + if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) { + this.selectorType = SelectorType.TAG; + this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags(); + } + else { + this.selectorType = SelectorType.SQL92; + this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql(); + } + this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting() + ? MessageModel.BROADCASTING + : MessageModel.CLUSTERING; + } + + @Override + public void setupMessageListener(RocketMQListener rocketMQListener) { + this.rocketMQListener = rocketMQListener; + } + + @Override + public void destroy() throws Exception { + this.setRunning(false); + if (Objects.nonNull(consumer)) { + consumer.shutdown(); + } + log.info("container destroyed, {}", this.toString()); + } + + @Override + public void afterPropertiesSet() throws Exception { + initRocketMQPushConsumer(); + } + + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public void stop(Runnable callback) { + stop(); + callback.run(); + } + + @Override + public void start() { + if (this.isRunning()) { + throw new IllegalStateException( + "container already running. " + this.toString()); + } + + try { + consumer.start(); + } + catch (MQClientException e) { + throw new IllegalStateException("Failed to start RocketMQ push consumer", e); + } + this.setRunning(true); + + log.info("running container: {}", this.toString()); + } + + @Override + public void stop() { + if (this.isRunning()) { + if (Objects.nonNull(consumer)) { + consumer.shutdown(); + } + setRunning(false); + } + } + + @Override + public boolean isRunning() { + return running; + } + + private void setRunning(boolean running) { + this.running = running; + } + + @Override + public int getPhase() { + return Integer.MAX_VALUE; + } + + private void initRocketMQPushConsumer() throws MQClientException { + Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); + Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); + Assert.notNull(nameServer, "Property 'nameServer' is required"); + Assert.notNull(topic, "Property 'topic' is required"); + + String ak = rocketBinderConfigurationProperties.getAccessKey(); + String sk = rocketBinderConfigurationProperties.getSecretKey(); + if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { + RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk)); + consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, + new AllocateMessageQueueAveragely(), + rocketBinderConfigurationProperties.isEnableMsgTrace(), + rocketBinderConfigurationProperties.getCustomizedTraceTopic()); + consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, topic)); + consumer.setVipChannelEnabled(false); + } + else { + consumer = new DefaultMQPushConsumer(consumerGroup, + rocketBinderConfigurationProperties.isEnableMsgTrace(), + rocketBinderConfigurationProperties.getCustomizedTraceTopic()); + } + + consumer.setNamesrvAddr(nameServer); + consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); + consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); + + switch (messageModel) { + case BROADCASTING: + consumer.setMessageModel( + org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); + break; + case CLUSTERING: + consumer.setMessageModel( + org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); + break; + default: + throw new IllegalArgumentException("Property 'messageModel' was wrong."); + } + + switch (selectorType) { + case TAG: + consumer.subscribe(topic, selectorExpression); + break; + case SQL92: + consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); + break; + default: + throw new IllegalArgumentException("Property 'selectorType' was wrong."); + } + + switch (consumeMode) { + case ORDERLY: + consumer.setMessageListener(new DefaultMessageListenerOrderly()); + break; + case CONCURRENTLY: + consumer.setMessageListener(new DefaultMessageListenerConcurrently()); + break; + default: + throw new IllegalArgumentException("Property 'consumeMode' was wrong."); + } + + if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { + ((RocketMQPushConsumerLifecycleListener) rocketMQListener) + .prepareStart(consumer); + } + + } + + @Override + public String toString() { + return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup + + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType + + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel=" + + messageModel + '}'; + } + + public long getSuspendCurrentQueueTimeMillis() { + return suspendCurrentQueueTimeMillis; + } + + public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + } + + public int getDelayLevelWhenNextConsume() { + return delayLevelWhenNextConsume; + } + + public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { + this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; + } + + public String getNameServer() { + return nameServer; + } + + public void setNameServer(String nameServer) { + this.nameServer = nameServer; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public int getConsumeThreadMax() { + return consumeThreadMax; + } + + public void setConsumeThreadMax(int consumeThreadMax) { + this.consumeThreadMax = consumeThreadMax; + } + + public String getCharset() { + return charset; + } + + public void setCharset(String charset) { + this.charset = charset; + } + + public RocketMQListener getRocketMQListener() { + return rocketMQListener; + } + + public void setRocketMQListener(RocketMQListener rocketMQListener) { + this.rocketMQListener = rocketMQListener; + } + + public DefaultMQPushConsumer getConsumer() { + return consumer; + } + + public void setConsumer(DefaultMQPushConsumer consumer) { + this.consumer = consumer; + } + + public ExtendedConsumerProperties getRocketMQConsumerProperties() { + return rocketMQConsumerProperties; + } + + public ConsumeMode getConsumeMode() { + return consumeMode; + } + + public SelectorType getSelectorType() { + return selectorType; + } + + public String getSelectorExpression() { + return selectorExpression; + } + + public MessageModel getMessageModel() { + return messageModel; + } + + public class DefaultMessageListenerConcurrently + implements MessageListenerConcurrently { + + @SuppressWarnings("unchecked") + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + for (MessageExt messageExt : msgs) { + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + rocketMQListener + .onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); + long costTime = System.currentTimeMillis() - now; + log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } + catch (Exception e) { + log.warn("consume message failed. messageExt:{}", messageExt, e); + context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + } + + public class DefaultMessageListenerOrderly implements MessageListenerOrderly { + + @SuppressWarnings("unchecked") + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, + ConsumeOrderlyContext context) { + for (MessageExt messageExt : msgs) { + log.debug("received msg: {}", messageExt); + try { + long now = System.currentTimeMillis(); + rocketMQListener + .onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); + long costTime = System.currentTimeMillis() - now; + log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + } + catch (Exception e) { + log.warn("consume message failed. messageExt:{}", messageExt, e); + context.setSuspendCurrentQueueTimeMillis( + suspendCurrentQueueTimeMillis); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + } + + return ConsumeOrderlyStatus.SUCCESS; + } + } + +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/exception/RocketMQSendFailureException.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/exception/RocketMQSendFailureException.java deleted file mode 100644 index d8d7fc5dd..000000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/exception/RocketMQSendFailureException.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.exception; - -import org.springframework.messaging.Message; -import org.springframework.messaging.MessagingException; - -/** - * An exception that is the payload of an {@code ErrorMessage} when occurs send failure. - * - * @author Jim - * @since 0.2.2 - */ -public class RocketMQSendFailureException extends MessagingException { - - private final org.apache.rocketmq.common.message.Message rocketmqMsg; - - public RocketMQSendFailureException(Message message, - org.apache.rocketmq.common.message.Message rocketmqMsg, Throwable cause) { - super(message, cause); - this.rocketmqMsg = rocketmqMsg; - } - - public org.apache.rocketmq.common.message.Message getRocketmqMsg() { - return rocketmqMsg; - } - - @Override - public String toString() { - return super.toString() + " [rocketmqMsg=" + this.rocketmqMsg + "]"; - } - -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index dd4c67b1c..33b3fef17 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -16,82 +16,49 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; -import org.apache.commons.lang3.ClassUtils; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.MessageSelector; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListener; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.core.RocketMQListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; -import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; -import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement; -import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager; -import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation; +import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; +import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.MessagingException; import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.RetryListener; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; /** * @author Jim */ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { - private static final Logger logger = LoggerFactory + private static final Logger log = LoggerFactory .getLogger(RocketMQInboundChannelAdapter.class); - private ConsumerInstrumentation consumerInstrumentation; - - private InstrumentationManager instrumentationManager; - private RetryTemplate retryTemplate; private RecoveryCallback recoveryCallback; - private DefaultMQPushConsumer consumer; - - private CloudStreamMessageListener listener; + private RocketMQListenerBindingContainer rocketMQListenerContainer; private final ExtendedConsumerProperties consumerProperties; - private final String destination; + private final InstrumentationManager instrumentationManager; - private final String group; - - private final ConsumersManager consumersManager; - - public RocketMQInboundChannelAdapter(ConsumersManager consumersManager, + public RocketMQInboundChannelAdapter( + RocketMQListenerBindingContainer rocketMQListenerContainer, ExtendedConsumerProperties consumerProperties, - String destination, String group, InstrumentationManager instrumentationManager) { - this.consumersManager = consumersManager; + this.rocketMQListenerContainer = rocketMQListenerContainer; this.consumerProperties = consumerProperties; - this.destination = destination; - this.group = group; this.instrumentationManager = instrumentationManager; } @@ -108,16 +75,27 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to " + "send an error message when retries are exhausted"); } - this.consumer = consumersManager.getOrCreateConsumer(group, destination, - consumerProperties); - Boolean isOrderly = consumerProperties.getExtension().getOrderly(); - this.listener = isOrderly ? new CloudStreamMessageListenerOrderly() - : new CloudStreamMessageListenerConcurrently(); + BindingRocketMQListener listener = new BindingRocketMQListener(); + rocketMQListenerContainer.setRocketMQListener(listener); if (retryTemplate != null) { - this.retryTemplate.registerListener(this.listener); + this.retryTemplate.registerListener(listener); } + + try { + rocketMQListenerContainer.afterPropertiesSet(); + + } + catch (Exception e) { + log.error("rocketMQListenerContainer init error: " + e.getMessage(), e); + throw new IllegalArgumentException( + "rocketMQListenerContainer init error: " + e.getMessage(), e); + } + + instrumentationManager.addHealthInstrumentation( + new Instrumentation(rocketMQListenerContainer.getTopic() + + rocketMQListenerContainer.getConsumerGroup())); } @Override @@ -126,53 +104,28 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { || !consumerProperties.getExtension().getEnabled()) { return; } - - String tags = consumerProperties.getExtension().getTags(); - - Set tagsSet = tags == null ? new HashSet<>() - : Arrays.stream(tags.split("\\|\\|")).map(String::trim) - .collect(Collectors.toSet()); - - Optional.ofNullable(instrumentationManager).ifPresent(manager -> { - consumerInstrumentation = manager.getConsumerInstrumentation(destination); - manager.addHealthInstrumentation(consumerInstrumentation); - }); - - try { - if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) { - this.consumer.subscribe(destination, MessageSelector - .bySql(consumerProperties.getExtension().getSql())); - } - else { - this.consumer.subscribe(destination, String.join(" || ", tagsSet)); - } - Optional.ofNullable(consumerInstrumentation) - .ifPresent(c -> c.markStartedSuccessfully()); - } - catch (MQClientException e) { - Optional.ofNullable(consumerInstrumentation) - .ifPresent(c -> c.markStartFailed(e)); - logger.error("RocketMQ Consumer hasn't been subscribed. Caused by " - + e.getErrorMessage(), e); - throw new RuntimeException("RocketMQ Consumer hasn't been subscribed.", e); - } - - this.consumer.registerMessageListener(this.listener); - try { - consumersManager.startConsumer(group); - } - catch (MQClientException e) { - logger.error( - "RocketMQ Consumer startup failed. Caused by " + e.getErrorMessage(), - e); - throw new RuntimeException("RocketMQ Consumer startup failed.", e); + rocketMQListenerContainer.start(); + instrumentationManager + .getHealthInstrumentation(rocketMQListenerContainer.getTopic() + + rocketMQListenerContainer.getConsumerGroup()) + .markStartedSuccessfully(); + } + catch (Exception e) { + instrumentationManager + .getHealthInstrumentation(rocketMQListenerContainer.getTopic() + + rocketMQListenerContainer.getConsumerGroup()) + .markStartFailed(e); + log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage()); + throw new MessagingException(MessageBuilder.withPayload( + "RocketMQTemplate startup failed, Caused by " + e.getMessage()) + .build(), e); } } @Override protected void doStop() { - consumersManager.stopConsumer(group); + rocketMQListenerContainer.stop(); } public void setRetryTemplate(RetryTemplate retryTemplate) { @@ -183,84 +136,21 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { this.recoveryCallback = recoveryCallback; } - protected class CloudStreamMessageListener implements MessageListener, RetryListener { + protected class BindingRocketMQListener + implements RocketMQListener, RetryListener { - Acknowledgement consumeMessage(final List msgs) { + @Override + public void onMessage(Message message) { boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null; - try { - if (enableRetry) { - return RocketMQInboundChannelAdapter.this.retryTemplate.execute( - (RetryCallback) context -> doSendMsgs( - msgs, context), - new RecoveryCallback() { - @Override - public Acknowledgement recover(RetryContext context) - throws Exception { - RocketMQInboundChannelAdapter.this.recoveryCallback - .recover(context); - if (ClassUtils.isAssignable(this.getClass(), - MessageListenerConcurrently.class)) { - return Acknowledgement - .buildConcurrentlyInstance(); - } - else { - return Acknowledgement.buildOrderlyInstance(); - } - } - }); - } - else { - Acknowledgement result = doSendMsgs(msgs, null); - Optional.ofNullable( - RocketMQInboundChannelAdapter.this.instrumentationManager) - .ifPresent(manager -> { - manager.getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumed(); - }); - return result; - } + if (enableRetry) { + RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> { + RocketMQInboundChannelAdapter.this.sendMessage(message); + return null; + }, (RecoveryCallback) RocketMQInboundChannelAdapter.this.recoveryCallback); } - catch (Exception e) { - logger.error( - "RocketMQ Message hasn't been processed successfully. Caused by ", - e); - Optional.ofNullable( - RocketMQInboundChannelAdapter.this.instrumentationManager) - .ifPresent(manager -> { - manager.getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumedFailure(); - }); + else { + RocketMQInboundChannelAdapter.this.sendMessage(message); } - return null; - } - - private Acknowledgement doSendMsgs(final List msgs, - RetryContext context) { - List acknowledgements = new ArrayList<>(); - msgs.forEach(msg -> { - String retryInfo = context == null ? "" - : "retryCount-" + String.valueOf(context.getRetryCount()) + "|"; - logger.debug(retryInfo + "consuming msg:\n" + msg); - logger.debug(retryInfo + "message body:\n" + new String(msg.getBody())); - Acknowledgement acknowledgement = new Acknowledgement(); - Message toChannel = convertMessagingFromRocketMQMsg(msg, - acknowledgement); - acknowledgements.add(acknowledgement); - RocketMQInboundChannelAdapter.this.sendMessage(toChannel); - }); - return acknowledgements.get(0); - } - - private Message convertMessagingFromRocketMQMsg(MessageExt msg, - Acknowledgement acknowledgement) { - return MessageBuilder.withPayload(msg.getBody()) - .setHeaders(new RocketMQMessageHeaderAccessor() - .withAcknowledgment(acknowledgement).withTags(msg.getTags()) - .withKeys(msg.getKeys()).withFlag(msg.getFlag()) - .withRocketMessage(msg)) - .build(); } @Override @@ -272,69 +162,12 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { @Override public void close(RetryContext context, RetryCallback callback, Throwable throwable) { - if (throwable != null) { - Optional.ofNullable( - RocketMQInboundChannelAdapter.this.instrumentationManager) - .ifPresent(manager -> { - manager.getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumedFailure(); - }); - } - else { - Optional.ofNullable( - RocketMQInboundChannelAdapter.this.instrumentationManager) - .ifPresent(manager -> { - manager.getConsumerInstrumentation( - RocketMQInboundChannelAdapter.this.destination) - .markConsumed(); - }); - } } @Override public void onError(RetryContext context, RetryCallback callback, Throwable throwable) { - } - } - - protected class CloudStreamMessageListenerConcurrently - extends CloudStreamMessageListener implements MessageListenerConcurrently { - @Override - public ConsumeConcurrentlyStatus consumeMessage(final List msgs, - ConsumeConcurrentlyContext context) { - Acknowledgement acknowledgement = consumeMessage(msgs); - if (acknowledgement != null) { - context.setDelayLevelWhenNextConsume( - acknowledgement.getConsumeConcurrentlyDelayLevel()); - return acknowledgement.getConsumeConcurrentlyStatus(); - } - else { - context.setDelayLevelWhenNextConsume(consumerProperties.getExtension() - .getError().getDelayLevelWhenNextConsume()); - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } - } - } - - protected class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener - implements MessageListenerOrderly { - - @Override - public ConsumeOrderlyStatus consumeMessage(List msgs, - ConsumeOrderlyContext context) { - Acknowledgement acknowledgement = consumeMessage(msgs); - if (acknowledgement != null) { - context.setSuspendCurrentQueueTimeMillis( - (acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill())); - return acknowledgement.getConsumeOrderlyStatus(); - } - else { - context.setSuspendCurrentQueueTimeMillis(consumerProperties.getExtension() - .getError().getSuspendCurrentQueueTimeMillis()); - return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; - } } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index f2dd937cb..65dbb9e56 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -16,122 +16,93 @@ package org.springframework.cloud.stream.binder.rocketmq.integration; -import org.apache.rocketmq.client.exception.MQBrokerException; +import java.util.Optional; + import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.client.producer.TransactionCheckListener; -import org.apache.rocketmq.client.producer.TransactionMQProducer; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; -import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor; -import org.springframework.cloud.stream.binder.rocketmq.exception.RocketMQSendFailureException; +import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation; -import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; -import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import org.springframework.context.Lifecycle; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.support.DefaultErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageStrategy; -import org.springframework.integration.support.MutableMessage; +import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.ErrorMessage; import org.springframework.util.Assert; - -import java.time.Instant; -import java.util.Map; -import java.util.Optional; +import org.springframework.util.StringUtils; /** * @author Jim */ public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle { - private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); - - private DefaultMQProducer producer; + private final static Logger log = LoggerFactory + .getLogger(RocketMQMessageHandler.class); - private ProducerInstrumentation producerInstrumentation; + private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy(); - private InstrumentationManager instrumentationManager; + private MessageChannel sendFailureChannel; - private LocalTransactionExecuter localTransactionExecuter; + private final RocketMQTemplate rocketMQTemplate; - private TransactionCheckListener transactionCheckListener; + private final Boolean transactional; - private MessageChannel sendFailureChannel; + private final String destination; - private final ExtendedProducerProperties producerProperties; + private final String groupName; - private final String destination; + private final InstrumentationManager instrumentationManager; - private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; + private boolean sync = false; private volatile boolean running = false; - public RocketMQMessageHandler(String destination, - ExtendedProducerProperties producerProperties, - RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, + public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination, + String groupName, Boolean transactional, InstrumentationManager instrumentationManager) { + this.rocketMQTemplate = rocketMQTemplate; this.destination = destination; - this.producerProperties = producerProperties; - this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; + this.groupName = groupName; + this.transactional = transactional; this.instrumentationManager = instrumentationManager; } @Override public void start() { - if (producerProperties.getExtension().getTransactional()) { - producer = new TransactionMQProducer(destination); - if (transactionCheckListener != null) { - ((TransactionMQProducer) producer) - .setTransactionCheckListener(transactionCheckListener); + if (!transactional) { + instrumentationManager + .addHealthInstrumentation(new Instrumentation(destination)); + try { + rocketMQTemplate.afterPropertiesSet(); + instrumentationManager.getHealthInstrumentation(destination) + .markStartedSuccessfully(); + } + catch (Exception e) { + instrumentationManager.getHealthInstrumentation(destination) + .markStartFailed(e); + log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage()); + throw new MessagingException(MessageBuilder.withPayload( + "RocketMQTemplate startup failed, Caused by " + e.getMessage()) + .build(), e); } - } - else { - producer = new DefaultMQProducer(destination); - } - - producer.setVipChannelEnabled( - producerProperties.getExtension().getVipChannelEnabled()); - - Optional.ofNullable(instrumentationManager).ifPresent(manager -> { - producerInstrumentation = manager.getProducerInstrumentation(destination); - manager.addHealthInstrumentation(producerInstrumentation); - }); - - producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr()); - - if (producerProperties.getExtension().getMaxMessageSize() > 0) { - producer.setMaxMessageSize( - producerProperties.getExtension().getMaxMessageSize()); - } - - try { - producer.start(); - Optional.ofNullable(producerInstrumentation) - .ifPresent(p -> p.markStartedSuccessfully()); - } - catch (MQClientException e) { - Optional.ofNullable(producerInstrumentation) - .ifPresent(p -> p.markStartFailed(e)); - logger.error( - "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); - throw new MessagingException(e.getMessage(), e); } running = true; } @Override public void stop() { - if (producer != null) { - producer.shutdown(); + if (!transactional) { + rocketMQTemplate.destroy(); } running = false; } @@ -144,100 +115,95 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li @Override protected void handleMessageInternal(org.springframework.messaging.Message message) throws Exception { - Message toSend = null; try { - if (message.getPayload() instanceof byte[]) { - toSend = new Message(destination, (byte[]) message.getPayload()); + final StringBuilder topicWithTags = new StringBuilder(destination); + String tags = Optional + .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("") + .toString(); + if (!StringUtils.isEmpty(tags)) { + topicWithTags.append(":").append(tags); } - else if (message.getPayload() instanceof String) { - toSend = new Message(destination, - ((String) message.getPayload()).getBytes()); + SendResult sendRes = null; + if (transactional) { + sendRes = rocketMQTemplate.sendMessageInTransaction(groupName, + topicWithTags.toString(), message, message.getHeaders() + .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG)); + log.debug("transactional send to topic " + topicWithTags + " " + sendRes); } else { - throw new UnsupportedOperationException("Payload class isn't supported: " - + message.getPayload().getClass()); - } - RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor( - message); - headerAccessor.setLeaveMutable(true); - toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel()); - toSend.setTags(headerAccessor.getTags()); - toSend.setKeys(headerAccessor.getKeys()); - toSend.setFlag(headerAccessor.getFlag()); - for (Map.Entry entry : headerAccessor.getUserProperties() - .entrySet()) { - toSend.putUserProperty(entry.getKey(), entry.getValue()); - } - - SendResult sendRes; - if (producerProperties.getExtension().getTransactional()) { - sendRes = producer.sendMessageInTransaction(toSend, - localTransactionExecuter, headerAccessor.getTransactionalArg()); - } - else { - sendRes = producer.send(toSend); + int delayLevel = 0; + try { + Object delayLevelObj = message.getHeaders() + .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0); + if (delayLevelObj instanceof Number) { + delayLevel = ((Number) delayLevelObj).intValue(); + } + else if (delayLevelObj instanceof String) { + delayLevel = Integer.parseInt((String) delayLevelObj); + } + } + catch (Exception e) { + // ignore + } + if (sync) { + sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, + rocketMQTemplate.getProducer().getSendMsgTimeout(), + delayLevel); + log.debug("sync send to topic " + topicWithTags + " " + sendRes); + } + else { + rocketMQTemplate.asyncSend(topicWithTags.toString(), message, + new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.debug("async send to topic " + topicWithTags + " " + + sendResult); + } + + @Override + public void onException(Throwable e) { + log.error( + "RocketMQ Message hasn't been sent. Caused by " + + e.getMessage()); + if (getSendFailureChannel() != null) { + getSendFailureChannel().send( + RocketMQMessageHandler.this.errorMessageStrategy + .buildErrorMessage( + new MessagingException( + message, e), + null)); + } + } + }); + } } - - if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { + if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { if (getSendFailureChannel() != null) { this.getSendFailureChannel().send(message); } else { - throw new RocketMQSendFailureException(message, toSend, + throw new MessagingException(message, new MQClientException("message hasn't been sent", null)); } } - if (message instanceof MutableMessage) { - RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message, - sendRes); - } - Optional.ofNullable(instrumentationManager).ifPresent(manager -> { - manager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP, - Instant.now().toEpochMilli()); - }); - Optional.ofNullable(producerInstrumentation).ifPresent(p -> p.markSent()); } - catch (MQClientException | RemotingException | MQBrokerException - | InterruptedException | UnsupportedOperationException e) { - Optional.ofNullable(producerInstrumentation) - .ifPresent(p -> p.markSentFailure()); - logger.error( - "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); + catch (Exception e) { + log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); if (getSendFailureChannel() != null) { - getSendFailureChannel().send(this.errorMessageStrategy.buildErrorMessage( - new RocketMQSendFailureException(message, toSend, e), null)); + getSendFailureChannel().send(this.errorMessageStrategy + .buildErrorMessage(new MessagingException(message, e), null)); } else { - throw new RocketMQSendFailureException(message, toSend, e); + throw new MessagingException(message, e); } } } - /** - * Using in RocketMQ Transactional Mode. Set RocketMQ localTransactionExecuter in - * {@link DefaultMQProducer#sendMessageInTransaction}. - * @param localTransactionExecuter the executer running when produce msg. - */ - public void setLocalTransactionExecuter( - LocalTransactionExecuter localTransactionExecuter) { - this.localTransactionExecuter = localTransactionExecuter; - } - - /** - * Using in RocketMQ Transactional Mode. Set RocketMQ transactionCheckListener in - * {@link TransactionMQProducer#setTransactionCheckListener}. - * @param transactionCheckListener the listener set in {@link TransactionMQProducer}. - */ - public void setTransactionCheckListener( - TransactionCheckListener transactionCheckListener) { - this.transactionCheckListener = transactionCheckListener; - } - /** * Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent - * to this channel with a payload of a {@link RocketMQSendFailureException} with the - * failed message and cause. + * to this channel with a payload of a {@link MessagingException} with the failed + * message and cause. * @param sendFailureChannel the failure channel. * @since 0.2.2 */ @@ -259,4 +225,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li public MessageChannel getSendFailureChannel() { return sendFailureChannel; } + + public void setSync(boolean sync) { + this.sync = sync; + } } \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java deleted file mode 100644 index 6e4386d00..000000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.metrics; - -import com.codahale.metrics.MetricRegistry; - -/** - * @author Timur Valiev - * @author Jim - */ -public class ConsumerGroupInstrumentation extends Instrumentation { - private MetricRegistry metricRegistry; - - public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) { - super(name); - this.metricRegistry = metricRegistry; - } - -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java deleted file mode 100644 index 787e4b83f..000000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.metrics; - -import static com.codahale.metrics.MetricRegistry.name; - -import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; - -/** - * @author juven.xuxb - * @author Jim - */ -public class ConsumerInstrumentation extends Instrumentation { - - private final Counter totalConsumed; - private final Counter totalConsumedFailures; - private final Meter consumedPerSecond; - private final Meter consumedFailuresPerSecond; - - public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) { - super(baseMetricName); - - this.totalConsumed = registry - .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED)); - this.consumedPerSecond = registry - .meter(name(baseMetricName, Consumer.CONSUMED_PER_SECOND)); - this.totalConsumedFailures = registry - .counter(name(baseMetricName, Consumer.TOTAL_CONSUMED_FAILURES)); - this.consumedFailuresPerSecond = registry - .meter(name(baseMetricName, Consumer.CONSUMED_FAILURES_PER_SECOND)); - } - - public void markConsumed() { - totalConsumed.inc(); - consumedPerSecond.mark(); - } - - public void markConsumedFailure() { - totalConsumedFailures.inc(); - consumedFailuresPerSecond.mark(); - } -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java index d9997ce37..885a183f1 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java @@ -27,7 +27,7 @@ public class Instrumentation { protected final AtomicBoolean started = new AtomicBoolean(false); protected Exception startException = null; - Instrumentation(String name) { + public Instrumentation(String name) { this.name = name; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java index c1c1ceada..e07628bbc 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java @@ -22,47 +22,16 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Consumer; -import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer; - -import com.codahale.metrics.MetricRegistry; - /** * @author Timur Valiev * @author Jim */ public class InstrumentationManager { - private final MetricRegistry metricRegistry = new MetricRegistry(); private final Map runtime = new ConcurrentHashMap<>(); - private final Map producerInstrumentations = new HashMap<>(); - private final Map consumeInstrumentations = new HashMap<>(); - private final Map consumerGroupsInstrumentations = new HashMap<>(); - private final Map healthInstrumentations = new HashMap<>(); - public ProducerInstrumentation getProducerInstrumentation(String destination) { - String key = Producer.PREFIX + destination; - producerInstrumentations.putIfAbsent(key, - new ProducerInstrumentation(metricRegistry, key)); - return producerInstrumentations.get(key); - } - - public ConsumerInstrumentation getConsumerInstrumentation(String destination) { - String key = Consumer.PREFIX + destination; - consumeInstrumentations.putIfAbsent(key, - new ConsumerInstrumentation(metricRegistry, key)); - return consumeInstrumentations.get(key); - } - - public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) { - String key = Consumer.GROUP_PREFIX + group; - consumerGroupsInstrumentations.putIfAbsent(key, - new ConsumerGroupInstrumentation(metricRegistry, key)); - return consumerGroupsInstrumentations.get(key); - } - public Set getHealthInstrumentations() { return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue) .collect(Collectors.toSet()); @@ -72,11 +41,12 @@ public class InstrumentationManager { healthInstrumentations.put(instrumentation.getName(), instrumentation); } + public Instrumentation getHealthInstrumentation(String key) { + return healthInstrumentations.get(key); + } + public Map getRuntime() { return runtime; } - public MetricRegistry getMetricRegistry() { - return metricRegistry; - } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java deleted file mode 100644 index 1ede78026..000000000 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (C) 2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.rocketmq.metrics; - -import static com.codahale.metrics.MetricRegistry.name; - -import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.Metrics.Producer; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; - -/** - * @author juven.xuxb - * @author Jim - */ -public class ProducerInstrumentation extends Instrumentation { - - private final Counter totalSent; - private final Counter totalSentFailures; - private final Meter sentPerSecond; - private final Meter sentFailuresPerSecond; - - public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) { - super(baseMetricName); - - this.totalSent = registry.counter(name(baseMetricName, Producer.TOTAL_SENT)); - this.totalSentFailures = registry - .counter(name(baseMetricName, Producer.TOTAL_SENT_FAILURES)); - this.sentPerSecond = registry - .meter(name(baseMetricName, Producer.SENT_PER_SECOND)); - this.sentFailuresPerSecond = registry - .meter(name(baseMetricName, Producer.SENT_FAILURES_PER_SECOND)); - } - - public void markSent() { - totalSent.inc(); - sentPerSecond.mark(); - } - - public void markSentFailure() { - totalSentFailures.inc(); - sentFailuresPerSecond.mark(); - } -} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index 5c12a3b46..af292e74e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -16,7 +16,9 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; +import org.apache.rocketmq.common.MixAll; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants; /** * @author Timur Valiev @@ -25,24 +27,69 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") public class RocketMQBinderConfigurationProperties { - private String namesrvAddr = "127.0.0.1:9876"; + /** + * The name server for rocketMQ, formats: `host:port;host:port`. + */ + private String nameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER; - private String logLevel = "ERROR"; + /** + * The property of "access-key". + */ + private String accessKey; - public String getNamesrvAddr() { - return namesrvAddr; + /** + * The property of "secret-key". + */ + private String secretKey; + + /** + * Switch flag instance for message trace. + */ + private boolean enableMsgTrace = true; + + /** + * The name value of message trace topic.If you don't config,you can use the default + * trace topic name. + */ + private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC; + + public String getNameServer() { + return nameServer; + } + + public void setNameServer(String nameServer) { + this.nameServer = nameServer; + } + + public String getAccessKey() { + return accessKey; } - public void setNamesrvAddr(String namesrvAddr) { - this.namesrvAddr = namesrvAddr; + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; } - public String getLogLevel() { - return logLevel; + public String getSecretKey() { + return secretKey; } - public void setLogLevel(String logLevel) { - this.logLevel = logLevel; + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; } + public boolean isEnableMsgTrace() { + return enableMsgTrace; + } + + public void setEnableMsgTrace(boolean enableMsgTrace) { + this.enableMsgTrace = enableMsgTrace; + } + + public String getCustomizedTraceTopic() { + return customizedTraceTopic; + } + + public void setCustomizedTraceTopic(String customizedTraceTopic) { + this.customizedTraceTopic = customizedTraceTopic; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 8202fefb5..6cfe9b842 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -50,41 +50,17 @@ public class RocketMQConsumerProperties { */ private Boolean orderly = false; - private Boolean enabled = true; - - private Error error; - - public static class Error { - - /** - * Reconsume later timeMillis in ConsumeOrderlyContext. - */ - private Long suspendCurrentQueueTimeMillis = 1000L; - - /** - * Message consume retry strategy in ConsumeConcurrentlyContext. - * - * -1,no retry,put into DLQ directly 0,broker control retry frequency >0,client - * control retry frequency - */ - private Integer delayLevelWhenNextConsume = 0; - - public Long getSuspendCurrentQueueTimeMillis() { - return suspendCurrentQueueTimeMillis; - } - - public void setSuspendCurrentQueueTimeMillis(Long suspendCurrentQueueTimeMillis) { - this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; - } + /** + * for concurrently listener. message consume retry strategy + */ + private int delayLevelWhenNextConsume = 0; - public Integer getDelayLevelWhenNextConsume() { - return delayLevelWhenNextConsume; - } + /** + * for orderly listener. next retry delay time + */ + private long suspendCurrentQueueTimeMillis = 1000; - public void setDelayLevelWhenNextConsume(Integer delayLevelWhenNextConsume) { - this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; - } - } + private Boolean enabled = true; public String getTags() { return tags; @@ -126,11 +102,19 @@ public class RocketMQConsumerProperties { this.broadcasting = broadcasting; } - public Error getError() { - return error; + public int getDelayLevelWhenNextConsume() { + return delayLevelWhenNextConsume; + } + + public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { + this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; + } + + public long getSuspendCurrentQueueTimeMillis() { + return suspendCurrentQueueTimeMillis; } - public void setError(Error error) { - this.error = error; + public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java index b526d960d..a8d784e73 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java @@ -17,8 +17,6 @@ package org.springframework.cloud.stream.binder.rocketmq.properties; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.LocalTransactionExecuter; -import org.apache.rocketmq.client.producer.TransactionCheckListener; /** * @author Timur Valiev @@ -29,23 +27,61 @@ public class RocketMQProducerProperties { private Boolean enabled = true; /** - * Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize} + * Name of producer. */ - private Integer maxMessageSize = 0; + private String group; + + /** + * Maximum allowed message size in bytes {@link DefaultMQProducer#maxMessageSize}. + */ + private Integer maxMessageSize = 1024 * 1024 * 4; private Boolean transactional = false; + private Boolean sync = false; + + private Boolean vipChannelEnabled = true; + /** - * full class name of {@link LocalTransactionExecuter} + * Millis of send message timeout. */ - private String executer; + private int sendMessageTimeout = 3000; /** - * full class name of {@link TransactionCheckListener} + * Compress message body threshold, namely, message body larger than 4k will be + * compressed on default. */ - private String transactionCheckListener; + private int compressMessageBodyThreshold = 1024 * 4; - private Boolean vipChannelEnabled = true; + /** + * Maximum number of retry to perform internally before claiming sending failure in + * synchronous mode. This may potentially cause message duplication which is up to + * application developers to resolve. + */ + private int retryTimesWhenSendFailed = 2; + + /** + *

+ * Maximum number of retry to perform internally before claiming sending failure in + * asynchronous mode. + *

+ * This may potentially cause message duplication which is up to application + * developers to resolve. + */ + private int retryTimesWhenSendAsyncFailed = 2; + + /** + * Indicate whether to retry another broker on sending failure internally. + */ + private boolean retryNextServer = false; + + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } public Boolean getEnabled() { return enabled; @@ -71,20 +107,12 @@ public class RocketMQProducerProperties { this.transactional = transactional; } - public String getExecuter() { - return executer; - } - - public void setExecuter(String executer) { - this.executer = executer; - } - - public String getTransactionCheckListener() { - return transactionCheckListener; + public Boolean getSync() { + return sync; } - public void setTransactionCheckListener(String transactionCheckListener) { - this.transactionCheckListener = transactionCheckListener; + public void setSync(Boolean sync) { + this.sync = sync; } public Boolean getVipChannelEnabled() { @@ -94,4 +122,45 @@ public class RocketMQProducerProperties { public void setVipChannelEnabled(Boolean vipChannelEnabled) { this.vipChannelEnabled = vipChannelEnabled; } -} + + public int getSendMessageTimeout() { + return sendMessageTimeout; + } + + public void setSendMessageTimeout(int sendMessageTimeout) { + this.sendMessageTimeout = sendMessageTimeout; + } + + public int getCompressMessageBodyThreshold() { + return compressMessageBodyThreshold; + } + + public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) { + this.compressMessageBodyThreshold = compressMessageBodyThreshold; + } + + public int getRetryTimesWhenSendFailed() { + return retryTimesWhenSendFailed; + } + + public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) { + this.retryTimesWhenSendFailed = retryTimesWhenSendFailed; + } + + public int getRetryTimesWhenSendAsyncFailed() { + return retryTimesWhenSendAsyncFailed; + } + + public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) { + this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed; + } + + public boolean isRetryNextServer() { + return retryNextServer; + } + + public void setRetryNextServer(boolean retryNextServer) { + this.retryNextServer = retryNextServer; + } + +} \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java index 456ef5cc6..d8d08626b 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java @@ -18,8 +18,6 @@ package org.springframework.cloud.stream.binder.rocketmq.provisioning; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQClientException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; @@ -36,9 +34,6 @@ import org.springframework.cloud.stream.provisioning.ProvisioningProvider; public class RocketMQTopicProvisioner implements ProvisioningProvider, ExtendedProducerProperties> { - private static final Logger logger = LoggerFactory - .getLogger(RocketMQTopicProvisioner.class); - @Override public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties properties) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories index 43b85129a..46f79a889 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories @@ -1,2 +1,2 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration +org.springframework.cloud.stream.binder.rocketmq.config.RocketMQComponent4BinderAutoConfiguration diff --git a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java index 9b1be23f5..4a787e65d 100644 --- a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java +++ b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -22,7 +22,6 @@ import org.junit.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration; -import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; @@ -33,8 +32,7 @@ public class RocketMQAutoConfigurationTests { private ApplicationContextRunner contextRunner = new ApplicationContextRunner() .withConfiguration( - AutoConfigurations.of(RocketMQBinderEndpointAutoConfiguration.class, - RocketMQBinderAutoConfiguration.class)) + AutoConfigurations.of(RocketMQBinderAutoConfiguration.class)) .withPropertyValues( "spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876", "spring.cloud.stream.bindings.output.destination=TopicOrderTest", @@ -55,7 +53,7 @@ public class RocketMQAutoConfigurationTests { this.contextRunner.run(context -> { RocketMQBinderConfigurationProperties binderConfigurationProperties = context .getBean(RocketMQBinderConfigurationProperties.class); - assertThat(binderConfigurationProperties.getNamesrvAddr()) + assertThat(binderConfigurationProperties.getNameServer()) .isEqualTo("127.0.0.1:9876"); RocketMQExtendedBindingProperties bindingProperties = context .getBean(RocketMQExtendedBindingProperties.class);