rocketmq example docs

这是官方对 Spring Cloud Stream 的一段介绍:
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 `Spring Integration` 与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png)
## 准备工作
> **注意:本章节只是为了便于您理解接入方式,本示例代码中已经完成接入工作,您无需再进行修改。**
### 下载并启动 RocketMQ
**在接入 RocketMQ Binder 之前,首先需要启动 RocketMQ 的 Name Server 和 Broker。**
1. 下载[RocketMQ最新的二进制文件](https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip),并解压
2. 启动 Name Server
```bash
sh bin/mqnamesrv
```
3. 启动 Broker
```bash
sh bin/mqbroker -n localhost:9876
```
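(可选)Name Server 和 Broker 启动后,可以通过 `mqadmin` 的 `clusterList` 命令确认 Broker 已经注册成功。下面是一个示意命令,在 RocketMQ 解压目录下执行(路径以实际部署为准):
```bash
# 查看集群信息,确认 Broker 已注册到 Name Server
sh bin/mqadmin clusterList -n localhost:9876
```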
### 引入依赖
修改 `pom.xml` 文件,引入 RocketMQ Stream Starter。
```xml
<dependency>
    <!-- groupId 与版本请以项目实际使用的 Spring Cloud Alibaba 版本为准(此处为示意) -->
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
## 简单示例
### 创建Topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
```
### 示例代码
配置 Input 和 Output 的 Binding 信息并配合 `@EnableBinding` 注解使其生效
```java
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // 绑定默认的 output(Source)与 input(Sink)通道
public class RocketMQApplication {
	public static void main(String[] args) {
		SpringApplication.run(RocketMQApplication.class, args);
	}
}
```
配置 Binding 信息:
```properties
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=test-group
```
### 应用启动
1. 增加配置,在应用的 /src/main/resources/application.properties 中添加基本配置信息。
```properties
spring.application.name=rocketmq-example
server.port=28081
```
2. 启动应用,支持 IDE 直接启动和编译打包后启动。
1. IDE 直接启动:找到主类 `RocketMQApplication`,执行 main 方法启动应用。
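2. 编译打包后启动:可以参考下面的示意命令,在 rocketmq-example 模块目录下执行(jar 包名称为假设,请以实际构建产物为准):
```bash
# 编译打包
mvn clean package -DskipTests
# 运行打包产物(名称为示意)
java -jar target/rocketmq-example-*.jar
```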
## 广播消费示例
广播模式会把消息投递给所有消费者。如果你希望同一消费组下的所有消费者都接收到同一个 Topic 下的消息,广播消费非常适合此场景。
### 创建Topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t broadcast
```
### 生产者
**application.yml**
```yaml
server:
port: 28085
spring:
application:
name: rocketmq-broadcast-producer-example
cloud:
stream:
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
bindings:
producer-out-0:
destination: broadcast
logging:
level:
org.springframework.context.support: debug
```
**code**
使用`ApplicationRunner`和`StreamBridge`发送消息。
```java
@SpringBootApplication
public class RocketMQBroadcastProducerApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQBroadcastProducerApplication.class);
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQBroadcastProducerApplication.class, args);
}
@Bean
public ApplicationRunner producer() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
Message<SimpleMsg> msg = new GenericMessage<SimpleMsg>(new SimpleMsg("Hello RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
}
```
### 消费者
启动两个消费者实例。
#### 消费者1
**application.yml**
```yaml
server:
port: 28084
spring:
application:
name: rocketmq-broadcast-consumer1-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
consumer-in-0:
consumer:
messageModel: BROADCASTING
bindings:
consumer-in-0:
destination: broadcast
group: broadcast-consumer
logging:
level:
org.springframework.context.support: debug
```
**code**
```java
@SpringBootApplication
public class RocketMQBroadcastConsumer1Application {
private static final Logger log = LoggerFactory
.getLogger(RocketMQBroadcastConsumer1Application.class);
public static void main(String[] args) {
SpringApplication.run(RocketMQBroadcastConsumer1Application.class, args);
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg());
};
}
}
```
#### 消费者2
**application.yml**
```yaml
server:
port: 28083
spring:
application:
name: rocketmq-broadcast-consumer2-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
consumer-in-0:
consumer:
messageModel: BROADCASTING
bindings:
consumer-in-0:
destination: broadcast
group: broadcast-consumer
logging:
level:
org.springframework.context.support: debug
```
**code**
```java
@SpringBootApplication
public class RocketMQBroadcastConsumer2Application {
private static final Logger log = LoggerFactory
.getLogger(RocketMQBroadcastConsumer2Application.class);
public static void main(String[] args) {
SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args);
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg());
};
}
}
```
## 顺序消费示例
顺序消息(FIFO 消息)是消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息类型。
顺序消息分为两类:
- 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO,First In First Out)的顺序进行发布和消费。
- 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
### 创建Topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
```
### 示例代码
**application.yml**
```yaml
server:
port: 28082
spring:
application:
name: rocketmq-orderly-consume-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
# 定义messageSelector
messageQueueSelector: orderlyMessageQueueSelector
consumer-in-0:
consumer:
# tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
subscription: 'TagA || TagC || TagD'
push:
orderly: true
bindings:
producer-out-0:
destination: orderly
consumer-in-0:
destination: orderly
group: orderly-consumer
logging:
level:
org.springframework.context.support: debug
```
**MessageQueueSelector**
选择适合自己的分区选择算法,保证同一个参数得到的结果相同。
```java
@Component
public class OrderlyMessageQueueSelector implements MessageQueueSelector {
private static final Logger log = LoggerFactory
.getLogger(OrderlyMessageQueueSelector.class);
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
int index = id % RocketMQOrderlyConsumeApplication.tags.length % mqs.size();
return mqs.get(index);
}
}
```
**生产者&消费者**
```java
@SpringBootApplication
public class RocketMQOrderlyConsumeApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQOrderlyConsumeApplication.class);
@Autowired
private StreamBridge streamBridge;
/***
* tag array.
*/
public static final String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
public static void main(String[] args) {
SpringApplication.run(RocketMQOrderlyConsumeApplication.class, args);
}
@Bean
public ApplicationRunner producer() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
MessageConst.PROPERTY_TAGS).toString();
log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +
msg.getHeaders().get(tagHeaderKey).toString());
try {
Thread.sleep(100);
}
catch (InterruptedException ignored) {
}
};
}
}
```
## 延时消息示例
- 延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立即投递,而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。
### 创建Topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t delay
```
### 示例代码
**application.yml**
```yaml
server:
port: 28086
spring:
application:
name: rocketmq-delay-consume-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
bindings:
producer-out-0:
destination: delay
consumer-in-0:
destination: delay
group: delay-group
logging:
level:
org.springframework.context.support: debug
```
**code**
```java
@SpringBootApplication
public class RocketMQDelayConsumeApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQDelayConsumeApplication.class);
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQDelayConsumeApplication.class, args);
}
@Bean
public ApplicationRunner producerDelay() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
// 设置延时等级:1~10
headers.put("DELAY", 2);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public ApplicationRunner producerSchedule() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
// 发送延时消息,需要设置延时时间(单位毫秒 ms),消息将在指定延时时间后投递,例如消息将在 3 秒后投递。
long delayTime = System.currentTimeMillis() + 3000;
headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getMsg());
};
}
}
```
## 过滤消息示例
### 创建Topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t sql
```
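> 注意:SQL92 属性过滤在 Broker 端默认是关闭的,需要在 Broker 配置中开启 `enablePropertyFilter=true` 并重启 Broker,否则按属性过滤的订阅会失败。下面是一个示意步骤(配置文件路径以实际部署为准):
```bash
# 在 Broker 配置中开启属性过滤,并以该配置启动 Broker
echo "enablePropertyFilter=true" >> conf/broker.conf
sh bin/mqbroker -n localhost:9876 -c conf/broker.conf
```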
### 示例代码
**application.yml**
支持 tag 过滤或者 sql 过滤,设置 `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.subscription` 即可。
tag示例: `tag:red || blue`
sql示例: `sql:(color in ('red1', 'red2', 'red4') and price>3)`
更多请参考: [Filter](https://rocketmq.apache.org/docs/filter-by-sql92-example/)
```yaml
server:
port: 28087
spring:
application:
name: rocketmq-sql-consume-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
consumer-in-0:
consumer:
# tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
subscription: sql:(color in ('red1', 'red2', 'red4') and price>3)
bindings:
producer-out-0:
destination: sql
consumer-in-0:
destination: sql
group: sql-group
logging:
level:
org.springframework.context.support: debug
```
**code**
```java
@SpringBootApplication
public class RocketMQSqlConsumeApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQSqlConsumeApplication.class);
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQSqlConsumeApplication.class, args);
}
/**
* color array.
*/
public static final String[] color = new String[] {"red1", "red2", "red3", "red4", "red5"};
/**
* price array.
*/
public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5};
@Bean
public ApplicationRunner producer() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put("color", color[i % color.length]);
headers.put("price", price[i % price.length]);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
String colorHeaderKey = "color";
String priceHeaderKey = "price";
log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " COLOR:" +
msg.getHeaders().get(colorHeaderKey).toString() + " " +
"PRICE: " + msg.getHeaders().get(priceHeaderKey).toString());
};
}
}
```
## Endpoint 信息查看
Spring Boot 应用支持通过 Endpoint 来暴露相关信息,RocketMQ Stream Starter 也支持这一点。
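下面是一个查看 Endpoint 的示意步骤:假设示例应用已引入 `spring-boot-starter-actuator` 依赖,并在配置中开放了 Web 端点(例如 `management.endpoints.web.exposure.include=*`),应用启动后即可通过 HTTP 访问 Actuator 端点(RocketMQ Binder 相关端点的 id 以实际使用的版本为准):
```bash
# 以简单示例的端口 28081 为例,查看应用健康信息
curl http://127.0.0.1:28081/actuator/health
# 列出当前暴露的所有 Actuator 端点
curl http://127.0.0.1:28081/actuator
```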

This is an overview of Spring Cloud Stream.
![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png)
## Preparation
### Download and Startup RocketMQ
You should start up the Name Server and Broker before using the RocketMQ Binder.
1. Download [RocketMQ](https://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip) and unzip it.
2. Startup Name Server
```bash
sh bin/mqnamesrv
```
3. Startup Broker
```bash
sh bin/mqbroker -n localhost:9876
```
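Optionally, once the Name Server and Broker are up, you can confirm that the Broker has registered successfully with the `mqadmin clusterList` command. A sketch, run from the RocketMQ directory (paths depend on your deployment):
```bash
# List cluster information to confirm the Broker is registered with the Name Server
sh bin/mqadmin clusterList -n localhost:9876
```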
**Note: This section only shows you how to integrate with the RocketMQ Binder. The configurations have already been completed in this example, so you don't need to modify the code any more.**
### Declare dependency
Add dependency spring-cloud-starter-stream-rocketmq to the `pom.xml` file in your Spring Cloud project.
```xml
<dependency>
    <!-- groupId/version should match the Spring Cloud Alibaba version used in your project (illustrative) -->
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
## Simple example
### Create topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
```
### Integration with RocketMQ Binder
Configure Input and Output Binding and cooperate with `@EnableBinding` annotation
```java
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // bind the default output (Source) and input (Sink) channels
public class RocketMQApplication {
	public static void main(String[] args) {
		SpringApplication.run(RocketMQApplication.class, args);
	}
}
```
Configure the Binding properties:
```properties
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=test-group
```
### Start Application
1. Add necessary configurations to file /src/main/resources/application.properties.
```properties
spring.application.name=rocketmq-example
server.port=28081
```
2. Start the application in IDE or by building a fatjar.
1. Start in IDE: Find main class `RocketMQApplication`, and execute the main method.
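2. Build and start a fatjar: the following sketch shows the idea, run from the rocketmq-example module directory (the jar name is an assumption, use your actual build output):
```bash
# Build the project
mvn clean package -DskipTests
# Run the packaged fat jar (name is illustrative)
java -jar target/rocketmq-example-*.jar
```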
## Broadcasting example
Broadcasting delivers a message to all consumers. If you want every consumer under the same consumer group to receive the messages of a topic, broadcasting fits this scenario well.
### Create topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t broadcast
```
### Producer
**application.yml**
```yaml
server:
port: 28085
spring:
application:
name: rocketmq-broadcast-producer-example
cloud:
stream:
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
bindings:
producer-out-0:
destination: broadcast
logging:
level:
org.springframework.context.support: debug
```
**code**
Use `ApplicationRunner` and `StreamBridge` to send messages.
```java
@SpringBootApplication
public class RocketMQBroadcastProducerApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQBroadcastProducerApplication.class);
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQBroadcastProducerApplication.class, args);
}
@Bean
public ApplicationRunner producer() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
Message<SimpleMsg> msg = new GenericMessage<SimpleMsg>(new SimpleMsg("Hello RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
}
```
### Consumer
Startup two consumers.
#### Consumer1
**application.yml**
```yaml
server:
port: 28084
spring:
application:
name: rocketmq-broadcast-consumer1-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
consumer-in-0:
consumer:
messageModel: BROADCASTING
bindings:
consumer-in-0:
destination: broadcast
group: broadcast-consumer
logging:
level:
org.springframework.context.support: debug
```
**code**
```java
@SpringBootApplication
public class RocketMQBroadcastConsumer1Application {
private static final Logger log = LoggerFactory
.getLogger(RocketMQBroadcastConsumer1Application.class);
public static void main(String[] args) {
SpringApplication.run(RocketMQBroadcastConsumer1Application.class, args);
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg());
};
}
}
```
#### Consumer2
**application.yml**
```yaml
server:
port: 28083
spring:
application:
name: rocketmq-broadcast-consumer2-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
consumer-in-0:
consumer:
messageModel: BROADCASTING
bindings:
consumer-in-0:
destination: broadcast
group: broadcast-consumer
logging:
level:
org.springframework.context.support: debug
```
**code**
```java
@SpringBootApplication
public class RocketMQBroadcastConsumer2Application {
private static final Logger log = LoggerFactory
.getLogger(RocketMQBroadcastConsumer2Application.class);
public static void main(String[] args) {
SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args);
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg());
};
}
}
```
## Order example
RocketMQ provides ordered messages using FIFO order.
There are two types of ordered messages.
* Global: For a specified topic, all messages are published and consumed in strict FIFO (First In First Out) order.
* Partition: For a specified topic, all messages are partitioned according to the `Sharding Key`. Messages within the same partition are published and consumed in strict FIFO order. `Sharding Key` is a key field used to distinguish different partitions in sequential messages, and it is a completely different concept from the Key of ordinary messages.
### Create Topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
```
### Example code
**application.yml**
```yaml
server:
port: 28082
spring:
application:
name: rocketmq-orderly-consume-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
# specify the messageQueueSelector bean to use
messageQueueSelector: orderlyMessageQueueSelector
consumer-in-0:
consumer:
# tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
subscription: 'TagA || TagC || TagD'
push:
orderly: true
bindings:
producer-out-0:
destination: orderly
consumer-in-0:
destination: orderly
group: orderly-consumer
logging:
level:
org.springframework.context.support: debug
```
**MessageQueueSelector**
Choose a partition selection algorithm that suits your scenario, and make sure that the same parameter always yields the same result.
```java
@Component
public class OrderlyMessageQueueSelector implements MessageQueueSelector {
private static final Logger log = LoggerFactory
.getLogger(OrderlyMessageQueueSelector.class);
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
int index = id % RocketMQOrderlyConsumeApplication.tags.length % mqs.size();
return mqs.get(index);
}
}
```
**Producer&Consumer**
```java
@SpringBootApplication
public class RocketMQOrderlyConsumeApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQOrderlyConsumeApplication.class);
@Autowired
private StreamBridge streamBridge;
/***
* tag array.
*/
public static final String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
public static void main(String[] args) {
SpringApplication.run(RocketMQOrderlyConsumeApplication.class, args);
}
@Bean
public ApplicationRunner producer() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
MessageConst.PROPERTY_TAGS).toString();
log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +
msg.getHeaders().get(tagHeaderKey).toString());
try {
Thread.sleep(100);
}
catch (InterruptedException ignored) {
}
};
}
}
```
## Schedule example
Scheduled messages differ from normal messages in that they won't be delivered until a specified time later.
### Create topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t delay
```
### Example code
**application.yml**
```yaml
server:
port: 28086
spring:
application:
name: rocketmq-delay-consume-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
bindings:
producer-out-0:
destination: delay
consumer-in-0:
destination: delay
group: delay-group
logging:
level:
org.springframework.context.support: debug
```
**code**
```java
@SpringBootApplication
public class RocketMQDelayConsumeApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQDelayConsumeApplication.class);
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQDelayConsumeApplication.class, args);
}
@Bean
public ApplicationRunner producerDelay() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
// Set the delay level 1~10
headers.put("DELAY", 2);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public ApplicationRunner producerSchedule() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
// Sending scheduled message, you need to set the delay time in milliseconds (ms). The message will be delivered after the specified delay time, for example, the message will be delivered after 3 seconds.
long delayTime = System.currentTimeMillis() + 3000;
headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getMsg());
};
}
}
```
## Filter example
### Create topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t sql
```
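> Note: SQL92 property filtering is disabled on the Broker by default. You need to set `enablePropertyFilter=true` in the Broker configuration and restart the Broker, otherwise SQL-based subscriptions will fail. A sketch (the config file path depends on your deployment):
```bash
# Enable property filtering on the Broker and start it with that config
echo "enablePropertyFilter=true" >> conf/broker.conf
sh bin/mqbroker -n localhost:9876 -c conf/broker.conf
```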
### Example code
**application.yml**
The RocketMQ Binder supports filtering by tag or SQL; just set `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.subscription`.
Tag example: `tag:red || blue`
Sql example: `sql:(color in ('red1', 'red2', 'red4') and price>3)`
More: [Filter](https://rocketmq.apache.org/docs/filter-by-sql92-example/)
```yaml
server:
port: 28087
spring:
application:
name: rocketmq-sql-consume-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
consumer-in-0:
consumer:
# tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
subscription: sql:(color in ('red1', 'red2', 'red4') and price>3)
bindings:
producer-out-0:
destination: sql
consumer-in-0:
destination: sql
group: sql-group
logging:
level:
org.springframework.context.support: debug
```
**code**
```java
@SpringBootApplication
public class RocketMQSqlConsumeApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQSqlConsumeApplication.class);
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQSqlConsumeApplication.class, args);
}
/**
* color array.
*/
public static final String[] color = new String[] {"red1", "red2", "red3", "red4", "red5"};
/**
* price array.
*/
public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5};
@Bean
public ApplicationRunner producer() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put("color", color[i % color.length]);
headers.put("price", price[i % price.length]);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
String colorHeaderKey = "color";
String priceHeaderKey = "price";
log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " COLOR:" +
msg.getHeaders().get(colorHeaderKey).toString() + " " +
"PRICE: " + msg.getHeaders().get(priceHeaderKey).toString());
};
}
}
```
## Endpoint
Add dependency `spring-cloud-starter-stream-rocketmq` to your pom.xml file, and configure your endpoint security strategy.
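A sketch of how to inspect the endpoints, assuming the example application also depends on `spring-boot-starter-actuator` and exposes its web endpoints (for example `management.endpoints.web.exposure.include=*`); the exact RocketMQ Binder endpoint id depends on the version you use:
```bash
# Using the simple example's port 28081, check the application health
curl http://127.0.0.1:28081/actuator/health
# List all exposed actuator endpoints
curl http://127.0.0.1:28081/actuator
```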
