From 74f1df7691a46ae368549928ffded28d87428c83 Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Sat, 9 Apr 2022 16:28:37 +0800
Subject: [PATCH 01/10] some rocketmq example.

---
 spring-cloud-alibaba-examples/pom.xml         |  7 ++
 .../pom.xml                                   | 53 ++++++++++++
 ...RocketMQBroadcastConsumer1Application.java | 47 +++++++++++
 .../src/main/resources/application.yml        | 23 ++++++
 .../pom.xml                                   | 52 ++++++++++++
 ...RocketMQBroadcastConsumer2Application.java | 46 +++++++++++
 .../src/main/resources/application.yml        | 23 ++++++
 .../pom.xml                                   | 52 ++++++++++++
 .../RocketMQBroadcastConsumeApplication.java  | 60 ++++++++++++++
 .../src/main/resources/application.yml        | 22 +++++
 .../rocketmq-delay-consume-example/pom.xml    | 55 +++++++++++++
 .../RocketMQDelayConsumeApplication.java      | 67 +++++++++++++++
 .../src/main/resources/application.yml        | 25 ++++++
 .../rocketmq-orderly-consume-example/pom.xml  | 52 ++++++++++++
 .../orderly/OrderlyMessageQueueSelector.java  | 41 ++++++++++
 .../RocketMQOrderlyConsumeApplication.java    | 82 +++++++++++++++++++
 .../src/main/resources/application.yml        | 33 ++++++++
 .../rocketmq-sql-consume-example/pom.xml      | 55 +++++++++++++
 .../sql/RocketMQSqlConsumeApplication.java    | 82 +++++++++++++++++++
 .../src/main/resources/application.yml        | 29 +++++++
 20 files changed, 906 insertions(+)
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml

diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml
index 736c7adc8..6e66e3f3c 100644
--- a/spring-cloud-alibaba-examples/pom.xml
+++ b/spring-cloud-alibaba-examples/pom.xml
@@ -35,6 +35,13 @@
         <module>rocketmq-example/rocketmq-consume-example</module>
         <module>rocketmq-example/rocketmq-produce-example</module>
         <module>rocketmq-example/rocketmq-comprehensive-example</module>
+        <module>rocketmq-example/rocketmq-orderly-consume-example</module>
+        <module>rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example</module>
+        <module>rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example</module>
+        <module>rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example</module>
+        <module>rocketmq-example/rocketmq-delay-consume-example</module>
+        <module>rocketmq-example/rocketmq-sql-consume-example</module>
+
         <module>spring-cloud-bus-rocketmq-example</module>
         <module>spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-nacos-example</module>
         <module>spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-consul-example</module>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
