Fix RocketMQ pollable and sql example (#3385)

* Fix RocketMQ pollable and sql example
pull/3387/head
RuanSheng 2 years ago committed by GitHub
parent be819bbb3e
commit 1dbdceb695
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -681,6 +681,17 @@ public class RocketMQSqlConsumeApplication {
}
```
#### 常见问题
- MQClientException: The broker does not support consumer to filter message by SQL92
1. 修改 RocketMQ 服务端配置文件。
`conf/2m-2s-async/broker-a.properties` 配置文件末尾添加 `enablePropertyFilter=true`
2. 重启 mqbroker 并指定配置文件。
`mqbroker` 启动时指定配置文件:`conf/2m-2s-async/broker-a.properties`,例如:
```shell
bin/mqbroker -n 127.0.0.1:9876 -c conf/2m-2s-async/broker-a.properties autoCreateTopicEnable=true
```
## 事务消息示例
### 什么是事务消息?

@ -675,6 +675,17 @@ public class RocketMQSqlConsumeApplication {
}
```
#### 常见问题
- MQClientException: The broker does not support consumer to filter message by SQL92
1. Modify RocketMQ server configuration file.
In the `conf/2m-2s-async/broker-a.properties` configuration file, add `enablePropertyFilter=true`.
2. Restart mqbroker and specify the configuration file.
Specify the configuration file when `mqbroker` starts: `conf/2m-2s-async/broker-a.properties`, for example:
```shell
bin/mqbroker -n 127.0.0.1:9876 -c conf/2m-2s-async/broker-a.properties autoCreateTopicEnable=true
```
## Transaction example
### What is transactional message?

@ -30,6 +30,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.messaging.Message;
@ -48,7 +49,24 @@ public class RocketMQPollableConsumeApplication {
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQPollableConsumeApplication.class, args);
ConfigurableApplicationContext context = SpringApplication.run(RocketMQPollableConsumeApplication.class, args);
PollableMessageSource destIn = context.getBean(PollableMessageSource.class);
new Thread(() -> {
while (true) {
try {
if (!destIn.poll((m) -> {
SimpleMsg newPayload = (SimpleMsg) m.getPayload();
System.out.println(newPayload.getMsg());
}, new ParameterizedTypeReference<SimpleMsg>() {
})) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
}).start();
}
@Bean
@ -65,22 +83,4 @@ public class RocketMQPollableConsumeApplication {
};
}
@Bean
public ApplicationRunner pollableRunner(PollableMessageSource destIn) {
return args -> {
while (true) {
try {
if (!destIn.poll((m) -> {
SimpleMsg newPayload = (SimpleMsg) m.getPayload();
System.out.println(newPayload.getMsg());
}, new ParameterizedTypeReference<SimpleMsg>() { })) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
};
}
}

Loading…
Cancel
Save