This example illustrates how to use RocketMQ Binder implement pub/sub messages for Spring Cloud applications.
[RocketMQ](https://rocketmq.apache.org/) is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
Before we start the demo, let's look at Spring Cloud Stream.
Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.
There are two concepts in Spring Cloud Stream: Binder 和 Binding.
* Binder: A strategy interface used to bind an app interface to a logical name.
Binder Implementations includes `KafkaMessageChannelBinder` of kafka, `RabbitMessageChannelBinder` of RabbitMQ and `RocketMQMessageChannelBinder` of `RocketMQ`.
* Binding: Including Input Binding and Output Binding.
Binding is Bridge between the external messaging systems and application provided Producers and Consumers of messages.
2. Start the application in IDE or by building a fatjar.
1. Start in IDE: Find main class `RocketMQApplication`, and execute the main method.
2. Build a fatjar: Execute command `mvn clean package` to build a fatjar, and run command `java -jar rocketmq-example.jar` to start the application.
### Message Handle
Using the binding named output and sent messages to `test-topic` topic.
And using two input bindings to subscribe messages.
* input1: subscribe the message of `test-topic` topic and consume ordered messages(all messages should in the same MessageQueue if you want to consuming ordered messages).
* input2: subscribe the message of `test-topic` topic and consume concurrent messages which tags is `tagStr`, the thread number in pool is 20 in Consumer side.
log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg());
};
}
}
```
## Order example
RocketMQ provides ordered messages using FIFO order.
There are two types of ordered messages.
* Global: For a specified topic, all messages are published and consumed in strict FIFO (First In First Out) order.
* Partition: For a specified topic, all messages are partitioned according to the `Sharding Key`. Messages within the same partition are published and consumed in strict FIFO order. `Sharding Key` is a key field used to distinguish different partitions in sequential messages, and it is a completely different concept from the Key of ordinary messages.
### Create Topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
Refer to [Transaction Example](https://rocketmq.apache.org/docs/transaction-example/).
> It can be thought of as a two-phase commit message implementation to ensure eventual consistency in distributed system. Transactional message ensures that the execution of local transaction and the sending of message can be performed atomically.
### Application
Refer to https://rocketmq.apache.org/
> 1、 Transactional status
>
> There are three states for transactional message:
> (1) TransactionStatus.CommitTransaction: commit transaction,it means that allow consumers to consume this message.
> (2) TransactionStatus.RollbackTransaction: rollback transaction,it means that the message will be deleted and not allowed to consume.
> (3) TransactionStatus.Unknown: intermediate state,it means that MQ is needed to check back to determine the status.
### Create topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t tx
```
### Example code
**application.yml**
```yaml
server:
port: 28088
spring:
application:
name: rocketmq-tx-example
cloud:
stream:
function:
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
transactionListener: myTransactionListener
producerType: Trans
bindings:
producer-out-0:
destination: tx
consumer-in-0:
destination: tx
group: tx-group
logging:
level:
org.springframework.context.support: debug
```
**TransactionListenerImpl**
To execute local transaction.
```java
@Component("myTransactionListener")
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object num = msg.getProperty("test");
if ("1".equals(num)) {
System.out.println("executer: " + new String(msg.getBody()) + " unknown");
return LocalTransactionState.UNKNOW;
}
else if ("2".equals(num)) {
System.out.println("executer: " + new String(msg.getBody()) + " rollback");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
System.out.println("executer: " + new String(msg.getBody()) + " commit");
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("check: " + new String(msg.getBody()));
Note: You should add [metrics-core dependency](https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core) if you want to see metrics data. endpoint will show warning information if you don't add that dependency:
```json
{
"warning": "please add metrics-core dependency, we use it for metrics"
}
```
## More
For more information about RocketMQ, see [RocketMQ Project](https://rocketmq.apache.org).
If you have any ideas or suggestions for Spring Cloud RocketMQ Binder, please don't hesitate to tell us by submitting github issues.