new file mode 100644
index 000000000..4f9cb958f
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>com.alibaba.cloud</groupId>
+        <artifactId>spring-cloud-alibaba-examples</artifactId>
+        <version>${revision}</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <artifactId>rocketmq-broadcast-consumer2-example</artifactId>
+    <name>Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Cosumer1</name>
+    <description>Example demonstrating how to broadcast consumption</description>
+
+    <packaging>jar</packaging>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-json</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>${maven-deploy-plugin.version}</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java
new file mode 100644
index 000000000..053065b99
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQBroadcastConsumer1Application {
+	private static final Logger log = LoggerFactory
+			.getLogger(RocketMQBroadcastConsumer1Application.class);
+
+	public static void main(String[] args) {
+		SpringApplication.run(RocketMQBroadcastConsumer1Application.class, args);
+	}
+
+	@Bean
+	public Consumer<Message<String>> consumer() {
+		return msg -> {
+			log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload());
+		};
+	}
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml
new file mode 100644
index 000000000..598cecb48
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml
@@ -0,0 +1,23 @@
+server:
+  port: 28084
+spring:
+  application:
+    name: rocketmq-broadcast-consumer1-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: 192.168.0.200:9876
+        bindings:
+          consumer-in-0:
+            consumer:
+              messageModel: BROADCASTING
+      bindings:
+        consumer-in-0:
+          destination: broadcast
+          group: broadcast-consumer
+logging:
+  level:
+    org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
new file mode 100644
index 000000000..9bcd14626
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>com.alibaba.cloud</groupId>
+        <artifactId>spring-cloud-alibaba-examples</artifactId>
+        <version>${revision}</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <artifactId>rocketmq-broadcast-consumer1-example</artifactId>
+    <name>Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Consumer2</name>
+    <description>Example demonstrating how to broadcast consumption</description>
+    <packaging>jar</packaging>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-json</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>${maven-deploy-plugin.version}</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java
new file mode 100644
index 000000000..cbebc43cd
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQBroadcastConsumer2Application {
+	private static final Logger log = LoggerFactory
+			.getLogger(RocketMQBroadcastConsumer2Application.class);
+
+	public static void main(String[] args) {
+		SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args);
+	}
+	@Bean
+	public Consumer<Message<String>> consumer() {
+		return msg -> {
+			log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload());
+		};
+	}
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml
new file mode 100644
index 000000000..5060f2378
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml
@@ -0,0 +1,23 @@
+server:
+  port: 28083
+spring:
+  application:
+    name: rocketmq-broadcast-consumer2-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: 192.168.0.200:9876
+        bindings:
+          consumer-in-0:
+            consumer:
+              messageModel: BROADCASTING
+      bindings:
+        consumer-in-0:
+          destination: broadcast
+          group: broadcast-consumer
+logging:
+  level:
+    org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml
new file mode 100644
index 000000000..3aad09fe0
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>com.alibaba.cloud</groupId>
+        <artifactId>spring-cloud-alibaba-examples</artifactId>
+        <version>${revision}</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <artifactId>rocketmq-broadcast-producer-example</artifactId>
+    <name>Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Producer</name>
+    <description>Example demonstrating how to use rocketmq to produce</description>
+    <packaging>jar</packaging>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-json</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>${maven-deploy-plugin.version}</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java
new file mode 100644
index 000000000..3832d85da
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+
+
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQBroadcastConsumeApplication {
+	private static final Logger log = LoggerFactory
+			.getLogger(RocketMQBroadcastConsumeApplication.class);
+
+	public static void main(String[] args) {
+		SpringApplication.run(RocketMQBroadcastConsumeApplication.class, args);
+	}
+
+	@Bean
+	public Supplier<Flux<Message<String>>> producer() {
+		return () -> {
+			return Flux.range(0, 100).map(i -> {
+				String key = "KEY" + i;
+				Map<String, Object> headers = new HashMap<>();
+				headers.put(MessageConst.PROPERTY_KEYS, key);
+				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+				Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
+				return msg;
+			}).log();
+		};
+	}
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
new file mode 100644
index 000000000..63ea4d29c
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
@@ -0,0 +1,22 @@
+server:
+  port: 28085
+spring:
+  application:
+    name: rocketmq-broadcast-producer-example
+  cloud:
+    stream:
+      function:
+        definition: producer;
+      rocketmq:
+        binder:
+          name-server: 192.168.0.200:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+      bindings:
+        producer-out-0:
+          destination: broadcast
+logging:
+  level:
+    org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml
new file mode 100644
index 000000000..dedbe275a
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>com.alibaba.cloud</groupId>
+        <artifactId>spring-cloud-alibaba-examples</artifactId>
+        <version>${revision}</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <artifactId>rocketmq-delay-consume-example</artifactId>
+    <name>Spring Cloud Starter Stream Alibaba RocketMQ Delay Consume Example</name>
+    <description>Example demonstrating how to use rocketmq to delay consume</description>
+    <packaging>jar</packaging>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>${maven-deploy-plugin.version}</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
new file mode 100644
index 000000000..529530746
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQDelayConsumeApplication {
+	private static final Logger log = LoggerFactory
+			.getLogger(RocketMQDelayConsumeApplication.class);
+
+	public static void main(String[] args) {
+		SpringApplication.run(RocketMQDelayConsumeApplication.class, args);
+	}
+
+	@Bean
+	public Supplier<Flux<Message<String>>> producer() {
+		return () -> {
+			return Flux.range(0, 100).map(i -> {
+				String key = "KEY" + i;
+				Map<String, Object> headers = new HashMap<>();
+				headers.put(MessageConst.PROPERTY_KEYS, key);
+				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+				headers.put("DELAY", 2);
+				Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
+				return msg;
+			}).log();
+		};
+	}
+
+	@Bean
+	public Consumer<Message<String>> consumer() {
+		return msg -> {
+			log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload());
+		};
+	}
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
new file mode 100644
index 000000000..bffd72cf0
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
@@ -0,0 +1,25 @@
+server:
+  port: 28086
+spring:
+  application:
+    name: rocketmq-delay-consume-example
+  cloud:
+    stream:
+      function:
+        definition: producer;consumer;
+      rocketmq:
+        binder:
+          name-server: 192.168.0.200:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+      bindings:
+        producer-out-0:
+          destination: delay
+        consumer-in-0:
+          destination: delay
+          group: delay-group
+logging:
+  level:
+    org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml
new file mode 100644
index 000000000..14d80e313
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>com.alibaba.cloud</groupId>
+        <artifactId>spring-cloud-alibaba-examples</artifactId>
+        <version>${revision}</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <artifactId>rocketmq-orderly-consume-example</artifactId>
+    <name>Spring Cloud Starter Stream Alibaba RocketMQ Orderly Consume Example</name>
+    <description>Example demonstrating how to use rocketmq to produce, and consume orderly</description>
+    <packaging>jar</packaging>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-json</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>${maven-deploy-plugin.version}</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
new file mode 100644
index 000000000..e1b275838
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.orderly;
+
+import java.util.List;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * @author sorie
+ */
+@Component
+public class OrderlyMessageQueueSelector implements MessageQueueSelector {
+	@Override
+	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+		Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+		int index = id % Math.min(mqs.size(), RocketMQOrderlyConsumeApplication.tags.length);
+		return mqs.get(index);
+	}
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java
new file mode 100644
index 000000000..53dc15a40
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.orderly;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQOrderlyConsumeApplication {
+	private static final Logger log = LoggerFactory
+			.getLogger(RocketMQOrderlyConsumeApplication.class);
+
+	/***
+	 * tag array.
+	 */
+	public static final String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
+
+	public static void main(String[] args) {
+		SpringApplication.run(RocketMQOrderlyConsumeApplication.class, args);
+	}
+
+	@Bean
+	public Supplier<Flux<Message<String>>> producer() {
+		return () -> {
+			return Flux.range(0, 100).map(i -> {
+				String key = "KEY" + i;
+				Map<String, Object> headers = new HashMap<>();
+				headers.put(MessageConst.PROPERTY_KEYS, key);
+				headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
+				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+				Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
+				return msg;
+			}).log();
+		};
+	}
+
+	@Bean
+	public Consumer<Message<String>> consumer() {
+		return msg -> {
+			String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
+					MessageConst.PROPERTY_TAGS).toString();
+			log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " TAG:" +
+					msg.getHeaders().get(tagHeaderKey).toString());
+			try {
+				Thread.sleep(100);
+			}
+			catch (InterruptedException ignored) {
+			}
+		};
+	}
+
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml
new file mode 100644
index 000000000..a7eb232b0
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml
@@ -0,0 +1,33 @@
+server:
+  port: 28082
+spring:
+  application:
+    name: rocketmq-orderly-consume-example
+  cloud:
+    stream:
+      function:
+        definition: producer;consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+              messageQueueSelector: orderlyMessageQueueSelector
+          consumer-in-0:
+            consumer:
+              # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+              subscription: 'TagA || TagC || TagD'
+              push:
+                orderly: true
+      bindings:
+        producer-out-0:
+          destination: orderly
+        consumer-in-0:
+          destination: orderly
+          group: orderly-consumer
+
+logging:
+  level:
+    org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml
new file mode 100644
index 000000000..534d05021
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>com.alibaba.cloud</groupId>
+        <artifactId>spring-cloud-alibaba-examples</artifactId>
+        <version>${revision}</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <artifactId>rocketmq-sql-consume-example</artifactId>
+    <name>Spring Cloud Starter Stream Alibaba RocketMQ Sql Consume Example</name>
+    <description>Example demonstrating how to use rocketmq to filter message by sql</description>
+    <packaging>jar</packaging>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>${maven-deploy-plugin.version}</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
new file mode 100644
index 000000000..fea68af68
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQSqlConsumeApplication {
+	private static final Logger log = LoggerFactory
+			.getLogger(RocketMQSqlConsumeApplication.class);
+
+	public static void main(String[] args) {
+		SpringApplication.run(RocketMQSqlConsumeApplication.class, args);
+	}
+
+	/**
+	 * color array.
+	 */
+	public static final String[] color = new String[] {"red1", "red2", "red3", "red4", "red5"};
+
+	/**
+	 * price array.
+	 */
+	public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5};
+
+	@Bean
+	public Supplier<Flux<Message<String>>> producer() {
+		return () -> {
+			return Flux.range(0, 100).map(i -> {
+				String key = "KEY" + i;
+				Map<String, Object> headers = new HashMap<>();
+				headers.put(MessageConst.PROPERTY_KEYS, key);
+				headers.put("color", color[i % color.length]);
+				headers.put("price", price[i % price.length]);
+				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+				Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
+				return msg;
+			}).log();
+		};
+	}
+
+	@Bean
+	public Consumer<Message<String>> consumer() {
+		return msg -> {
+			String colorHeaderKey = "color";
+			String priceHeaderKey = "price";
+			log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " COLOR:" +
+					msg.getHeaders().get(colorHeaderKey).toString() + " " +
+					"PRICE: " + msg.getHeaders().get(priceHeaderKey).toString());
+		};
+	}
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
new file mode 100644
index 000000000..a679d7b57
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
@@ -0,0 +1,29 @@
+server:
+  port: 28087
+spring:
+  application:
+    name: rocketmq-sql-consume-example
+  cloud:
+    stream:
+      function:
+        definition: producer;consumer;
+      rocketmq:
+        binder:
+          name-server: 192.168.0.200:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+          consumer-in-0:
+            consumer:
+#             tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+              subscription: sql:(color in ('red1', 'red2', 'red4') and price>3)
+      bindings:
+        producer-out-0:
+          destination: sql
+        consumer-in-0:
+          destination: sql
+          group: sql-group
+logging:
+  level:
+    org.springframework.context.support: debug

From a7f21a9b94b79ee6e04b34ec9b4c264baf6cec68 Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Mon, 11 Apr 2022 20:36:06 +0800
Subject: [PATCH 02/10] fix nameserver address prob.

---
 .gitignore                                                      | 1 +
 .../src/main/resources/application.yml                          | 2 +-
 .../src/main/resources/application.yml                          | 2 +-
 .../src/main/resources/application.yml                          | 2 +-
 .../src/main/resources/application.yml                          | 2 +-
 .../src/main/resources/application.yml                          | 2 +-
 6 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/.gitignore b/.gitignore
index 459a01ca3..c90f66d27 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,3 +43,4 @@ target
 
 # AsciiDoc
 spring-cloud-alibaba-docs/**/*.html
