From a216edc863f97f588e25f1bd5062d6ddd1f5df5f Mon Sep 17 00:00:00 2001 From: sorie Date: Sat, 9 Apr 2022 16:28:37 +0800 Subject: [PATCH] some rocketmq example. --- spring-cloud-alibaba-examples/pom.xml | 8 ++ .../pom.xml | 53 ++++++++++++ ...RocketMQBroadcastConsumer1Application.java | 47 +++++++++++ .../src/main/resources/application.yml | 23 ++++++ .../pom.xml | 52 ++++++++++++ ...RocketMQBroadcastConsumer2Application.java | 46 +++++++++++ .../src/main/resources/application.yml | 23 ++++++ .../pom.xml | 52 ++++++++++++ .../RocketMQBroadcastConsumeApplication.java | 60 ++++++++++++++ .../src/main/resources/application.yml | 22 +++++ .../rocketmq-delay-consume-example/pom.xml | 55 +++++++++++++ .../RocketMQDelayConsumeApplication.java | 67 +++++++++++++++ .../src/main/resources/application.yml | 25 ++++++ .../rocketmq-orderly-consume-example/pom.xml | 52 ++++++++++++ .../orderly/OrderlyMessageQueueSelector.java | 41 ++++++++++ .../RocketMQOrderlyConsumeApplication.java | 82 +++++++++++++++++++ .../src/main/resources/application.yml | 33 ++++++++ .../rocketmq-sql-consume-example/pom.xml | 55 +++++++++++++ .../sql/RocketMQSqlConsumeApplication.java | 82 +++++++++++++++++++ .../src/main/resources/application.yml | 29 +++++++ 20 files changed, 907 insertions(+) create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java create mode 100644 spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index 157fc502c..6006f8338 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -35,6 +35,14 @@ seata-example/account-service rocketmq-example/rocketmq-consume-example rocketmq-example/rocketmq-produce-example + rocketmq-example/rocketmq-comprehensive-example + rocketmq-example/rocketmq-orderly-consume-example + rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example + rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example + rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example + rocketmq-example/rocketmq-delay-consume-example + rocketmq-example/rocketmq-sql-consume-example + spring-cloud-bus-rocketmq-example spring-cloud-alibaba-dubbo-examples spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-nacos-example diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml new file mode 100644 index 000000000..4f9cb958f --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml @@ -0,0 +1,53 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../../pom.xml + + 4.0.0 + + + rocketmq-broadcast-consumer2-example + Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Cosumer1 + Example demonstrating how to broadcast consumption + + jar + + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-json + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java new file mode 100644 index 000000000..053065b99 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java @@ -0,0 +1,47 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.broadcast; + +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; + +/** + * @author sorie + */ +@SpringBootApplication +public class RocketMQBroadcastConsumer1Application { + private static final Logger log = LoggerFactory + .getLogger(RocketMQBroadcastConsumer1Application.class); + + public static void main(String[] args) { + SpringApplication.run(RocketMQBroadcastConsumer1Application.class, args); + } + + @Bean + public Consumer> consumer() { + return msg -> { + log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload()); + }; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml new file mode 100644 index 000000000..598cecb48 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/resources/application.yml @@ -0,0 +1,23 @@ +server: + port: 28084 +spring: + application: + name: rocketmq-broadcast-consumer1-example + cloud: + stream: + function: + definition: consumer; + rocketmq: + binder: + name-server: 192.168.0.200:9876 + bindings: + consumer-in-0: + consumer: + messageModel: BROADCASTING + bindings: + consumer-in-0: + destination: broadcast + group: broadcast-consumer +logging: + level: + org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml new file mode 100644 index 000000000..9bcd14626 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml @@ -0,0 +1,52 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../../pom.xml + + 4.0.0 + + + rocketmq-broadcast-consumer1-example + Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Consumer2 + Example demonstrating how to broadcast consumption + jar + + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-json + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java new file mode 100644 index 000000000..cbebc43cd --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java @@ -0,0 +1,46 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.broadcast; + +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; + +/** + * @author sorie + */ +@SpringBootApplication +public class RocketMQBroadcastConsumer2Application { + private static final Logger log = LoggerFactory + .getLogger(RocketMQBroadcastConsumer2Application.class); + + public static void main(String[] args) { + SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args); + } + @Bean + public Consumer> consumer() { + return msg -> { + log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload()); + }; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml new file mode 100644 index 000000000..5060f2378 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/resources/application.yml @@ -0,0 +1,23 @@ +server: + port: 28083 +spring: + application: + name: rocketmq-broadcast-consumer2-example + cloud: + stream: + function: + definition: consumer; + rocketmq: + binder: + name-server: 192.168.0.200:9876 + bindings: + consumer-in-0: + consumer: + messageModel: BROADCASTING + bindings: + consumer-in-0: + destination: broadcast + group: broadcast-consumer +logging: + level: + org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml new file mode 100644 index 000000000..3aad09fe0 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml @@ -0,0 +1,52 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../../pom.xml + + 4.0.0 + + + rocketmq-broadcast-producer-example + Spring Cloud Starter Stream Alibaba RocketMQ Broadcasting Consume Example Producer + Example demonstrating how to use rocketmq to produce + jar + + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-json + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java new file mode 100644 index 000000000..3832d85da --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumeApplication.java @@ -0,0 +1,60 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.broadcast; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.rocketmq.common.message.MessageConst; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + + +/** + * @author sorie + */ +@SpringBootApplication +public class RocketMQBroadcastConsumeApplication { + private static final Logger log = LoggerFactory + .getLogger(RocketMQBroadcastConsumeApplication.class); + + public static void main(String[] args) { + SpringApplication.run(RocketMQBroadcastConsumeApplication.class, args); + } + + @Bean + public Supplier>> producer() { + return () -> { + return Flux.range(0, 100).map(i -> { + String key = "KEY" + i; + Map headers = new HashMap<>(); + headers.put(MessageConst.PROPERTY_KEYS, key); + headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); + Message msg = new GenericMessage("Hello RocketMQ " + i, headers); + return msg; + }).log(); + }; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml new file mode 100644 index 000000000..63ea4d29c --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml @@ -0,0 +1,22 @@ +server: + port: 28085 +spring: + application: + name: rocketmq-broadcast-producer-example + cloud: + stream: + function: + definition: producer; + rocketmq: + binder: + name-server: 192.168.0.200:9876 + bindings: + producer-out-0: + producer: + group: output_1 + bindings: + producer-out-0: + destination: broadcast +logging: + level: + org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml new file mode 100644 index 000000000..dedbe275a --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml @@ -0,0 +1,55 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../pom.xml + + 4.0.0 + + + rocketmq-delay-consume-example + Spring Cloud Starter Stream Alibaba RocketMQ Delay Consume Example + Example demonstrating how to use rocketmq to delay consume + jar + + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java new file mode 100644 index 000000000..529530746 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java @@ -0,0 +1,67 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.broadcast; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.rocketmq.common.message.MessageConst; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +/** + * @author sorie + */ +@SpringBootApplication +public class RocketMQDelayConsumeApplication { + private static final Logger log = LoggerFactory + .getLogger(RocketMQDelayConsumeApplication.class); + + public static void main(String[] args) { + SpringApplication.run(RocketMQDelayConsumeApplication.class, args); + } + + @Bean + public Supplier>> producer() { + return () -> { + return Flux.range(0, 100).map(i -> { + String key = "KEY" + i; + Map headers = new HashMap<>(); + headers.put(MessageConst.PROPERTY_KEYS, key); + headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); + headers.put("DELAY", 2); + Message msg = new GenericMessage("Hello RocketMQ " + i, headers); + return msg; + }).log(); + }; + } + + @Bean + public Consumer> consumer() { + return msg -> { + log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload()); + }; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml new file mode 100644 index 000000000..bffd72cf0 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml @@ -0,0 +1,25 @@ +server: + port: 28086 +spring: + application: + name: rocketmq-delay-consume-example + cloud: + stream: + function: + definition: producer;consumer; + rocketmq: + binder: + name-server: 192.168.0.200:9876 + bindings: + producer-out-0: + producer: + group: output_1 + bindings: + producer-out-0: + destination: delay + consumer-in-0: + destination: delay + group: delay-group +logging: + level: + org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml new file mode 100644 index 000000000..14d80e313 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml @@ -0,0 +1,52 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../pom.xml + + 4.0.0 + + + rocketmq-orderly-consume-example + Spring Cloud Starter Stream Alibaba RocketMQ Orderly Consume Example + Example demonstrating how to use rocketmq to produce, and consume orderly + jar + + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-json + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java new file mode 100644 index 000000000..e1b275838 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java @@ -0,0 +1,41 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.orderly; + +import java.util.List; + +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageQueue; + +import org.springframework.messaging.MessageHeaders; +import org.springframework.stereotype.Component; + + +/** + * @author sorie + */ +@Component +public class OrderlyMessageQueueSelector implements MessageQueueSelector { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID); + int index = id % Math.min(mqs.size(), RocketMQOrderlyConsumeApplication.tags.length); + return mqs.get(index); + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java new file mode 100644 index 000000000..53dc15a40 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java @@ -0,0 +1,82 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.orderly; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport; +import org.apache.rocketmq.common.message.MessageConst; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +/** + * @author sorie + */ +@SpringBootApplication +public class RocketMQOrderlyConsumeApplication { + private static final Logger log = LoggerFactory + .getLogger(RocketMQOrderlyConsumeApplication.class); + + /*** + * tag array. + */ + public static final String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; + + public static void main(String[] args) { + SpringApplication.run(RocketMQOrderlyConsumeApplication.class, args); + } + + @Bean + public Supplier>> producer() { + return () -> { + return Flux.range(0, 100).map(i -> { + String key = "KEY" + i; + Map headers = new HashMap<>(); + headers.put(MessageConst.PROPERTY_KEYS, key); + headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]); + headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); + Message msg = new GenericMessage("Hello RocketMQ " + i, headers); + return msg; + }).log(); + }; + } + + @Bean + public Consumer> consumer() { + return msg -> { + String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey( + MessageConst.PROPERTY_TAGS).toString(); + log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " TAG:" + + msg.getHeaders().get(tagHeaderKey).toString()); + try { + Thread.sleep(100); + } + catch (InterruptedException ignored) { + } + }; + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml new file mode 100644 index 000000000..a7eb232b0 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml @@ -0,0 +1,33 @@ +server: + port: 28082 +spring: + application: + name: rocketmq-orderly-consume-example + cloud: + stream: + function: + definition: producer;consumer; + rocketmq: + binder: + name-server: localhost:9876 + bindings: + producer-out-0: + producer: + group: output_1 + messageQueueSelector: orderlyMessageQueueSelector + consumer-in-0: + consumer: + # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } . + subscription: 'TagA || TagC || TagD' + push: + orderly: true + bindings: + producer-out-0: + destination: orderly + consumer-in-0: + destination: orderly + group: orderly-consumer + +logging: + level: + org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml new file mode 100644 index 000000000..534d05021 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml @@ -0,0 +1,55 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../pom.xml + + 4.0.0 + + + rocketmq-sql-consume-example + Spring Cloud Starter Stream Alibaba RocketMQ Sql Consume Example + Example demonstrating how to use rocketmq to filter message by sql + jar + + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java new file mode 100644 index 000000000..fea68af68 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java @@ -0,0 +1,82 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.broadcast; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.rocketmq.common.message.MessageConst; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; +/** + * @author sorie + */ +@SpringBootApplication +public class RocketMQSqlConsumeApplication { + private static final Logger log = LoggerFactory + .getLogger(RocketMQSqlConsumeApplication.class); + + public static void main(String[] args) { + SpringApplication.run(RocketMQSqlConsumeApplication.class, args); + } + + /** + * color array. + */ + public static final String[] color = new String[] {"red1", "red2", "red3", "red4", "red5"}; + + /** + * price array. + */ + public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5}; + + @Bean + public Supplier>> producer() { + return () -> { + return Flux.range(0, 100).map(i -> { + String key = "KEY" + i; + Map headers = new HashMap<>(); + headers.put(MessageConst.PROPERTY_KEYS, key); + headers.put("color", color[i % color.length]); + headers.put("price", price[i % price.length]); + headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); + Message msg = new GenericMessage("Hello RocketMQ " + i, headers); + return msg; + }).log(); + }; + } + + @Bean + public Consumer> consumer() { + return msg -> { + String colorHeaderKey = "color"; + String priceHeaderKey = "price"; + log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " COLOR:" + + msg.getHeaders().get(colorHeaderKey).toString() + " " + + "PRICE: " + msg.getHeaders().get(priceHeaderKey).toString()); + }; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml new file mode 100644 index 000000000..a679d7b57 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml @@ -0,0 +1,29 @@ +server: + port: 28087 +spring: + application: + name: rocketmq-sql-consume-example + cloud: + stream: + function: + definition: producer;consumer; + rocketmq: + binder: + name-server: 192.168.0.200:9876 + bindings: + producer-out-0: + producer: + group: output_1 + consumer-in-0: + consumer: +# tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } . + subscription: sql:(color in ('red1', 'red2', 'red4') and price>3) + bindings: + producer-out-0: + destination: sql + consumer-in-0: + destination: sql + group: sql-group +logging: + level: + org.springframework.context.support: debug