This example illustrates how to use the RocketMQ Binder to implement pub/sub messaging for Spring Cloud applications.
[RocketMQ](https://rocketmq.apache.org/) is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
Before we start the demo, let's look at Spring Cloud Stream.
Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.
There are two core concepts in Spring Cloud Stream: Binder and Binding.
* Binder: A strategy interface used to bind an app interface to a logical name.
Binder implementations include `KafkaMessageChannelBinder` for Kafka, `RabbitMessageChannelBinder` for RabbitMQ, and `RocketMQMessageChannelBinder` for RocketMQ.
* Binding: Either an input binding or an output binding.
A binding is the bridge between external messaging systems and the application-provided producers and consumers of messages.
* input1: subscribes to the `test-topic` topic and consumes messages in order (all messages must be in the same MessageQueue for ordered consumption).
* input2: subscribes to the `test-topic` topic and concurrently consumes messages tagged `tagStr`, using a consumer-side thread pool of 20 threads.
            log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg());
        };
    }
}
```
## Order example
RocketMQ provides ordered messages using FIFO order.
There are two types of ordered messages.
* Global: For a specified topic, all messages are published and consumed in strict FIFO (First In First Out) order.
* Partition: For a specified topic, all messages are partitioned according to the `Sharding Key`. Messages within the same partition are published and consumed in strict FIFO order. `Sharding Key` is a key field used to distinguish different partitions in sequential messages, and it is a completely different concept from the Key of ordinary messages.
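Partition ordering depends on every message with the same `Sharding Key` landing in the same queue. The sketch below illustrates that idea in plain Java. `ShardingKeyRouter` and `selectQueue` are hypothetical names used only for illustration, and the hash-modulo rule is an assumption about one reasonable routing scheme, not RocketMQ's actual routing code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: not part of the RocketMQ API.
final class ShardingKeyRouter {

    private ShardingKeyRouter() {
    }

    // Maps a sharding key to a queue index; equal keys always map to the same index.
    static int selectQueue(String shardingKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        List<String> orderIds = Arrays.asList("order-1", "order-2", "order-1", "order-3", "order-2");
        for (String orderId : orderIds) {
            // Messages that share an order id are routed to the same queue.
            System.out.println(orderId + " -> queue " + selectQueue(orderId, queueCount));
        }
    }
}
```

In the RocketMQ client, a decision of this kind is typically made through the `MessageQueueSelector` interface; how it is wired into the binder depends on the binder version.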
### Create Topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
Refer to [Transaction Example](https://rocketmq.apache.org/docs/transaction-example/).
> It can be thought of as a two-phase commit message implementation that ensures eventual consistency in a distributed system. A transactional message ensures that the execution of the local transaction and the sending of the message are performed atomically.
### Application
Refer to https://rocketmq.apache.org/
> 1. Transactional status
>
> There are three states for a transactional message:
>
> (1) TransactionStatus.CommitTransaction: commit the transaction; consumers are allowed to consume this message.
>
> (2) TransactionStatus.RollbackTransaction: roll back the transaction; the message is deleted and cannot be consumed.
>
> (3) TransactionStatus.Unknown: intermediate state; the MQ needs to check back to determine the status.
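
The three states above can be read as a small decision procedure. The following is a minimal sketch in plain Java of how a pending message might be resolved. `TransactionResolutionSketch`, its enums, and the `maxChecks` parameter are hypothetical names introduced for illustration; they model the described behavior and are not the RocketMQ implementation.

```java
import java.util.function.Supplier;

// Hypothetical model of resolving a pending transactional message.
final class TransactionResolutionSketch {

    // Illustrative counterparts of the three transactional states.
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // What happens to the message once the state is resolved.
    enum Outcome { DELIVERED, DISCARDED, STILL_PENDING }

    private TransactionResolutionSketch() {
    }

    // localResult: state reported after the local transaction ran.
    // checkBack: queried only while the state is UNKNOWN, at most maxChecks times.
    static Outcome resolve(State localResult, Supplier<State> checkBack, int maxChecks) {
        State state = localResult;
        for (int checks = 0; state == State.UNKNOWN && checks < maxChecks; checks++) {
            state = checkBack.get();
        }
        switch (state) {
            case COMMIT:
                return Outcome.DELIVERED; // consumers may consume the message
            case ROLLBACK:
                return Outcome.DISCARDED; // the message is deleted
            default:
                return Outcome.STILL_PENDING; // status could not be determined yet
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(State.COMMIT, () -> State.UNKNOWN, 3));
        System.out.println(resolve(State.ROLLBACK, () -> State.UNKNOWN, 3));
        // An unknown local result is settled by the check-back.
        System.out.println(resolve(State.UNKNOWN, () -> State.COMMIT, 3));
    }
}
```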
### Create topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t tx
```
### Example code
**application.yml**
```yaml
server:
  port: 28088
spring:
  application:
    name: rocketmq-tx-example
  cloud:
    stream:
      function:
        definition: consumer;
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          producer-out-0:
            producer:
              group: output_1
              transactionListener: myTransactionListener
              producerType: Trans
      bindings:
        producer-out-0:
          destination: tx
        consumer-in-0:
          destination: tx
          group: tx-group
logging:
  level:
    org.springframework.context.support: debug
```
**TransactionListenerImpl**
Executes the local transaction and answers the broker's transaction status checks.
```java
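import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

@Component("myTransactionListener")
public class TransactionListenerImpl implements TransactionListener {

    // Runs the local transaction; the "test" property selects the simulated outcome.
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object num = msg.getProperty("test");
        if ("1".equals(num)) {
            // Unknown state: the broker will check back later.
            System.out.println("executer: " + new String(msg.getBody()) + " unknown");
            return LocalTransactionState.UNKNOW;
        }
        else if ("2".equals(num)) {
            // Rollback: the message is discarded and never delivered.
            System.out.println("executer: " + new String(msg.getBody()) + " rollback");
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // Commit: the message becomes visible to consumers.
        System.out.println("executer: " + new String(msg.getBody()) + " commit");
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    // Called by the broker to resolve messages left in the unknown state.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("check: " + new String(msg.getBody()));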
- Retry consumption: If the client fails to consume a message, the server re-pushes it, up to the configured maximum number of re-consumption attempts.
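
The redelivery behavior described above can be pictured with a small plain-Java simulation. `RetryConsumptionSketch` and `deliver` are hypothetical names, and the sketch assumes that a limit of N re-consumptions means one initial delivery plus up to N redeliveries; it is not the RocketMQ implementation.

```java
import java.util.function.Predicate;

// Hypothetical simulation of server-side redelivery after failed consumption.
final class RetryConsumptionSketch {

    private RetryConsumptionSketch() {
    }

    // Delivers the message once, then redelivers it up to maxReconsumeTimes after failures.
    // Returns the number of delivery attempts used, or -1 if every attempt failed.
    static int deliver(String message, int maxReconsumeTimes, Predicate<String> consumer) {
        for (int attempt = 1; attempt <= maxReconsumeTimes + 1; attempt++) {
            boolean consumed;
            try {
                consumed = consumer.test(message);
            } catch (RuntimeException e) {
                consumed = false; // an exception counts as a failed consumption
            }
            if (consumed) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // This consumer fails twice and succeeds on the third delivery.
        int attempts = deliver("hello", 3, msg -> ++calls[0] >= 3);
        System.out.println("consumed after " + attempts + " attempt(s)");
    }
}
```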
### Create topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t retrieable
```
### Example code
**application.yml**
```yaml
server:
  port: 28089
spring:
  application:
    name: rocketmq-retrieable-consume-example
  cloud:
    stream:
      function:
        definition: consumer;
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          producer-out-0:
            producer:
              group: output_1
          consumer-in-0:
            consumer:
              ## According to the configured number of `max-reconsume-times`,
              ## the server will re-push the message according to whether the client's consumption is successful or not
              push:
                max-reconsume-times: 3
      bindings:
        producer-out-0:
          destination: retrieable
        consumer-in-0:
          destination: retrieable
          group: retrieable-consumer
logging:
  level:
    org.springframework.context.support: debug
```
**code**
```java
@SpringBootApplication
public class RocketMQRetrieableConsumeApplication {
Note: Add the [metrics-core dependency](https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core) if you want to see metrics data. Without that dependency, the endpoint shows the following warning:
```json
{
    "warning": "please add metrics-core dependency, we use it for metrics"
}
```
## More
For more information about RocketMQ, see [RocketMQ Project](https://rocketmq.apache.org).