+**/eclipse/*
\ No newline at end of file
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml
index 598cecb48..f072f9178 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml
@@ -9,7 +9,7 @@ spring:
         definition: consumer;
       rocketmq:
         binder:
-          name-server: 192.168.0.200:9876
+          name-server: localhost:9876
         bindings:
           consumer-in-0:
             consumer:
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml
index 5060f2378..d6a7cf42f 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml
@@ -9,7 +9,7 @@ spring:
         definition: consumer;
       rocketmq:
         binder:
-          name-server: 192.168.0.200:9876
+          name-server: localhost:9876
         bindings:
           consumer-in-0:
             consumer:
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
index 63ea4d29c..e1945ab90 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
@@ -9,7 +9,7 @@ spring:
         definition: producer;
       rocketmq:
         binder:
-          name-server: 192.168.0.200:9876
+          name-server: localhost:9876
         bindings:
           producer-out-0:
             producer:
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
index bffd72cf0..25ed6dd9b 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
@@ -9,7 +9,7 @@ spring:
         definition: producer;consumer;
       rocketmq:
         binder:
-          name-server: 192.168.0.200:9876
+          name-server: localhost:9876
         bindings:
           producer-out-0:
             producer:
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
index a679d7b57..2ff74a231 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
@@ -9,7 +9,7 @@ spring:
         definition: producer;consumer;
       rocketmq:
         binder:
-          name-server: 192.168.0.200:9876
+          name-server: localhost:9876
         bindings:
           producer-out-0:
             producer:

From 49507f56c8ec42cc285d5e3a04e7468e249ce5e9 Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Mon, 11 Apr 2022 20:36:57 +0800
Subject: [PATCH 03/10] fix Application name prob.

---
 ...ation.java => RocketMQBroadcastProducerApplication.java} | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
 rename spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/{RocketMQBroadcastConsumeApplication.java => RocketMQBroadcastProducerApplication.java} (90%)

diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java
similarity index 90%
rename from spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java
rename to spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java
index 3832d85da..692fac32c 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java
@@ -36,12 +36,12 @@ import org.springframework.messaging.support.GenericMessage;
  * @author sorie
  */
 @SpringBootApplication
-public class RocketMQBroadcastConsumeApplication {
+public class RocketMQBroadcastProducerApplication {
 	private static final Logger log = LoggerFactory
-			.getLogger(RocketMQBroadcastConsumeApplication.class);
+			.getLogger(RocketMQBroadcastProducerApplication.class);
 
 	public static void main(String[] args) {
-		SpringApplication.run(RocketMQBroadcastConsumeApplication.class, args);
+		SpringApplication.run(RocketMQBroadcastProducerApplication.class, args);
 	}
 
 	@Bean

From 2da4cd15004fd918c944839328bf25d4d8fd3fa5 Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Mon, 11 Apr 2022 20:39:50 +0800
Subject: [PATCH 04/10] fix Rocketmq example artifactId prob.

---
 .../rocketmq-broadcast-consumer1-example/pom.xml                | 2 +-
 .../rocketmq-broadcast-consumer2-example/pom.xml                | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
index 4f9cb958f..d4894c03a 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
@@ -11,7 +11,7 @@
     <modelVersion>4.0.0</modelVersion>
 
 
-    <artifactId>rocketmq-broadcast-consumer2-example</artifactId>
+    <artifactId>rocketmq-broadcast-consumer1-example</artifactId>
     <name>Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Cosumer1</name>
     <description>Example demonstrating how to broadcast consumption</description>
 
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
index 9bcd14626..ecd738686 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
@@ -11,7 +11,7 @@
     <modelVersion>4.0.0</modelVersion>
 
 
-    <artifactId>rocketmq-broadcast-consumer1-example</artifactId>
+    <artifactId>rocketmq-broadcast-consumer2-example</artifactId>
     <name>Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Consumer2</name>
     <description>Example demonstrating how to broadcast consumption</description>
     <packaging>jar</packaging>

From 091f3a4e1270eff9867eaf28d2a345d0b9ff24be Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Mon, 11 Apr 2022 22:38:31 +0800
Subject: [PATCH 05/10] Refactor Rocketmq: use StreamBridge.send() instead of
 functional producer

---
 spring-cloud-alibaba-examples/pom.xml         |  1 +
 .../pom.xml                                   |  5 +++
 ...RocketMQBroadcastConsumer1Application.java |  5 ++-
 .../pom.xml                                   |  5 +++
 ...RocketMQBroadcastConsumer2Application.java |  6 ++-
 .../pom.xml                                   |  5 +++
 .../RocketMQBroadcastProducerApplication.java | 22 ++++++----
 .../src/main/resources/application.yml        |  2 -
 .../rocketmq-delay-consume-example/pom.xml    |  5 +++
 .../RocketMQDelayConsumeApplication.java      | 41 ++++++++++++++-----
 .../src/main/resources/application.yml        |  2 +-
 .../rocketmq-example-common/pom.xml           | 19 +++++++++
 .../cloud/examples/common/SimpleMsg.java      | 37 +++++++++++++++++
 .../src/main/resources/application.yml        | 29 +++++++++++++
 .../rocketmq-orderly-consume-example/pom.xml  |  5 +++
 .../orderly/OrderlyMessageQueueSelector.java  |  7 +++-
 .../RocketMQOrderlyConsumeApplication.java    | 25 ++++++-----
 .../src/main/resources/application.yml        |  2 +-
 .../rocketmq-sql-consume-example/pom.xml      |  6 ++-
 .../sql/RocketMQSqlConsumeApplication.java    | 25 ++++++-----
 .../src/main/resources/application.yml        |  2 +-
 21 files changed, 205 insertions(+), 51 deletions(-)
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/pom.xml
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/java/com/alibaba/cloud/examples/common/SimpleMsg.java
 create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/resources/application.yml

diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml
index 6e66e3f3c..0f030bcdd 100644
--- a/spring-cloud-alibaba-examples/pom.xml
+++ b/spring-cloud-alibaba-examples/pom.xml
@@ -41,6 +41,7 @@
         <module>rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example</module>
         <module>rocketmq-example/rocketmq-delay-consume-example</module>
         <module>rocketmq-example/rocketmq-sql-consume-example</module>
+        <module>rocketmq-example/rocketmq-example-common</module>
 
         <module>spring-cloud-bus-rocketmq-example</module>
         <module>spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-nacos-example</module>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
index d4894c03a..156055b6b 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
@@ -31,6 +31,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-json</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>rocketmq-example-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java
index 053065b99..e37fc0aa4 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java
@@ -18,6 +18,7 @@ package com.alibaba.cloud.examples.broadcast;
 
 import java.util.function.Consumer;
 
+import com.alibaba.cloud.examples.common.SimpleMsg;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,9 +40,9 @@ public class RocketMQBroadcastConsumer1Application {
 	}
 
 	@Bean
-	public Consumer<Message<String>> consumer() {
+	public Consumer<Message<SimpleMsg>> consumer() {
 		return msg -> {
-			log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload());
+			log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg());
 		};
 	}
 }
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
index ecd738686..156112602 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
@@ -30,6 +30,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-json</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>rocketmq-example-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java
index cbebc43cd..cf5f88484 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java
@@ -18,6 +18,7 @@ package com.alibaba.cloud.examples.broadcast;
 
 import java.util.function.Consumer;
 
+import com.alibaba.cloud.examples.common.SimpleMsg;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,10 +38,11 @@ public class RocketMQBroadcastConsumer2Application {
 	public static void main(String[] args) {
 		SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args);
 	}
+
 	@Bean
-	public Consumer<Message<String>> consumer() {
+	public Consumer<Message<SimpleMsg>> consumer() {
 		return msg -> {
-			log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload());
+			log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg());
 		};
 	}
 }
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml
index 3aad09fe0..f410675f0 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml
@@ -30,6 +30,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-json</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>rocketmq-example-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java
index 692fac32c..9f52ea911 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java
@@ -18,15 +18,18 @@ package com.alibaba.cloud.examples.broadcast;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.function.Supplier;
 
+import com.alibaba.cloud.examples.common.SimpleMsg;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
 
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.function.StreamBridge;
 import org.springframework.context.annotation.Bean;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.support.GenericMessage;
