diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml
index 157fc502c..6006f8338 100644
--- a/spring-cloud-alibaba-examples/pom.xml
+++ b/spring-cloud-alibaba-examples/pom.xml
@@ -35,6 +35,14 @@
seata-example/account-service
rocketmq-example/rocketmq-consume-example
rocketmq-example/rocketmq-produce-example
+ rocketmq-example/rocketmq-comprehensive-example
+ rocketmq-example/rocketmq-orderly-consume-example
+ rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example
+ rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example
+ rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example
+ rocketmq-example/rocketmq-delay-consume-example
+ rocketmq-example/rocketmq-sql-consume-example
+
spring-cloud-bus-rocketmq-example
spring-cloud-alibaba-dubbo-examples
spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-nacos-example
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
new file mode 100644
index 000000000..4f9cb958f
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml
@@ -0,0 +1,53 @@
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-alibaba-examples
+ ${revision}
+ ../../../pom.xml
+
+ 4.0.0
+
+
+ rocketmq-broadcast-consumer2-example
+ Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Cosumer1
+ Example demonstrating how to broadcast consumption
+
+ jar
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-stream-rocketmq
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-json
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ ${maven-deploy-plugin.version}
+
+ true
+
+
+
+
+
+
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java
new file mode 100644
index 000000000..053065b99
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQBroadcastConsumer1Application {
+ private static final Logger log = LoggerFactory
+ .getLogger(RocketMQBroadcastConsumer1Application.class);
+
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMQBroadcastConsumer1Application.class, args);
+ }
+
+ @Bean
+ public Consumer> consumer() {
+ return msg -> {
+ log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload());
+ };
+ }
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml
new file mode 100644
index 000000000..598cecb48
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml
@@ -0,0 +1,23 @@
+server:
+ port: 28084
+spring:
+ application:
+ name: rocketmq-broadcast-consumer1-example
+ cloud:
+ stream:
+ function:
+ definition: consumer;
+ rocketmq:
+ binder:
+ name-server: 192.168.0.200:9876
+ bindings:
+ consumer-in-0:
+ consumer:
+ messageModel: BROADCASTING
+ bindings:
+ consumer-in-0:
+ destination: broadcast
+ group: broadcast-consumer
+logging:
+ level:
+ org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
new file mode 100644
index 000000000..9bcd14626
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml
@@ -0,0 +1,52 @@
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-alibaba-examples
+ ${revision}
+ ../../../pom.xml
+
+ 4.0.0
+
+
+ rocketmq-broadcast-consumer1-example
+ Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Consumer2
+ Example demonstrating how to broadcast consumption
+ jar
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-stream-rocketmq
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-json
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ ${maven-deploy-plugin.version}
+
+ true
+
+
+
+
+
+
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java
new file mode 100644
index 000000000..cbebc43cd
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQBroadcastConsumer2Application {
+ private static final Logger log = LoggerFactory
+ .getLogger(RocketMQBroadcastConsumer2Application.class);
+
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args);
+ }
+ @Bean
+ public Consumer> consumer() {
+ return msg -> {
+ log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload());
+ };
+ }
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml
new file mode 100644
index 000000000..5060f2378
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml
@@ -0,0 +1,23 @@
+server:
+ port: 28083
+spring:
+ application:
+ name: rocketmq-broadcast-consumer2-example
+ cloud:
+ stream:
+ function:
+ definition: consumer;
+ rocketmq:
+ binder:
+ name-server: 192.168.0.200:9876
+ bindings:
+ consumer-in-0:
+ consumer:
+ messageModel: BROADCASTING
+ bindings:
+ consumer-in-0:
+ destination: broadcast
+ group: broadcast-consumer
+logging:
+ level:
+ org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml
new file mode 100644
index 000000000..3aad09fe0
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml
@@ -0,0 +1,52 @@
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-alibaba-examples
+ ${revision}
+ ../../../pom.xml
+
+ 4.0.0
+
+
+ rocketmq-broadcast-producer-example
+ Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Producer
+ Example demonstrating how to use rocketmq to produce
+ jar
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-stream-rocketmq
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-json
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ ${maven-deploy-plugin.version}
+
+ true
+
+
+
+
+
+
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java
new file mode 100644
index 000000000..3832d85da
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+
+
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQBroadcastConsumeApplication {
+ private static final Logger log = LoggerFactory
+ .getLogger(RocketMQBroadcastConsumeApplication.class);
+
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMQBroadcastConsumeApplication.class, args);
+ }
+
+ @Bean
+ public Supplier>> producer() {
+ return () -> {
+ return Flux.range(0, 100).map(i -> {
+ String key = "KEY" + i;
+ Map headers = new HashMap<>();
+ headers.put(MessageConst.PROPERTY_KEYS, key);
+ headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+ Message msg = new GenericMessage("Hello RocketMQ " + i, headers);
+ return msg;
+ }).log();
+ };
+ }
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
new file mode 100644
index 000000000..63ea4d29c
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml
@@ -0,0 +1,22 @@
+server:
+ port: 28085
+spring:
+ application:
+ name: rocketmq-broadcast-producer-example
+ cloud:
+ stream:
+ function:
+ definition: producer;
+ rocketmq:
+ binder:
+ name-server: 192.168.0.200:9876
+ bindings:
+ producer-out-0:
+ producer:
+ group: output_1
+ bindings:
+ producer-out-0:
+ destination: broadcast
+logging:
+ level:
+ org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml
new file mode 100644
index 000000000..dedbe275a
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml
@@ -0,0 +1,55 @@
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-alibaba-examples
+ ${revision}
+ ../../pom.xml
+
+ 4.0.0
+
+
+ rocketmq-delay-consume-example
+ Spring Cloud Starter Stream Alibaba RocketMQ Delay Consume Example
+ Example demonstrating how to use rocketmq to delay consume
+ jar
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-stream-rocketmq
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ ${maven-deploy-plugin.version}
+
+ true
+
+
+
+
+
+
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
new file mode 100644
index 000000000..529530746
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQDelayConsumeApplication {
+ private static final Logger log = LoggerFactory
+ .getLogger(RocketMQDelayConsumeApplication.class);
+
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMQDelayConsumeApplication.class, args);
+ }
+
+ @Bean
+ public Supplier>> producer() {
+ return () -> {
+ return Flux.range(0, 100).map(i -> {
+ String key = "KEY" + i;
+ Map headers = new HashMap<>();
+ headers.put(MessageConst.PROPERTY_KEYS, key);
+ headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+ headers.put("DELAY", 2);
+ Message msg = new GenericMessage("Hello RocketMQ " + i, headers);
+ return msg;
+ }).log();
+ };
+ }
+
+ @Bean
+ public Consumer> consumer() {
+ return msg -> {
+ log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload());
+ };
+ }
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
new file mode 100644
index 000000000..bffd72cf0
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml
@@ -0,0 +1,25 @@
+server:
+ port: 28086
+spring:
+ application:
+ name: rocketmq-delay-consume-example
+ cloud:
+ stream:
+ function:
+ definition: producer;consumer;
+ rocketmq:
+ binder:
+ name-server: 192.168.0.200:9876
+ bindings:
+ producer-out-0:
+ producer:
+ group: output_1
+ bindings:
+ producer-out-0:
+ destination: delay
+ consumer-in-0:
+ destination: delay
+ group: delay-group
+logging:
+ level:
+ org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml
new file mode 100644
index 000000000..14d80e313
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml
@@ -0,0 +1,52 @@
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-alibaba-examples
+ ${revision}
+ ../../pom.xml
+
+ 4.0.0
+
+
+ rocketmq-orderly-consume-example
+ Spring Cloud Starter Stream Alibaba RocketMQ Orderly Consume Example
+ Example demonstrating how to use rocketmq to produce, and consume orderly
+ jar
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-stream-rocketmq
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-json
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ ${maven-deploy-plugin.version}
+
+ true
+
+
+
+
+
+
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
new file mode 100644
index 000000000..e1b275838
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.orderly;
+
+import java.util.List;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * @author sorie
+ */
+@Component
+public class OrderlyMessageQueueSelector implements MessageQueueSelector {
+ @Override
+ public MessageQueue select(List mqs, Message msg, Object arg) {
+ Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+ int index = id % Math.min(mqs.size(), RocketMQOrderlyConsumeApplication.tags.length);
+ return mqs.get(index);
+ }
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java
new file mode 100644
index 000000000..53dc15a40
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.orderly;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQOrderlyConsumeApplication {
+ private static final Logger log = LoggerFactory
+ .getLogger(RocketMQOrderlyConsumeApplication.class);
+
+ /***
+ * tag array.
+ */
+ public static final String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
+
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMQOrderlyConsumeApplication.class, args);
+ }
+
+ @Bean
+ public Supplier>> producer() {
+ return () -> {
+ return Flux.range(0, 100).map(i -> {
+ String key = "KEY" + i;
+ Map headers = new HashMap<>();
+ headers.put(MessageConst.PROPERTY_KEYS, key);
+ headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
+ headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+ Message msg = new GenericMessage("Hello RocketMQ " + i, headers);
+ return msg;
+ }).log();
+ };
+ }
+
+ @Bean
+ public Consumer> consumer() {
+ return msg -> {
+ String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
+ MessageConst.PROPERTY_TAGS).toString();
+ log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " TAG:" +
+ msg.getHeaders().get(tagHeaderKey).toString());
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException ignored) {
+ }
+ };
+ }
+
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml
new file mode 100644
index 000000000..a7eb232b0
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml
@@ -0,0 +1,33 @@
+server:
+ port: 28082
+spring:
+ application:
+ name: rocketmq-orderly-consume-example
+ cloud:
+ stream:
+ function:
+ definition: producer;consumer;
+ rocketmq:
+ binder:
+ name-server: localhost:9876
+ bindings:
+ producer-out-0:
+ producer:
+ group: output_1
+ messageQueueSelector: orderlyMessageQueueSelector
+ consumer-in-0:
+ consumer:
+ # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+ subscription: 'TagA || TagC || TagD'
+ push:
+ orderly: true
+ bindings:
+ producer-out-0:
+ destination: orderly
+ consumer-in-0:
+ destination: orderly
+ group: orderly-consumer
+
+logging:
+ level:
+ org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml
new file mode 100644
index 000000000..534d05021
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml
@@ -0,0 +1,55 @@
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-alibaba-examples
+ ${revision}
+ ../../pom.xml
+
+ 4.0.0
+
+
+ rocketmq-sql-consume-example
+ Spring Cloud Starter Stream Alibaba RocketMQ Sql Consume Example
+ Example demonstrating how to use rocketmq to filter message by sql
+ jar
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-stream-rocketmq
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ ${maven-deploy-plugin.version}
+
+ true
+
+
+
+
+
+
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
new file mode 100644
index 000000000..fea68af68
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.broadcast;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQSqlConsumeApplication {
+ private static final Logger log = LoggerFactory
+ .getLogger(RocketMQSqlConsumeApplication.class);
+
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMQSqlConsumeApplication.class, args);
+ }
+
+ /**
+ * color array.
+ */
+ public static final String[] color = new String[] {"red1", "red2", "red3", "red4", "red5"};
+
+ /**
+ * price array.
+ */
+ public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5};
+
+ @Bean
+ public Supplier>> producer() {
+ return () -> {
+ return Flux.range(0, 100).map(i -> {
+ String key = "KEY" + i;
+ Map headers = new HashMap<>();
+ headers.put(MessageConst.PROPERTY_KEYS, key);
+ headers.put("color", color[i % color.length]);
+ headers.put("price", price[i % price.length]);
+ headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
+ Message msg = new GenericMessage("Hello RocketMQ " + i, headers);
+ return msg;
+ }).log();
+ };
+ }
+
+ @Bean
+ public Consumer> consumer() {
+ return msg -> {
+ String colorHeaderKey = "color";
+ String priceHeaderKey = "price";
+ log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " COLOR:" +
+ msg.getHeaders().get(colorHeaderKey).toString() + " " +
+ "PRICE: " + msg.getHeaders().get(priceHeaderKey).toString());
+ };
+ }
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
new file mode 100644
index 000000000..a679d7b57
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml
@@ -0,0 +1,29 @@
+server:
+ port: 28087
+spring:
+ application:
+ name: rocketmq-sql-consume-example
+ cloud:
+ stream:
+ function:
+ definition: producer;consumer;
+ rocketmq:
+ binder:
+ name-server: 192.168.0.200:9876
+ bindings:
+ producer-out-0:
+ producer:
+ group: output_1
+ consumer-in-0:
+ consumer:
+# tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
+ subscription: sql:(color in ('red1', 'red2', 'red4') and price>3)
+ bindings:
+ producer-out-0:
+ destination: sql
+ consumer-in-0:
+ destination: sql
+ group: sql-group
+logging:
+ level:
+ org.springframework.context.support: debug