diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index 6006f8338..89babb45b 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -42,6 +42,7 @@ rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example rocketmq-example/rocketmq-delay-consume-example rocketmq-example/rocketmq-sql-consume-example + rocketmq-example/rocketmq-example-common spring-cloud-bus-rocketmq-example spring-cloud-alibaba-dubbo-examples diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml index d4894c03a..156055b6b 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/pom.xml @@ -31,6 +31,11 @@ org.springframework.boot spring-boot-starter-json + + com.alibaba.cloud + rocketmq-example-common + ${revision} + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java index 053065b99..e37fc0aa4 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer1-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer1Application.java @@ -18,6 +18,7 @@ package com.alibaba.cloud.examples.broadcast; import java.util.function.Consumer; +import com.alibaba.cloud.examples.common.SimpleMsg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +40,9 @@ public class RocketMQBroadcastConsumer1Application { } @Bean - public Consumer> consumer() { + public Consumer> consumer() { return msg -> { - log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload()); + log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg()); }; } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml index ecd738686..156112602 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/pom.xml @@ -30,6 +30,11 @@ org.springframework.boot spring-boot-starter-json + + com.alibaba.cloud + rocketmq-example-common + ${revision} + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java index cbebc43cd..cf5f88484 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastConsumer2Application.java @@ -18,6 +18,7 @@ package com.alibaba.cloud.examples.broadcast; import java.util.function.Consumer; +import com.alibaba.cloud.examples.common.SimpleMsg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,10 +38,11 @@ public class RocketMQBroadcastConsumer2Application { public static void main(String[] args) { SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args); } + @Bean - public Consumer> consumer() { + public Consumer> consumer() { return msg -> { - log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload()); + log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg()); }; } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml index 3aad09fe0..f410675f0 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/pom.xml @@ -30,6 +30,11 @@ org.springframework.boot spring-boot-starter-json + + com.alibaba.cloud + rocketmq-example-common + ${revision} + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java index 692fac32c..9f52ea911 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/java/com/alibaba/cloud/examples/broadcast/RocketMQBroadcastProducerApplication.java @@ -18,15 +18,18 @@ package com.alibaba.cloud.examples.broadcast; import java.util.HashMap; import java.util.Map; -import java.util.function.Supplier; +import com.alibaba.cloud.examples.common.SimpleMsg; import org.apache.rocketmq.common.message.MessageConst; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; @@ -39,22 +42,23 @@ import org.springframework.messaging.support.GenericMessage; public class RocketMQBroadcastProducerApplication { private static final Logger log = LoggerFactory .getLogger(RocketMQBroadcastProducerApplication.class); - + @Autowired + private StreamBridge streamBridge; public static void main(String[] args) { SpringApplication.run(RocketMQBroadcastProducerApplication.class, args); } @Bean - public Supplier>> producer() { - return () -> { - return Flux.range(0, 100).map(i -> { + public ApplicationRunner producer() { + return args -> { + for (int i = 0; i < 100; i++) { String key = "KEY" + i; Map headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_KEYS, key); headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); - Message msg = new GenericMessage("Hello RocketMQ " + i, headers); - return msg; - }).log(); + Message msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers); + streamBridge.send("producer-out-0", msg); + } }; } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml index e1945ab90..37fd8c8ef 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example/src/main/resources/application.yml @@ -5,8 +5,6 @@ spring: name: rocketmq-broadcast-producer-example cloud: stream: - function: - definition: producer; rocketmq: binder: name-server: localhost:9876 diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml index dedbe275a..db4b0b7f7 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/pom.xml @@ -33,6 +33,11 @@ spring-boot-starter-actuator + + com.alibaba.cloud + rocketmq-example-common + ${revision} + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java index 529530746..9ef0f8ac3 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/java/com/alibaba/cloud/examples/delay/RocketMQDelayConsumeApplication.java @@ -19,15 +19,17 @@ package com.alibaba.cloud.examples.broadcast; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Supplier; +import com.alibaba.cloud.examples.common.SimpleMsg; import org.apache.rocketmq.common.message.MessageConst; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; @@ -38,30 +40,49 @@ import org.springframework.messaging.support.GenericMessage; public class RocketMQDelayConsumeApplication { private static final Logger log = LoggerFactory .getLogger(RocketMQDelayConsumeApplication.class); + @Autowired + private StreamBridge streamBridge; public static void main(String[] args) { SpringApplication.run(RocketMQDelayConsumeApplication.class, args); } @Bean - public Supplier>> producer() { - return () -> { - return Flux.range(0, 100).map(i -> { + public ApplicationRunner producerDelay() { + return args -> { + for (int i = 0; i < 100; i++) { String key = "KEY" + i; Map headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_KEYS, key); headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); headers.put("DELAY", 2); - Message msg = new GenericMessage("Hello RocketMQ " + i, headers); - return msg; - }).log(); + Message msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers); + streamBridge.send("producer-out-0", msg); + } }; } @Bean - public Consumer> consumer() { + public ApplicationRunner producerSchedule() { + return args -> { + for (int i = 0; i < 100; i++) { + String key = "KEY" + i; + Map headers = new HashMap<>(); + headers.put(MessageConst.PROPERTY_KEYS, key); + headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); + // send message after 3000ms + long delayTime = System.currentTimeMillis() + 3000; + headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime); + Message msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers); + streamBridge.send("producer-out-0", msg); + } + }; + } + + @Bean + public Consumer> consumer() { return msg -> { - log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload()); + log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getMsg()); }; } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml index 25ed6dd9b..1fcd79b9f 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-delay-consume-example/src/main/resources/application.yml @@ -6,7 +6,7 @@ spring: cloud: stream: function: - definition: producer;consumer; + definition: consumer; rocketmq: binder: name-server: localhost:9876 diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/pom.xml new file mode 100644 index 000000000..a002bca53 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/pom.xml @@ -0,0 +1,19 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../pom.xml + + 4.0.0 + + + rocketmq-example-common + Spring Cloud Starter Stream Alibaba RocketMQ Example COMMON + Some rocketMQ exmaple common codes + jar + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/java/com/alibaba/cloud/examples/common/SimpleMsg.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/java/com/alibaba/cloud/examples/common/SimpleMsg.java new file mode 100644 index 000000000..3ec591aad --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/java/com/alibaba/cloud/examples/common/SimpleMsg.java @@ -0,0 +1,37 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples.common; + +/** + * @author sorie + */ +public class SimpleMsg { + + private String msg; + + public SimpleMsg(String msg) { + this.msg = msg; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/resources/application.yml new file mode 100644 index 000000000..2ff74a231 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-example-common/src/main/resources/application.yml @@ -0,0 +1,29 @@ +server: + port: 28087 +spring: + application: + name: rocketmq-sql-consume-example + cloud: + stream: + function: + definition: producer;consumer; + rocketmq: + binder: + name-server: localhost:9876 + bindings: + producer-out-0: + producer: + group: output_1 + consumer-in-0: + consumer: +# tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } . + subscription: sql:(color in ('red1', 'red2', 'red4') and price>3) + bindings: + producer-out-0: + destination: sql + consumer-in-0: + destination: sql + group: sql-group +logging: + level: + org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml index 14d80e313..83d93942b 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/pom.xml @@ -30,6 +30,11 @@ org.springframework.boot spring-boot-starter-json + + com.alibaba.cloud + rocketmq-example-common + ${revision} + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java index e1b275838..8611e8d6c 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/OrderlyMessageQueueSelector.java @@ -22,6 +22,8 @@ import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Component; @@ -32,10 +34,13 @@ import org.springframework.stereotype.Component; */ @Component public class OrderlyMessageQueueSelector implements MessageQueueSelector { + private static final Logger log = LoggerFactory + .getLogger(OrderlyMessageQueueSelector.class); @Override public MessageQueue select(List mqs, Message msg, Object arg) { Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID); - int index = id % Math.min(mqs.size(), RocketMQOrderlyConsumeApplication.tags.length); + String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS); + int index = id % RocketMQOrderlyConsumeApplication.tags.length % mqs.size(); return mqs.get(index); } } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java index 53dc15a40..b87f45b21 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/java/com/alibaba/cloud/examples/orderly/RocketMQOrderlyConsumeApplication.java @@ -19,16 +19,18 @@ package com.alibaba.cloud.examples.orderly; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Supplier; +import com.alibaba.cloud.examples.common.SimpleMsg; import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport; import org.apache.rocketmq.common.message.MessageConst; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; @@ -40,6 +42,9 @@ public class RocketMQOrderlyConsumeApplication { private static final Logger log = LoggerFactory .getLogger(RocketMQOrderlyConsumeApplication.class); + @Autowired + private StreamBridge streamBridge; + /*** * tag array. */ @@ -50,26 +55,26 @@ public class RocketMQOrderlyConsumeApplication { } @Bean - public Supplier>> producer() { - return () -> { - return Flux.range(0, 100).map(i -> { + public ApplicationRunner producer() { + return args -> { + for (int i = 0; i < 100; i++) { String key = "KEY" + i; Map headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_KEYS, key); headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]); headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); - Message msg = new GenericMessage("Hello RocketMQ " + i, headers); - return msg; - }).log(); + Message msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers); + streamBridge.send("producer-out-0", msg); + } }; } @Bean - public Consumer> consumer() { + public Consumer> consumer() { return msg -> { String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey( MessageConst.PROPERTY_TAGS).toString(); - log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " TAG:" + + log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" + msg.getHeaders().get(tagHeaderKey).toString()); try { Thread.sleep(100); diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml index a7eb232b0..c14ee5d73 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-orderly-consume-example/src/main/resources/application.yml @@ -6,7 +6,7 @@ spring: cloud: stream: function: - definition: producer;consumer; + definition: consumer; rocketmq: binder: name-server: localhost:9876 diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml index 534d05021..294c8c862 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/pom.xml @@ -32,7 +32,11 @@ org.springframework.boot spring-boot-starter-actuator - + + com.alibaba.cloud + rocketmq-example-common + ${revision} + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java index fea68af68..78a07def7 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/java/com/alibaba/cloud/examples/sql/RocketMQSqlConsumeApplication.java @@ -19,15 +19,17 @@ package com.alibaba.cloud.examples.broadcast; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; -import java.util.function.Supplier; +import com.alibaba.cloud.examples.common.SimpleMsg; import org.apache.rocketmq.common.message.MessageConst; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; @@ -38,7 +40,8 @@ import org.springframework.messaging.support.GenericMessage; public class RocketMQSqlConsumeApplication { private static final Logger log = LoggerFactory .getLogger(RocketMQSqlConsumeApplication.class); - + @Autowired + private StreamBridge streamBridge; public static void main(String[] args) { SpringApplication.run(RocketMQSqlConsumeApplication.class, args); } @@ -54,27 +57,27 @@ public class RocketMQSqlConsumeApplication { public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5}; @Bean - public Supplier>> producer() { - return () -> { - return Flux.range(0, 100).map(i -> { + public ApplicationRunner producer() { + return args -> { + for (int i = 0; i < 100; i++) { String key = "KEY" + i; Map headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_KEYS, key); headers.put("color", color[i % color.length]); headers.put("price", price[i % price.length]); headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); - Message msg = new GenericMessage("Hello RocketMQ " + i, headers); - return msg; - }).log(); + Message msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers); + streamBridge.send("producer-out-0", msg); + } }; } @Bean - public Consumer> consumer() { + public Consumer> consumer() { return msg -> { String colorHeaderKey = "color"; String priceHeaderKey = "price"; - log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " COLOR:" + + log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " COLOR:" + msg.getHeaders().get(colorHeaderKey).toString() + " " + "PRICE: " + msg.getHeaders().get(priceHeaderKey).toString()); }; diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml index 2ff74a231..b7f41e841 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-sql-consume-example/src/main/resources/application.yml @@ -6,7 +6,7 @@ spring: cloud: stream: function: - definition: producer;consumer; + definition: consumer; rocketmq: binder: name-server: localhost:9876