@@ -39,22 +42,23 @@ import org.springframework.messaging.support.GenericMessage;
 public class RocketMQBroadcastProducerApplication {
 	private static final Logger log = LoggerFactory
 			.getLogger(RocketMQBroadcastProducerApplication.class);
-
+	@Autowired
+	private StreamBridge streamBridge;
 	public static void main(String[] args) {
 		SpringApplication.run(RocketMQBroadcastProducerApplication.class, args);
 	}
 
 	@Bean
-	public Supplier<Flux<Message<String>>> producer() {
-		return () -> {
-			return Flux.range(0, 100).map(i -> {
+	public ApplicationRunner producer() {
+		return args -> {
+			for (int i = 0; i < 100; i++) {
 				String key = "KEY" + i;
 				Map<String, Object> headers = new HashMap<>();
 				headers.put(MessageConst.PROPERTY_KEYS, key);
 				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
-				Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
-				return msg;
-			}).log();
+				Message<SimpleMsg> msg = new GenericMessage<SimpleMsg>(new SimpleMsg("Hello RocketMQ " + i), headers);
+				streamBridge.send("producer-out-0", msg);
+			}
 		};
 	}
 }
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
index e1945ab90..37fd8c8ef 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
@@ -5,8 +5,6 @@ spring:
     name: rocketmq-broadcast-producer-example
   cloud:
     stream:
-      function:
-        definition: producer;
       rocketmq:
         binder:
           name-server: localhost:9876
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml
index dedbe275a..db4b0b7f7 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml
@@ -33,6 +33,11 @@
             <artifactId>spring-boot-starter-actuator</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>rocketmq-example-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
index 529530746..9ef0f8ac3 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
@@ -19,15 +19,17 @@ package com.alibaba.cloud.examples.broadcast;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 
+import com.alibaba.cloud.examples.common.SimpleMsg;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.function.StreamBridge;
 import org.springframework.context.annotation.Bean;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.support.GenericMessage;
@@ -38,30 +40,49 @@ import org.springframework.messaging.support.GenericMessage;
 public class RocketMQDelayConsumeApplication {
 	private static final Logger log = LoggerFactory
 			.getLogger(RocketMQDelayConsumeApplication.class);
+	@Autowired
+	private StreamBridge streamBridge;
 
 	public static void main(String[] args) {
 		SpringApplication.run(RocketMQDelayConsumeApplication.class, args);
 	}
 
 	@Bean
-	public Supplier<Flux<Message<String>>> producer() {
-		return () -> {
-			return Flux.range(0, 100).map(i -> {
+	public ApplicationRunner producerDelay() {
+		return args -> {
+			for (int i = 0; i < 100; i++) {
 				String key = "KEY" + i;
 				Map<String, Object> headers = new HashMap<>();
 				headers.put(MessageConst.PROPERTY_KEYS, key);
 				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
 				headers.put("DELAY", 2);
-				Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
-				return msg;
-			}).log();
+				Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
+				streamBridge.send("producer-out-0", msg);
+			}
 		};
 	}
 
 	@Bean
-	public Consumer<Message<String>> consumer() {
+	public ApplicationRunner producerSchedule() {
+		return args -> {
+			for (int i = 0; i < 100; i++) {
+				String key = "KEY" + i;
+				Map<String, Object> headers = new HashMap<>();
+				headers.put(MessageConst.PROPERTY_KEYS, key);
+				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+				// send message after 3000ms
+				long delayTime = System.currentTimeMillis() + 3000;
+				headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
+				Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
+				streamBridge.send("producer-out-0", msg);
+			}
+		};
+	}
+
+	@Bean
+	public Consumer<Message<SimpleMsg>> consumer() {
 		return msg -> {
-			log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload());
+			log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getMsg());
 		};
 	}
 }
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
index 25ed6dd9b..1fcd79b9f 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
@@ -6,7 +6,7 @@ spring:
   cloud:
     stream:
       function:
-        definition: producer;consumer;
+        definition: consumer;
       rocketmq:
         binder:
           name-server: localhost:9876
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/pom.xml
new file mode 100644
index 000000000..a002bca53
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/pom.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>com.alibaba.cloud</groupId>
+        <artifactId>spring-cloud-alibaba-examples</artifactId>
+        <version>${revision}</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+
+    <artifactId>rocketmq-example-common</artifactId>
+    <name>Spring Cloud Starter Stream Alibaba RocketMQ Example COMMON</name>
+    <description>Some rocketMQ exmaple common codes</description>
+    <packaging>jar</packaging>
+
+</project>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/java/com/alibaba/cloud/examples/common/SimpleMsg.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/java/com/alibaba/cloud/examples/common/SimpleMsg.java
new file mode 100644
index 000000000..3ec591aad
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/java/com/alibaba/cloud/examples/common/SimpleMsg.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.common;
+
+/**
+ * @author sorie
+ */
+public class SimpleMsg {
+
+	private String msg;
+
+	public SimpleMsg(String msg) {
+		this.msg = msg;
+	}
+
+	public String getMsg() {
+		return msg;
+	}
+
+	public void setMsg(String msg) {
+		this.msg = msg;
+	}
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/resources/application.yml
new file mode 100644
index 000000000..2ff74a231
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/resources/application.yml
@@ -0,0 +1,29 @@
+server:
+  port: 28087
+spring:
+  application:
+    name: rocketmq-sql-consume-example
+  cloud:
+    stream:
+      function:
+        definition: producer;consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+          consumer-in-0:
+            consumer:
+#             tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+              subscription: sql:(color in ('red1', 'red2', 'red4') and price>3)
+      bindings:
+        producer-out-0:
+          destination: sql
+        consumer-in-0:
+          destination: sql
+          group: sql-group
+logging:
+  level:
+    org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml
index 14d80e313..83d93942b 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml
@@ -30,6 +30,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-json</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>rocketmq-example-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
index e1b275838..8611e8d6c 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
@@ -22,6 +22,8 @@ import org.apache.rocketmq.client.producer.MessageQueueSelector;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.stereotype.Component;
@@ -32,10 +34,13 @@ import org.springframework.stereotype.Component;
  */
 @Component
 public class OrderlyMessageQueueSelector implements MessageQueueSelector {
+	private static final Logger log = LoggerFactory
+			.getLogger(OrderlyMessageQueueSelector.class);
 	@Override
 	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
 		Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
-		int index = id % Math.min(mqs.size(), RocketMQOrderlyConsumeApplication.tags.length);
+		String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
+		int index = id % RocketMQOrderlyConsumeApplication.tags.length % mqs.size();
 		return mqs.get(index);
 	}
 }
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java
index 53dc15a40..b87f45b21 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java
@@ -19,16 +19,18 @@ package com.alibaba.cloud.examples.orderly;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 
+import com.alibaba.cloud.examples.common.SimpleMsg;
 import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.function.StreamBridge;
 import org.springframework.context.annotation.Bean;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.support.GenericMessage;
@@ -40,6 +42,9 @@ public class RocketMQOrderlyConsumeApplication {
 	private static final Logger log = LoggerFactory
 			.getLogger(RocketMQOrderlyConsumeApplication.class);
 
+	@Autowired
+	private StreamBridge streamBridge;
+
 	/***
 	 * tag array.
 	 */
@@ -50,26 +55,26 @@ public class RocketMQOrderlyConsumeApplication {
 	}
 
 	@Bean
-	public Supplier<Flux<Message<String>>> producer() {
-		return () -> {
-			return Flux.range(0, 100).map(i -> {
+	public ApplicationRunner producer() {
+		return args -> {
+			for (int i = 0; i < 100; i++) {
 				String key = "KEY" + i;
 				Map<String, Object> headers = new HashMap<>();
 				headers.put(MessageConst.PROPERTY_KEYS, key);
 				headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
 				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
-				Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
-				return msg;
-			}).log();
+				Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
+				streamBridge.send("producer-out-0", msg);
+			}
 		};
 	}
 
 	@Bean
-	public Consumer<Message<String>> consumer() {
+	public Consumer<Message<SimpleMsg>> consumer() {
 		return msg -> {
 			String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
 					MessageConst.PROPERTY_TAGS).toString();
-			log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " TAG:" +
+			log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +
 					msg.getHeaders().get(tagHeaderKey).toString());
 			try {
 				Thread.sleep(100);
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml
index a7eb232b0..c14ee5d73 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml
@@ -6,7 +6,7 @@ spring:
   cloud:
     stream:
       function:
-        definition: producer;consumer;
+        definition: consumer;
       rocketmq:
         binder:
           name-server: localhost:9876
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml
index 534d05021..294c8c862 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml
@@ -32,7 +32,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-actuator</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>rocketmq-example-common</artifactId>
+            <version>${revision}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
index fea68af68..78a07def7 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
@@ -19,15 +19,17 @@ package com.alibaba.cloud.examples.broadcast;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 
+import com.alibaba.cloud.examples.common.SimpleMsg;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.function.StreamBridge;
 import org.springframework.context.annotation.Bean;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.support.GenericMessage;
@@ -38,7 +40,8 @@ import org.springframework.messaging.support.GenericMessage;
 public class RocketMQSqlConsumeApplication {
 	private static final Logger log = LoggerFactory
 			.getLogger(RocketMQSqlConsumeApplication.class);
-
+	@Autowired
+	private StreamBridge streamBridge;
 	public static void main(String[] args) {
 		SpringApplication.run(RocketMQSqlConsumeApplication.class, args);
 	}
@@ -54,27 +57,27 @@ public class RocketMQSqlConsumeApplication {
 	public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5};
 
 	@Bean
-	public Supplier<Flux<Message<String>>> producer() {
-		return () -> {
-			return Flux.range(0, 100).map(i -> {
+	public ApplicationRunner producer() {
+		return args -> {
+			for (int i = 0; i < 100; i++) {
 				String key = "KEY" + i;
 				Map<String, Object> headers = new HashMap<>();
 				headers.put(MessageConst.PROPERTY_KEYS, key);
 				headers.put("color", color[i % color.length]);
 				headers.put("price", price[i % price.length]);
 				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
-				Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
-				return msg;
-			}).log();
+				Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
+				streamBridge.send("producer-out-0", msg);
+			}
 		};
 	}
 
 	@Bean
-	public Consumer<Message<String>> consumer() {
+	public Consumer<Message<SimpleMsg>> consumer() {
 		return msg -> {
 			String colorHeaderKey = "color";
 			String priceHeaderKey = "price";
-			log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " COLOR:" +
+			log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " COLOR:" +
 					msg.getHeaders().get(colorHeaderKey).toString() + " " +
 					"PRICE: " + msg.getHeaders().get(priceHeaderKey).toString());
 		};
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
index 2ff74a231..b7f41e841 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
@@ -6,7 +6,7 @@ spring:
   cloud:
     stream:
       function:
-        definition: producer;consumer;
+        definition: consumer;
       rocketmq:
         binder:
           name-server: localhost:9876

From 7381ef0aa50e24d3ebc920af2531962f4d707f16 Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Mon, 11 Apr 2022 22:42:12 +0800
Subject: [PATCH 06/10] gitignore roll back

---
 .gitignore | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index c90f66d27..459a01ca3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,4 +43,3 @@ target
 
 # AsciiDoc
 spring-cloud-alibaba-docs/**/*.html
-**/eclipse/*
\ No newline at end of file

From 8a55030dd63ee971208b0075bae1d305ea1a52fb Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Sun, 17 Apr 2022 12:31:11 +0800
Subject: [PATCH 07/10] rocketmq example docs

---
 .../rocketmq-example/readme-zh.md             | 572 ++++++++++++++++--
 .../rocketmq-example/readme.md                | 566 ++++++++++++++++-
 2 files changed, 1070 insertions(+), 68 deletions(-)

diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
index ec27f69eb..8d46de1f7 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
+++ b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
@@ -11,7 +11,7 @@
 这是官方对 Spring Cloud Stream 的一段介绍:
 
 Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 `Spring Integration` 与 Broker 进行连接。
- 
+
 Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
 
 Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
@@ -28,15 +28,31 @@ Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间
 
 ![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png)
 
-## 示例
 
-### 如何接入
 
-在启动示例进行演示之前,我们先了解一下 Spring Cloud 应用如何接入 RocketMQ Binder。
+## 准备工作
 
-> **注意:本章节只是为了便于您理解接入方式,本示例代码中已经完成****接入工作,您无需再进行修改。**
+### 下载并启动 RocketMQ
 
-1. 首先,修改 `pom.xml` 文件,引入 RocketMQ Stream Starter。
+**在接入 RocketMQ Binder 之前,首先需要启动 RocketMQ 的 Name Server 和 Broker。**
+
+1. 下载[RocketMQ最新的二进制文件](https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip),并解压
+
+2. 启动 Name Server
+
+```bash
+sh bin/mqnamesrv
+```
+
+3. 启动 Broker
+
+```bash
+sh bin/mqbroker -n localhost:9876
+```
+
+### 引入依赖
+
+修改 `pom.xml` 文件,引入 RocketMQ Stream Starter。
 
 ```xml
 <dependency>
@@ -45,7 +61,17 @@ Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间
 </dependency>
 ```
 
-2. 配置 Input 和 Output 的 Binding 信息并配合 `@EnableBinding` 注解使其生效
+## 简单示例
+
+### 创建Topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
+```
+
+### 示例代码
+
+配置 Input 和 Output 的 Binding 信息并配合 `@EnableBinding` 注解使其生效
 
 ```java
 @SpringBootApplication
@@ -68,33 +94,6 @@ spring.cloud.stream.bindings.output.content-type=application/json
 spring.cloud.stream.bindings.input.destination=test-topic
 spring.cloud.stream.bindings.input.content-type=application/json
 spring.cloud.stream.bindings.input.group=test-group
-
-```
-	
-3. 消息发送及消息订阅
-
-### 下载并启动 RocketMQ
-
-**在接入 RocketMQ Binder 之前,首先需要启动 RocketMQ 的 Name Server 和 Broker。**
-
-1. 下载[RocketMQ最新的二进制文件](https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip),并解压
-
-2. 启动 Name Server
-
-```bash
-sh bin/mqnamesrv
-```
-
-3. 启动 Broker
-
-```bash
-sh bin/mqbroker -n localhost:9876
-```
-
-4. 创建 Topic: test-topic
-
-```bash
-sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
 ```
 
 ### 应用启动
@@ -105,7 +104,7 @@ sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
 spring.application.name=rocketmq-example
 server.port=28081
 ```
-	
+
 2. 启动应用,支持 IDE 直接启动和编译打包后启动。
 
 	1. IDE 直接启动:找到主类 `RocketMQApplication`,执行 main 方法启动应用。
@@ -196,6 +195,509 @@ public class ReceiveService {
 }
 ```
 
+## 广播消费示例
+
+​	广播会发送消息给所有消费者。如果你想同一消费组下所有消费者接收到同一个topic下的消息,广播消费非常适合此场景。
+
+### 创建Topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t broadcast
+```
+
+### 生产者
+
+**application.yml**
+
+```yaml
+server:
+  port: 28085
+spring:
+  application:
+    name: rocketmq-broadcast-producer-example
+  cloud:
+    stream:
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+      bindings:
+        producer-out-0:
+          destination: broadcast
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+使用`ApplicationRunner`和`StreamBridge`发送消息。
+
+```java
+@SpringBootApplication
+public class RocketMQBroadcastProducerApplication {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQBroadcastProducerApplication.class);
+   @Autowired
+   private StreamBridge streamBridge;
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQBroadcastProducerApplication.class, args);
+   }
+
+   @Bean
+   public ApplicationRunner producer() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+            Message<SimpleMsg> msg = new GenericMessage<SimpleMsg>(new SimpleMsg("Hello RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+}
+```
+
+### 消费者
+
+启动两个消费者实例。
+
+#### 消费者1
+
+**application.yml**
+
+```yaml
+server:
+  port: 28084
+spring:
+  application:
+    name: rocketmq-broadcast-consumer1-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          consumer-in-0:
+            consumer:
+              messageModel: BROADCASTING
+      bindings:
+        consumer-in-0:
+          destination: broadcast
+          group: broadcast-consumer
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+```java
+@SpringBootApplication
+public class RocketMQBroadcastConsumer1Application {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQBroadcastConsumer1Application.class);
+
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQBroadcastConsumer1Application.class, args);
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg());
+      };
+   }
+}
+```
+
+#### 消费者2
+
+**application.yml**
+
+```yaml
+server:
+  port: 28083
+spring:
+  application:
+    name: rocketmq-broadcast-consumer2-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          consumer-in-0:
+            consumer:
+              messageModel: BROADCASTING
+      bindings:
+        consumer-in-0:
+          destination: broadcast
+          group: broadcast-consumer
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+```java
+@SpringBootApplication
+public class RocketMQBroadcastConsumer2Application {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQBroadcastConsumer2Application.class);
+
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args);
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg());
+      };
+   }
+}
+```
+
+## 顺序消费示例
+
+顺序消息(FIFO消息)是消息队列RocketMQ版提供的一种严格按照顺序来发布和消费的消息类型。
+
+顺序消息分为两类:
+
+- 全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
+
+### 创建Topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
+```
+
+### 示例代码
+
+**application.yml**
+
+```yaml
+server:
+  port: 28082
+spring:
+  application:
+    name: rocketmq-orderly-consume-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+              # 定义messageSelector
+              messageQueueSelector: orderlyMessageQueueSelector
+          consumer-in-0:
+            consumer:
+              # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+              subscription: 'TagA || TagC || TagD'
+              push:
+                orderly: true
+      bindings:
+        producer-out-0:
+          destination: orderly
+        consumer-in-0:
+          destination: orderly
+          group: orderly-consumer
+
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**MessageQueueSelector**
+
+选择适合自己的分区选择算法,保证同一个参数得到的结果相同。
+
+```java
+@Component
+public class OrderlyMessageQueueSelector implements MessageQueueSelector {
+   private static final Logger log = LoggerFactory
+         .getLogger(OrderlyMessageQueueSelector.class);
+   @Override
+   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+      Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+      String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
+      int index = id % RocketMQOrderlyConsumeApplication.tags.length % mqs.size();
+      return mqs.get(index);
+   }
+}
+```
+
+**生产者&消费者**
+
+```java
+@SpringBootApplication
+public class RocketMQOrderlyConsumeApplication {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQOrderlyConsumeApplication.class);
+
+   @Autowired
+   private StreamBridge streamBridge;
+
+   /***
+    * tag array.
+    */
+   public static final String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
+
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQOrderlyConsumeApplication.class, args);
+   }
+
+   @Bean
+   public ApplicationRunner producer() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
+               MessageConst.PROPERTY_TAGS).toString();
+         log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +
+               msg.getHeaders().get(tagHeaderKey).toString());
+         try {
+            Thread.sleep(100);
+         }
+         catch (InterruptedException ignored) {
+         }
+      };
+   }
+
+}
+```
+
+## 延时消息示例
+
+- 延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
+
+### 创建Topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t delay
+```
+
+### 示例代码
+
+**application.yml**
+
+```yaml
+server:
+  port: 28086
+spring:
+  application:
+    name: rocketmq-delay-consume-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+      bindings:
+        producer-out-0:
+          destination: delay
+        consumer-in-0:
+          destination: delay
+          group: delay-group
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+```java
+@SpringBootApplication
+public class RocketMQDelayConsumeApplication {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQDelayConsumeApplication.class);
+   @Autowired
+   private StreamBridge streamBridge;
+
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQDelayConsumeApplication.class, args);
+   }
+
+   @Bean
+   public ApplicationRunner producerDelay() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+   			// 设置延时等级1~10
+            headers.put("DELAY", 2);
+            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+
+   @Bean
+   public ApplicationRunner producerSchedule() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+            // 发送延时消息,需要设置延时时间,单位毫秒(ms),消息将在指定延时时间后投递,例如消息将在3秒后投递。
+            long delayTime = System.currentTimeMillis() + 3000;
+            headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
+            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getMsg());
+      };
+   }
+}
+```
+
+## 过滤消息示例
+
+### 创建Topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t sql
+```
+
+### 示例代码
+
+**application.yml**
+
+支持tag过滤或者sql过滤,设置`spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.subscription`即可。
+
+tag示例: `tag:red || blue`
+
+sql示例: `sql:(color in ('red1', 'red2', 'red4') and price>3)`
+
+更多请参考: [Filter](https://rocketmq.apache.org/docs/filter-by-sql92-example/)
+
+```yaml
+server:
+  port: 28087
+spring:
+  application:
+    name: rocketmq-sql-consume-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+          consumer-in-0:
+            consumer:
+              # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+              subscription: sql:(color in ('red1', 'red2', 'red4') and price>3)
+      bindings:
+        producer-out-0:
+          destination: sql
+        consumer-in-0:
+          destination: sql
+          group: sql-group
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+```java
+@SpringBootApplication
+public class RocketMQSqlConsumeApplication {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQSqlConsumeApplication.class);
+   @Autowired
+   private StreamBridge streamBridge;
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQSqlConsumeApplication.class, args);
+   }
+
+   /**
+    * color array.
+    */
+   public static final String[] color = new String[] {"red1", "red2", "red3", "red4", "red5"};
+
+   /**
+    * price array.
+    */
+   public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5};
+
+   @Bean
+   public ApplicationRunner producer() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put("color", color[i % color.length]);
+            headers.put("price", price[i % price.length]);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         String colorHeaderKey = "color";
+         String priceHeaderKey = "price";
+         log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " COLOR:" +
+               msg.getHeaders().get(colorHeaderKey).toString() + " " +
+               "PRICE: " + msg.getHeaders().get(priceHeaderKey).toString());
+      };
+   }
+}
+```
+
 ## Endpoint 信息查看
 
 Spring Boot 应用支持通过 Endpoint 来暴露相关信息,RocketMQ Stream Starter 也支持这一点。
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme.md b/spring-cloud-alibaba-examples/rocketmq-example/readme.md
index eec6675f9..1d7838c96 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/readme.md
+++ b/spring-cloud-alibaba-examples/rocketmq-example/readme.md
@@ -24,15 +24,29 @@ This is a overview of Spring Cloud Stream.
 
 ![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/images/SCSt-overview.png)
 
-## Demo
+## Preparation
 
-### Integration with RocketMQ Binder
+### Download and Startup RocketMQ
+
+You should startup Name Server and Broker before using RocketMQ Binder.
+
+1. Download [RocketMQ](https://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip) and unzip it.
+
+2. Startup Name Server
 
-Before we start the demo, let's learn how to Integration with RocketMQ Binder to a Spring Cloud application.
+```bash
+sh bin/mqnamesrv
+```
+
+3. Startup Broker
+
+```bash
+sh bin/mqbroker -n localhost:9876
+```
 
-**Note: This section is to show you how to connect to Sentinel. The configurations have been completed in the following example, so you don't need modify the code any more.**
+### Declare dependency
 
-1. Add dependency spring-cloud-starter-stream-rocketmq in the pom.xml file in your Spring Cloud project.
+Add dependency spring-cloud-starter-stream-rocketmq to the `pom.xml` file in your Spring Cloud project.
 
 ```xml
 <dependency>
