|
|
|
@ -0,0 +1,378 @@
|
|
|
|
|
== Spring Cloud Alibaba RocketMQ Binder (NEW)
|
|
|
|
|
|
|
|
|
|
=== RocketMQ 介绍
|
|
|
|
|
|
|
|
|
|
https://rocketmq.apache.org[RocketMQ] 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
|
|
|
|
|
|
|
|
|
|
具有以下特点:
|
|
|
|
|
|
|
|
|
|
* 能够保证严格的消息顺序
|
|
|
|
|
|
|
|
|
|
* 提供丰富的消息拉取模式
|
|
|
|
|
|
|
|
|
|
* 高效的订阅者水平扩展能力
|
|
|
|
|
|
|
|
|
|
* 实时的消息订阅机制
|
|
|
|
|
|
|
|
|
|
* 亿级消息堆积能力
|
|
|
|
|
|
|
|
|
|
=== RocketMQ 基本使用
|
|
|
|
|
|
|
|
|
|
* 下载 RocketMQ
|
|
|
|
|
|
|
|
|
|
下载 https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip[RocketMQ最新的二进制文件],并解压
|
|
|
|
|
|
|
|
|
|
解压后的目录结构如下:
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
apache-rocketmq
|
|
|
|
|
├── LICENSE
|
|
|
|
|
├── NOTICE
|
|
|
|
|
├── README.md
|
|
|
|
|
├── benchmark
|
|
|
|
|
├── bin
|
|
|
|
|
├── conf
|
|
|
|
|
└── lib
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
* 启动 NameServer
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
nohup sh bin/mqnamesrv &
|
|
|
|
|
tail -f ~/logs/rocketmqlogs/namesrv.log
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
* 启动 Broker
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
nohup sh bin/mqbroker -n localhost:9876 &
|
|
|
|
|
tail -f ~/logs/rocketmqlogs/broker.log
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
* 发送、接收消息
|
|
|
|
|
|
|
|
|
|
发送消息:
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
发送成功后显示:`SendResult [sendStatus=SEND_OK, msgId= ...`
|
|
|
|
|
|
|
|
|
|
接收消息:
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
接收成功后显示:`ConsumeMessageThread_%d Receive New Messages: [MessageExt...`
|
|
|
|
|
|
|
|
|
|
* 关闭 Server
|
|
|
|
|
|
|
|
|
|
```bash
|
|
|
|
|
sh bin/mqshutdown broker
|
|
|
|
|
sh bin/mqshutdown namesrv
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
=== Spring Cloud Stream 介绍
|
|
|
|
|
|
|
|
|
|
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 `Spring Integration` 与 Broker 进行连接。
|
|
|
|
|
|
|
|
|
|
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
|
|
|
|
|
|
|
|
|
|
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
|
|
|
|
|
|
|
|
|
|
* Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
|
|
|
|
|
|
|
|
|
|
比如 `Kafka` 的实现 `KafkaMessageChannelBinder`,`RabbitMQ` 的实现 `RabbitMessageChannelBinder` 以及 `RocketMQ` 的实现 `RocketMQMessageChannelBinder`。
|
|
|
|
|
|
|
|
|
|
* Binding: 包括 Input Binding 和 Output Binding。
|
|
|
|
|
|
|
|
|
|
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
|
|
|
|
|
|
|
|
|
|
.Spring Cloud Stream
|
|
|
|
|
image::https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png[]
|
|
|
|
|
|
|
|
|
|
使用 Spring Cloud Stream 完成一段简单的消息发送和消息接收代码:
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
MessageChannel messageChannel = new DirectChannel();
|
|
|
|
|
|
|
|
|
|
// 消息订阅
|
|
|
|
|
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
|
|
|
|
|
@Override
|
|
|
|
|
public void handleMessage(Message<?> message) throws MessagingException {
|
|
|
|
|
System.out.println("receive msg: " + message.getPayload());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// 消息发送
|
|
|
|
|
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
这段代码所有的消息类都是 `spring-messaging` 模块里提供的。屏蔽具体消息中间件的底层实现,如果想用更换消息中间件,在配置文件里配置相关消息中间件信息以及修改 binder 依赖即可。
|
|
|
|
|
|
|
|
|
|
**Spring Cloud Stream 底层基于这段代码去做了各种抽象。**
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
=== 如何使用 Spring Cloud Alibaba RocketMQ Binder
|
|
|
|
|
|
|
|
|
|
如果要在您的项目中引入 RocketMQ Binder,需要引入如下 maven 依赖:
|
|
|
|
|
|
|
|
|
|
```xml
|
|
|
|
|
<dependency>
|
|
|
|
|
<groupId>com.alibaba.cloud</groupId>
|
|
|
|
|
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
|
|
|
|
|
</dependency>
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
或者可以使用 Spring Cloud Stream RocketMQ Starter:
|
|
|
|
|
|
|
|
|
|
```xml
|
|
|
|
|
<dependency>
|
|
|
|
|
<groupId>com.alibaba.cloud</groupId>
|
|
|
|
|
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
|
|
|
|
|
</dependency>
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
=== Spring Cloud Alibaba RocketMQ Binder 实现
|
|
|
|
|
|
|
|
|
|
这是 Spring Cloud Stream RocketMQ Binder 的实现架构:
|
|
|
|
|
|
|
|
|
|
.SCS RocketMQ Binder
|
|
|
|
|
image::https://img.alicdn.com/tfs/TB1v8rcbUY1gK0jSZFCXXcwqXXa-1236-773.png[]
|
|
|
|
|
|
|
|
|
|
RocketMQ Binder 的重构优化去除了对 https://github.com/apache/rocketmq-spring[RocketMQ-Spring]框架的依赖 。
|
|
|
|
|
RocketMQ Binder 核心类 `RocketMQMessageChannelBinder` 实现了 Spring Cloud Stream 规范,内部会构建 https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.java[RocketMQInboundChannelAdapter] 和 https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProducerMessageHandler.java[RocketMQProducerMessageHandler]。
|
|
|
|
|
|
|
|
|
|
`RocketMQProducerMessageHandler` 会基于 Binding 配置通过 https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java[RocketMQProduceFactory]构造 RocketMQ Producer,其内部会把 `spring-messaging` 模块内 `org.springframework.messaging.Message` 消息类转换成 RocketMQ 的消息类 `org.apache.rocketmq.common.message.Message`,然后发送出去。
|
|
|
|
|
|
|
|
|
|
`RocketMQInboundChannelAdapter` 也会基于 Binding 配置通过 https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java[RocketMQConsumerFactory]构造 DefaultMQPushConsumer,其内部会启动 RocketMQ Consumer 接收消息。
|
|
|
|
|
|
|
|
|
|
NOTE: 与 https://github.com/apache/rocketmq-spring[RocketMQ-Spring] 框架的兼容需要手动处理
|
|
|
|
|
|
|
|
|
|
目前 Binder 支持在 `Header` 中设置相关的 key 来进行 RocketMQ Message 消息的特性设置。
|
|
|
|
|
|
|
|
|
|
比如 `TAGS`、`KEYS`、`TRANSACTIONAL_ARGS` 等 RocketMQ 消息对应的标签,详情见 https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/contants/RocketMQConst.java[com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst]
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
MessageBuilder builder = MessageBuilder.withPayload(msg)
|
|
|
|
|
.setHeader(RocketMQHeaders.TAGS, "binder")
|
|
|
|
|
.setHeader(RocketMQHeaders.KEYS, "my-key");
|
|
|
|
|
Message message = builder.build();
|
|
|
|
|
output().send(message);
|
|
|
|
|
```
|
|
|
|
|
NOTE: 更多使用请参考样例: https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-produce-example/src/main/java/com/alibaba/cloud/examples/SenderService.java[com.alibaba.cloud.examples.SenderService]
|
|
|
|
|
|
|
|
|
|
=== MessageSource 支持
|
|
|
|
|
|
|
|
|
|
SCS RocketMQ Binder 支持 `MessageSource`,可以进行消息的拉取,例子如下:
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
@SpringBootApplication
|
|
|
|
|
@EnableBinding(MQApplication.PolledProcessor.class)
|
|
|
|
|
public class MQApplication {
|
|
|
|
|
|
|
|
|
|
private final Logger logger =
|
|
|
|
|
LoggerFactory.getLogger(MQApplication.class);
|
|
|
|
|
|
|
|
|
|
public static void main(String[] args) {
|
|
|
|
|
SpringApplication.run(MQApplication.class, args);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
public ApplicationRunner runner(PollableMessageSource source,
|
|
|
|
|
MessageChannel dest) {
|
|
|
|
|
return args -> {
|
|
|
|
|
while (true) {
|
|
|
|
|
boolean result = source.poll(m -> {
|
|
|
|
|
String payload = (String) m.getPayload();
|
|
|
|
|
logger.info("Received: " + payload);
|
|
|
|
|
dest.send(MessageBuilder.withPayload(payload.toUpperCase())
|
|
|
|
|
.copyHeaders(m.getHeaders())
|
|
|
|
|
.build());
|
|
|
|
|
}, new ParameterizedTypeReference<String>() { });
|
|
|
|
|
if (result) {
|
|
|
|
|
logger.info("Processed a message");
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
logger.info("Nothing to do");
|
|
|
|
|
}
|
|
|
|
|
Thread.sleep(5_000);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static interface PolledProcessor {
|
|
|
|
|
|
|
|
|
|
@Input
|
|
|
|
|
PollableMessageSource source();
|
|
|
|
|
|
|
|
|
|
@Output
|
|
|
|
|
MessageChannel dest();
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
=== 配置选项
|
|
|
|
|
|
|
|
|
|
==== RocketMQ Binder Properties
|
|
|
|
|
|
|
|
|
|
spring.cloud.stream.rocketmq.binder.name-server::
|
|
|
|
|
RocketMQ NameServer 地址(老版本使用 namesrv-addr 配置项)。
|
|
|
|
|
+
|
|
|
|
|
Default: `127.0.0.1:9876`.
|
|
|
|
|
spring.cloud.stream.rocketmq.binder.access-key::
|
|
|
|
|
阿里云账号 AccessKey。
|
|
|
|
|
+
|
|
|
|
|
Default: null.
|
|
|
|
|
spring.cloud.stream.rocketmq.binder.secret-key::
|
|
|
|
|
阿里云账号 SecretKey。
|
|
|
|
|
+
|
|
|
|
|
Default: null.
|
|
|
|
|
spring.cloud.stream.rocketmq.binder.enable-msg-trace::
|
|
|
|
|
是否为 Producer 和 Consumer 开启消息轨迹功能
|
|
|
|
|
+
|
|
|
|
|
Default: `true`.
|
|
|
|
|
spring.cloud.stream.rocketmq.binder.customized-trace-topic::
|
|
|
|
|
消息轨迹开启后存储的 topic 名称。
|
|
|
|
|
+
|
|
|
|
|
Default: `RMQ_SYS_TRACE_TOPIC`.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
==== RocketMQ Consumer Properties
|
|
|
|
|
|
|
|
|
|
下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.` 为前缀的 RocketMQ Consumer 相关的配置。
|
|
|
|
|
更多见 https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java[com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties]。
|
|
|
|
|
|
|
|
|
|
enable::
|
|
|
|
|
是否启用 Consumer。
|
|
|
|
|
+
|
|
|
|
|
默认值: `true`.
|
|
|
|
|
subscription::
|
|
|
|
|
Consumer 基于 TAGS 订阅,多个 tag 以 `||` 分割。更多见 `com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties.subscription`
|
|
|
|
|
+
|
|
|
|
|
默认值: empty.
|
|
|
|
|
messageModel::
|
|
|
|
|
Consumer 消费模式。如果想让每一个的订阅者都能接收到消息,可以使用广播模式。更多见 `org.apache.rocketmq.common.protocol.heartbeat.MessageModel`
|
|
|
|
|
+
|
|
|
|
|
默认值: `CLUSTERING`.
|
|
|
|
|
consumeFromWhere::
|
|
|
|
|
Consumer 从哪里开始消费。更多见 `org.apache.rocketmq.common.consumer.ConsumeFromWhere`
|
|
|
|
|
+
|
|
|
|
|
默认值: `CONSUME_FROM_LAST_OFFSET`.
|
|
|
|
|
|
|
|
|
|
#下面的这些配置是 Consumer Push 模式相关的配置。#
|
|
|
|
|
`spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.push.`
|
|
|
|
|
|
|
|
|
|
orderly::
|
|
|
|
|
是否同步消费消息模式
|
|
|
|
|
+
|
|
|
|
|
默认值: `false`.
|
|
|
|
|
delayLevelWhenNextConsume::
|
|
|
|
|
异步消费消息模式下消费失败重试策略:
|
|
|
|
|
* -1,不重复,直接放入死信队列
|
|
|
|
|
* 0,broker 控制重试策略
|
|
|
|
|
* >0,client 控制重试策略
|
|
|
|
|
+
|
|
|
|
|
默认值: `0`.
|
|
|
|
|
suspendCurrentQueueTimeMillis::
|
|
|
|
|
同步消费消息模式下消费失败后再次消费的时间间隔。
|
|
|
|
|
+
|
|
|
|
|
默认值: `1000`.
|
|
|
|
|
|
|
|
|
|
其他更多参数见 `com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties.Push`
|
|
|
|
|
|
|
|
|
|
#下面的这些配置是 Consumer Pull 模式相关的配置。#
|
|
|
|
|
`spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.pull.`
|
|
|
|
|
|
|
|
|
|
pullThreadNums::
|
|
|
|
|
消费时拉取的线程数
|
|
|
|
|
+
|
|
|
|
|
默认值: `20`.
|
|
|
|
|
pollTimeoutMillis::
|
|
|
|
|
拉取时的超时毫秒数
|
|
|
|
|
+
|
|
|
|
|
默认值: `1000 * 5`.
|
|
|
|
|
|
|
|
|
|
其他更多参数见 `com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties.Pull`.
|
|
|
|
|
|
|
|
|
|
NOTE: 更多参数见 https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java[com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties]
|
|
|
|
|
|
|
|
|
|
==== RocketMQ Provider Properties
|
|
|
|
|
|
|
|
|
|
下面的这些配置是以 `spring.cloud.stream.rocketmq.bindings.<channelName>.producer.` 为前缀的 RocketMQ Producer 相关的配置。更多见 https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java[com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties]
|
|
|
|
|
|
|
|
|
|
enable::
|
|
|
|
|
是否启用 Producer。
|
|
|
|
|
+
|
|
|
|
|
默认值: `true`.
|
|
|
|
|
group::
|
|
|
|
|
Producer group name。
|
|
|
|
|
+
|
|
|
|
|
默认值: empty.
|
|
|
|
|
maxMessageSize::
|
|
|
|
|
消息发送的最大字节数。
|
|
|
|
|
+
|
|
|
|
|
默认值: `8249344`.
|
|
|
|
|
producerType::
|
|
|
|
|
消息生产者类型,普通或者事务。更多见 `com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties.ProducerType`.
|
|
|
|
|
+
|
|
|
|
|
默认值: `Normal`.
|
|
|
|
|
transactionListener::
|
|
|
|
|
事务消息监听器的beanName,在 `producerType=Trans` 时才有效;必须是实现 `org.apache.rocketmq.client.producer.TransactionListener` 接口的Spring Bean。
|
|
|
|
|
|
|
|
|
|
sendType::
|
|
|
|
|
消息发送类型(同步、异步、单向)。更多见`com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties.SendType`.
|
|
|
|
|
+
|
|
|
|
|
默认值: `Sync`.
|
|
|
|
|
sendCallBack::
|
|
|
|
|
消息发送后回调函数的beanName,在 `sendType=Async` 时才有效;必须是实现 `org.apache.rocketmq.client.producer.SendCallback` 接口的Spring Bean。
|
|
|
|
|
vipChannelEnabled::
|
|
|
|
|
是否在 Vip Channel 上发送消息。
|
|
|
|
|
+
|
|
|
|
|
默认值: `true`.
|
|
|
|
|
sendMessageTimeout::
|
|
|
|
|
发送消息的超时时间(毫秒)。
|
|
|
|
|
+
|
|
|
|
|
默认值: `3000`.
|
|
|
|
|
compressMessageBodyThreshold::
|
|
|
|
|
消息体压缩阀值(当消息体超过 4k 的时候会被压缩)。
|
|
|
|
|
+
|
|
|
|
|
默认值: `4096`.
|
|
|
|
|
retryTimesWhenSendFailed::
|
|
|
|
|
在同步发送消息的模式下,消息发送失败的重试次数。
|
|
|
|
|
+
|
|
|
|
|
默认值: `2`.
|
|
|
|
|
retryTimesWhenSendAsyncFailed::
|
|
|
|
|
在异步发送消息的模式下,消息发送失败的重试次数。
|
|
|
|
|
+
|
|
|
|
|
默认值: `2`.
|
|
|
|
|
retryAnotherBroker::
|
|
|
|
|
消息发送失败的情况下是否重试其它的 broker。
|
|
|
|
|
+
|
|
|
|
|
默认值: `false`.
|
|
|
|
|
|
|
|
|
|
NOTE: 生产者其他更多参数请见:
|
|
|
|
|
https://github.com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java[com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties]
|
|
|
|
|
|
|
|
|
|
=== 阿里云 MQ 服务
|
|
|
|
|
|
|
|
|
|
使用阿里云 MQ 服务需要配置 AccessKey、SecretKey 以及云上的 NameServer 地址。
|
|
|
|
|
|
|
|
|
|
NOTE: 0.1.2 & 0.2.2 & 0.9.0 才支持该功能
|
|
|
|
|
|
|
|
|
|
```properties
|
|
|
|
|
spring.cloud.stream.rocketmq.binder.access-key=YourAccessKey
|
|
|
|
|
spring.cloud.stream.rocketmq.binder.secret-key=YourSecretKey
|
|
|
|
|
spring.cloud.stream.rocketmq.binder.name-server=NameServerInMQ
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
NOTE: topic 和 group 请以 实例id% 为前缀进行配置。比如 topic 为 "test",需要配置成 "实例id%test"
|
|
|
|
|
|
|
|
|
|
.NameServer 的获取(配置中请去掉 http:// 前缀)
|
|
|
|
|
image::https://spring-cloud-alibaba.oss-cn-beijing.aliyuncs.com/MQ.png[]
|