@@ -41,7 +55,17 @@ Before we start the demo, let's learn how to Integration with RocketMQ Binder to
 </dependency>
 ```
 
-2. Configure Input and Output Binding and cooperate with `@EnableBinding` annotation
+## Simple example
+
+### Create topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
+```
+
+### Integration with RocketMQ Binder
+
+Configure Input and Output Binding and cooperate with `@EnableBinding` annotation
 
 ```java
 @SpringBootApplication
@@ -66,32 +90,6 @@ spring.cloud.stream.bindings.input.content-type=application/json
 spring.cloud.stream.bindings.input.group=test-group
 
 ```
-	
-3. pub/sub messages
-
-### Download and Startup RocketMQ
-
-You should startup Name Server and Broker before using RocketMQ Binder.
-
-1. Download [RocketMQ](https://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip) and unzip it.
-
-2. Startup Name Server
-
-```bash
-sh bin/mqnamesrv
-```
-
-3. Startup Broker
-
-```bash
-sh bin/mqbroker -n localhost:9876
-```
-
-4. Create topic: test-topic
-
-```bash
-sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
-```
 
 ### Start Application
 
@@ -101,7 +99,7 @@ sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
 spring.application.name=rocketmq-example
 server.port=28081
 ```
-	
+
 2. Start the application in IDE or by building a fatjar.
 
 	1. Start in IDE: Find main class  `RocketMQApplication`, and execute the main method.
@@ -192,6 +190,508 @@ public class ReceiveService {
 }
 ```
 
+## Broadcasting exmaple
+
+### Create topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t broadcast
+```
+
+### Producer
+
+**application.yml**
+
+```yaml
+server:
+  port: 28085
+spring:
+  application:
+    name: rocketmq-broadcast-producer-example
+  cloud:
+    stream:
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+      bindings:
+        producer-out-0:
+          destination: broadcast
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+Use `ApplicationRunner` and `StreamBridge` to send messages.
+
+```java
+@SpringBootApplication
+public class RocketMQBroadcastProducerApplication {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQBroadcastProducerApplication.class);
+   @Autowired
+   private StreamBridge streamBridge;
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQBroadcastProducerApplication.class, args);
+   }
+
+   @Bean
+   public ApplicationRunner producer() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+            Message<SimpleMsg> msg = new GenericMessage<SimpleMsg>(new SimpleMsg("Hello RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+}
+```
+
+### Consumer
+
+Startup two consumers.
+
+#### Consumer1
+
+**application.yml**
+
+```yaml
+server:
+  port: 28084
+spring:
+  application:
+    name: rocketmq-broadcast-consumer1-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          consumer-in-0:
+            consumer:
+              messageModel: BROADCASTING
+      bindings:
+        consumer-in-0:
+          destination: broadcast
+          group: broadcast-consumer
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+```java
+@SpringBootApplication
+public class RocketMQBroadcastConsumer1Application {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQBroadcastConsumer1Application.class);
+
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQBroadcastConsumer1Application.class, args);
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg());
+      };
+   }
+}
+```
+
+#### Consumer2
+
+**application.yml**
+
+```yaml
+server:
+  port: 28083
+spring:
+  application:
+    name: rocketmq-broadcast-consumer2-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          consumer-in-0:
+            consumer:
+              messageModel: BROADCASTING
+      bindings:
+        consumer-in-0:
+          destination: broadcast
+          group: broadcast-consumer
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+```java
+@SpringBootApplication
+public class RocketMQBroadcastConsumer2Application {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQBroadcastConsumer2Application.class);
+
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args);
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg());
+      };
+   }
+}
+```
+
+## Order example
+
+​	RocketMQ provides ordered messages using FIFO order.
+
+​	There are two types of ordered messages.
+
+* Global: For a specified topic, all messages are published and consumed in strict FIFO (First In First Out) order.
+* Partition: For a specified topic, all messages are partitioned according to the `Sharding Key`. Messages within the same partition are published and consumed in strict FIFO order. `Sharding Key` is a key field used to distinguish different partitions in sequential messages, and it is a completely different concept from the Key of ordinary messages.
+
+### Create Topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
+```
+
+### Example code
+
+**application.yml**
+
+```yaml
+server:
+  port: 28082
+spring:
+  application:
+    name: rocketmq-orderly-consume-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+              # 定义messageSelector
+              messageQueueSelector: orderlyMessageQueueSelector
+          consumer-in-0:
+            consumer:
+              # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+              subscription: 'TagA || TagC || TagD'
+              push:
+                orderly: true
+      bindings:
+        producer-out-0:
+          destination: orderly
+        consumer-in-0:
+          destination: orderly
+          group: orderly-consumer
+
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**MessageQueueSelector**
+
+Choose a partition selection algorithm for you, and ensure that the same parameters get the same results.
+
+```java
+@Component
+public class OrderlyMessageQueueSelector implements MessageQueueSelector {
+   private static final Logger log = LoggerFactory
+         .getLogger(OrderlyMessageQueueSelector.class);
+   @Override
+   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+      Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+      String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
+      int index = id % RocketMQOrderlyConsumeApplication.tags.length % mqs.size();
+      return mqs.get(index);
+   }
+}
+```
+
+**Producer&Consumer**
+
+```java
+@SpringBootApplication
+public class RocketMQOrderlyConsumeApplication {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQOrderlyConsumeApplication.class);
+
+   @Autowired
+   private StreamBridge streamBridge;
+
+   /***
+    * tag array.
+    */
+   public static final String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
+
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQOrderlyConsumeApplication.class, args);
+   }
+
+   @Bean
+   public ApplicationRunner producer() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
+               MessageConst.PROPERTY_TAGS).toString();
+         log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +
+               msg.getHeaders().get(tagHeaderKey).toString());
+         try {
+            Thread.sleep(100);
+         }
+         catch (InterruptedException ignored) {
+         }
+      };
+   }
+
+}
+```
+
+## Schedule example
+
+Scheduled messages differ from normal messages in that they won’t be delivered until a provided time later.
+
+### Create topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t delay
+```
+
+### Example code
+
+**application.yml**
+
+```yaml
+server:
+  port: 28086
+spring:
+  application:
+    name: rocketmq-delay-consume-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+      bindings:
+        producer-out-0:
+          destination: delay
+        consumer-in-0:
+          destination: delay
+          group: delay-group
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+```java
+@SpringBootApplication
+public class RocketMQDelayConsumeApplication {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQDelayConsumeApplication.class);
+   @Autowired
+   private StreamBridge streamBridge;
+
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQDelayConsumeApplication.class, args);
+   }
+
+   @Bean
+   public ApplicationRunner producerDelay() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+   			// Set the delay level 1~10
+            headers.put("DELAY", 2);
+            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+
+   @Bean
+   public ApplicationRunner producerSchedule() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+            // Sending scheduled message, you need to set the delay time in milliseconds (ms). The message will be delivered after the specified delay time, for example, the message will be delivered after 3 seconds.
+            long delayTime = System.currentTimeMillis() + 3000;
+            headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
+            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getMsg());
+      };
+   }
+}
+```
+
+## Filter example
+
+### Create topic
+
+```sh
+sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t sql
+```
+
+### Example code
+
+**application.yml**
+
+RocketMQ stream binder supports filter by tag or sql, just setting `spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.subscription`.
+
+Tag example: `tag:red || blue`
+
+Sql example: `sql:(color in ('red1', 'red2', 'red4') and price>3)`
+
+More: [Filter](https://rocketmq.apache.org/docs/filter-by-sql92-example/)
+
+```yaml
+server:
+  port: 28087
+spring:
+  application:
+    name: rocketmq-sql-consume-example
+  cloud:
+    stream:
+      function:
+        definition: consumer;
+      rocketmq:
+        binder:
+          name-server: localhost:9876
+        bindings:
+          producer-out-0:
+            producer:
+              group: output_1
+          consumer-in-0:
+            consumer:
+              # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+              subscription: sql:(color in ('red1', 'red2', 'red4') and price>3)
+      bindings:
+        producer-out-0:
+          destination: sql
+        consumer-in-0:
+          destination: sql
+          group: sql-group
+logging:
+  level:
+    org.springframework.context.support: debug
+```
+
+**code**
+
+```java
+@SpringBootApplication
+public class RocketMQSqlConsumeApplication {
+   private static final Logger log = LoggerFactory
+         .getLogger(RocketMQSqlConsumeApplication.class);
+   @Autowired
+   private StreamBridge streamBridge;
+   public static void main(String[] args) {
+      SpringApplication.run(RocketMQSqlConsumeApplication.class, args);
+   }
+
+   /**
+    * color array.
+    */
+   public static final String[] color = new String[] {"red1", "red2", "red3", "red4", "red5"};
+
+   /**
+    * price array.
+    */
+   public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5};
+
+   @Bean
+   public ApplicationRunner producer() {
+      return args -> {
+         for (int i = 0; i < 100; i++) {
+            String key = "KEY" + i;
+            Map<String, Object> headers = new HashMap<>();
+            headers.put(MessageConst.PROPERTY_KEYS, key);
+            headers.put("color", color[i % color.length]);
+            headers.put("price", price[i % price.length]);
+            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
+            streamBridge.send("producer-out-0", msg);
+         }
+      };
+   }
+
+   @Bean
+   public Consumer<Message<SimpleMsg>> consumer() {
+      return msg -> {
+         String colorHeaderKey = "color";
+         String priceHeaderKey = "price";
+         log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " COLOR:" +
+               msg.getHeaders().get(colorHeaderKey).toString() + " " +
+               "PRICE: " + msg.getHeaders().get(priceHeaderKey).toString());
+      };
+   }
+}
+```
+
 ## Endpoint
 
 Add dependency `spring-cloud-starter-stream-rocketmq` to your pom.xml file, and configure your endpoint security strategy.

From 1509138e8682de4f56bb58f7e4d05a2c8078a739 Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Mon, 9 May 2022 22:43:53 +0800
Subject: [PATCH 08/10] fix rocketmq example errors.

---
 .../examples/delay/RocketMQDelayConsumeApplication.java  | 6 +++++-
 .../examples/orderly/OrderlyMessageQueueSelector.java    | 9 ++++++++-
 .../examples/sql/RocketMQSqlConsumeApplication.java      | 2 +-
 3 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
index 9ef0f8ac3..1dc210aff 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.cloud.examples.broadcast;
+package com.alibaba.cloud.examples.delay;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -47,6 +47,10 @@ public class RocketMQDelayConsumeApplication {
 		SpringApplication.run(RocketMQDelayConsumeApplication.class, args);
 	}
 
+	/**
+	 * Produce delay messages
+	 * @return
+	 */
 	@Bean
 	public ApplicationRunner producerDelay() {
 		return args -> {
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
index 8611e8d6c..2b5f9e980 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
@@ -36,10 +36,17 @@ import org.springframework.stereotype.Component;
 public class OrderlyMessageQueueSelector implements MessageQueueSelector {
 	private static final Logger log = LoggerFactory
 			.getLogger(OrderlyMessageQueueSelector.class);
+
+	/**
+	 * to select a fixed queue by id
+	 * @param mqs
+	 * @param msg
+	 * @param arg
+	 * @return
+	 */
 	@Override
 	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
 		Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
-		String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
 		int index = id % RocketMQOrderlyConsumeApplication.tags.length % mqs.size();
 		return mqs.get(index);
 	}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
index 78a07def7..b4a901604 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.cloud.examples.broadcast;
+package com.alibaba.cloud.examples.sql;
 
 import java.util.HashMap;
 import java.util.Map;

From 2167fc2095e861046c664a155d2b03542f1b8f24 Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Mon, 9 May 2022 23:30:37 +0800
Subject: [PATCH 09/10] fix rocketmq example code style.

---
 .../delay/RocketMQDelayConsumeApplication.java         |  3 +--
 .../examples/orderly/OrderlyMessageQueueSelector.java  | 10 +++++-----
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
index 1dc210aff..dc031e122 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
@@ -48,8 +48,7 @@ public class RocketMQDelayConsumeApplication {
 	}
 
 	/**
-	 * Produce delay messages
-	 * @return
+	 * Produce delay messages.
 	 */
 	@Bean
 	public ApplicationRunner producerDelay() {
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
index 2b5f9e980..1d8240f09 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
@@ -38,11 +38,11 @@ public class OrderlyMessageQueueSelector implements MessageQueueSelector {
 			.getLogger(OrderlyMessageQueueSelector.class);
 
 	/**
-	 * to select a fixed queue by id
-	 * @param mqs
-	 * @param msg
-	 * @param arg
-	 * @return
+	 * to select a fixed queue by id.
+	 * @param mqs all message queues of this topic.
+	 * @param msg mq message.
+	 * @param arg mq arguments.
+	 * @return message queue selected.
 	 */
 	@Override
 	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

From 6e92b5cc4b09df4e7b0d4f8cf4d928c8d8999e69 Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Wed, 11 May 2022 21:37:56 +0800
Subject: [PATCH 10/10] remove timed message example from rocketmq examples.
 Because it is not supported by rocketmq community version now.

---
 .../rocketmq-example/readme-zh.md             | 19 +------------------
 .../rocketmq-example/readme.md                | 19 +------------------
 .../RocketMQDelayConsumeApplication.java      | 19 +------------------
 3 files changed, 3 insertions(+), 54 deletions(-)

diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
index 8d46de1f7..dcf342af8 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
+++ b/spring-cloud-alibaba-examples/rocketmq-example/readme-zh.md
@@ -561,30 +561,13 @@ public class RocketMQDelayConsumeApplication {
             headers.put(MessageConst.PROPERTY_KEYS, key);
             headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
    			// 设置延时等级1~10
-            headers.put("DELAY", 2);
+            headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);
             Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
             streamBridge.send("producer-out-0", msg);
          }
       };
    }
 
-   @Bean
-   public ApplicationRunner producerSchedule() {
-      return args -> {
-         for (int i = 0; i < 100; i++) {
-            String key = "KEY" + i;
-            Map<String, Object> headers = new HashMap<>();
-            headers.put(MessageConst.PROPERTY_KEYS, key);
-            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
-            // 发送延时消息,需要设置延时时间,单位毫秒(ms),消息将在指定延时时间后投递,例如消息将在3秒后投递。
-            long delayTime = System.currentTimeMillis() + 3000;
-            headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
-            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
-            streamBridge.send("producer-out-0", msg);
-         }
-      };
-   }
-
    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
       return msg -> {
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/readme.md b/spring-cloud-alibaba-examples/rocketmq-example/readme.md
index 1d7838c96..343370399 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/readme.md
+++ b/spring-cloud-alibaba-examples/rocketmq-example/readme.md
@@ -555,30 +555,13 @@ public class RocketMQDelayConsumeApplication {
             headers.put(MessageConst.PROPERTY_KEYS, key);
             headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
    			// Set the delay level 1~10
-            headers.put("DELAY", 2);
+			 headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);
             Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
             streamBridge.send("producer-out-0", msg);
          }
       };
    }
 
-   @Bean
-   public ApplicationRunner producerSchedule() {
-      return args -> {
-         for (int i = 0; i < 100; i++) {
-            String key = "KEY" + i;
-            Map<String, Object> headers = new HashMap<>();
-            headers.put(MessageConst.PROPERTY_KEYS, key);
-            headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
-            // Sending scheduled message, you need to set the delay time in milliseconds (ms). The message will be delivered after the specified delay time, for example, the message will be delivered after 3 seconds.
-            long delayTime = System.currentTimeMillis() + 3000;
-            headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
-            Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
-            streamBridge.send("producer-out-0", msg);
-         }
-      };
-   }
-
    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
       return msg -> {
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
index dc031e122..4d94b650a 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
@@ -58,30 +58,13 @@ public class RocketMQDelayConsumeApplication {
 				Map<String, Object> headers = new HashMap<>();
 				headers.put(MessageConst.PROPERTY_KEYS, key);
 				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
-				headers.put("DELAY", 2);
+				headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);
 				Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
 				streamBridge.send("producer-out-0", msg);
 			}
 		};
 	}
 
-	@Bean
-	public ApplicationRunner producerSchedule() {
-		return args -> {
-			for (int i = 0; i < 100; i++) {
-				String key = "KEY" + i;
-				Map<String, Object> headers = new HashMap<>();
-				headers.put(MessageConst.PROPERTY_KEYS, key);
-				headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
-				// send message after 3000ms
-				long delayTime = System.currentTimeMillis() + 3000;
-				headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
-				Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
-				streamBridge.send("producer-out-0", msg);
-			}
-		};
-	}
-
 	@Bean
 	public Consumer<Message<SimpleMsg>> consumer() {
 		return msg -> {