Refactor Rocketmq: use StreamBridge.send() instead of functional producer

pull/2570/head
sorie 3 years ago
parent 677847a477
commit 029ea6fd53

@ -42,6 +42,7 @@
<module>rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-consumer2-example</module>
<module>rocketmq-example/rocketmq-delay-consume-example</module>
<module>rocketmq-example/rocketmq-sql-consume-example</module>
<module>rocketmq-example/rocketmq-example-common</module>
<module>spring-cloud-bus-rocketmq-example</module>
<module>spring-cloud-alibaba-dubbo-examples</module>

@ -31,6 +31,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>rocketmq-example-common</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

@ -18,6 +18,7 @@ package com.alibaba.cloud.examples.broadcast;
import java.util.function.Consumer;
import com.alibaba.cloud.examples.common.SimpleMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,9 +40,9 @@ public class RocketMQBroadcastConsumer1Application {
}
@Bean
public Consumer<Message<String>> consumer() {
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload());
log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg());
};
}
}

@ -30,6 +30,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>rocketmq-example-common</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

@ -18,6 +18,7 @@ package com.alibaba.cloud.examples.broadcast;
import java.util.function.Consumer;
import com.alibaba.cloud.examples.common.SimpleMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,10 +38,11 @@ public class RocketMQBroadcastConsumer2Application {
public static void main(String[] args) {
SpringApplication.run(RocketMQBroadcastConsumer2Application.class, args);
}
@Bean
public Consumer<Message<String>> consumer() {
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload());
log.info(Thread.currentThread().getName() + " Consumer2 Receive New Messages: " + msg.getPayload().getMsg());
};
}
}

@ -30,6 +30,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>rocketmq-example-common</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

@ -18,15 +18,18 @@ package com.alibaba.cloud.examples.broadcast;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import com.alibaba.cloud.examples.common.SimpleMsg;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
@ -39,22 +42,23 @@ import org.springframework.messaging.support.GenericMessage;
public class RocketMQBroadcastProducerApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQBroadcastProducerApplication.class);
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQBroadcastProducerApplication.class, args);
}
@Bean
public Supplier<Flux<Message<String>>> producer() {
return () -> {
return Flux.range(0, 100).map(i -> {
public ApplicationRunner producer() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
return msg;
}).log();
Message<SimpleMsg> msg = new GenericMessage<SimpleMsg>(new SimpleMsg("Hello RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
}

@ -5,8 +5,6 @@ spring:
name: rocketmq-broadcast-producer-example
cloud:
stream:
function:
definition: producer;
rocketmq:
binder:
name-server: localhost:9876

@ -33,6 +33,11 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>rocketmq-example-common</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

@ -19,15 +19,17 @@ package com.alibaba.cloud.examples.broadcast;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import com.alibaba.cloud.examples.common.SimpleMsg;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
@ -38,30 +40,49 @@ import org.springframework.messaging.support.GenericMessage;
public class RocketMQDelayConsumeApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQDelayConsumeApplication.class);
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQDelayConsumeApplication.class, args);
}
@Bean
public Supplier<Flux<Message<String>>> producer() {
return () -> {
return Flux.range(0, 100).map(i -> {
public ApplicationRunner producerDelay() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
headers.put("DELAY", 2);
Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
return msg;
}).log();
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<String>> consumer() {
public ApplicationRunner producerSchedule() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
// send message after 3000ms
long delayTime = System.currentTimeMillis() + 3000;
headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload());
log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getMsg());
};
}
}

@ -6,7 +6,7 @@ spring:
cloud:
stream:
function:
definition: producer;consumer;
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-examples</artifactId>
<version>${revision}</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-example-common</artifactId>
<name>Spring Cloud Starter Stream Alibaba RocketMQ Example COMMON</name>
<description>Some rocketMQ exmaple common codes</description>
<packaging>jar</packaging>
</project>

@ -0,0 +1,37 @@
/*
* Copyright 2013-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.examples.common;
/**
* @author sorie
*/
public class SimpleMsg {
private String msg;
public SimpleMsg(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}

@ -0,0 +1,29 @@
server:
port: 28087
spring:
application:
name: rocketmq-sql-consume-example
cloud:
stream:
function:
definition: producer;consumer;
rocketmq:
binder:
name-server: localhost:9876
bindings:
producer-out-0:
producer:
group: output_1
consumer-in-0:
consumer:
# tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
subscription: sql:(color in ('red1', 'red2', 'red4') and price>3)
bindings:
producer-out-0:
destination: sql
consumer-in-0:
destination: sql
group: sql-group
logging:
level:
org.springframework.context.support: debug

@ -30,6 +30,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>rocketmq-example-common</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

@ -22,6 +22,8 @@ import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
@ -32,10 +34,13 @@ import org.springframework.stereotype.Component;
*/
@Component
public class OrderlyMessageQueueSelector implements MessageQueueSelector {
private static final Logger log = LoggerFactory
.getLogger(OrderlyMessageQueueSelector.class);
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
int index = id % Math.min(mqs.size(), RocketMQOrderlyConsumeApplication.tags.length);
String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
int index = id % RocketMQOrderlyConsumeApplication.tags.length % mqs.size();
return mqs.get(index);
}
}

@ -19,16 +19,18 @@ package com.alibaba.cloud.examples.orderly;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import com.alibaba.cloud.examples.common.SimpleMsg;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
@ -40,6 +42,9 @@ public class RocketMQOrderlyConsumeApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQOrderlyConsumeApplication.class);
@Autowired
private StreamBridge streamBridge;
/***
* tag array.
*/
@ -50,26 +55,26 @@ public class RocketMQOrderlyConsumeApplication {
}
@Bean
public Supplier<Flux<Message<String>>> producer() {
return () -> {
return Flux.range(0, 100).map(i -> {
public ApplicationRunner producer() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
return msg;
}).log();
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<String>> consumer() {
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
MessageConst.PROPERTY_TAGS).toString();
log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " TAG:" +
log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +
msg.getHeaders().get(tagHeaderKey).toString());
try {
Thread.sleep(100);

@ -6,7 +6,7 @@ spring:
cloud:
stream:
function:
definition: producer;consumer;
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876

@ -32,7 +32,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>rocketmq-example-common</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
<build>

@ -19,15 +19,17 @@ package com.alibaba.cloud.examples.broadcast;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import com.alibaba.cloud.examples.common.SimpleMsg;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
@ -38,7 +40,8 @@ import org.springframework.messaging.support.GenericMessage;
public class RocketMQSqlConsumeApplication {
private static final Logger log = LoggerFactory
.getLogger(RocketMQSqlConsumeApplication.class);
@Autowired
private StreamBridge streamBridge;
public static void main(String[] args) {
SpringApplication.run(RocketMQSqlConsumeApplication.class, args);
}
@ -54,27 +57,27 @@ public class RocketMQSqlConsumeApplication {
public static final Integer[] price = new Integer[] {1, 2, 3, 4, 5};
@Bean
public Supplier<Flux<Message<String>>> producer() {
return () -> {
return Flux.range(0, 100).map(i -> {
public ApplicationRunner producer() {
return args -> {
for (int i = 0; i < 100; i++) {
String key = "KEY" + i;
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put("color", color[i % color.length]);
headers.put("price", price[i % price.length]);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
Message<String> msg = new GenericMessage("Hello RocketMQ " + i, headers);
return msg;
}).log();
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer<Message<String>> consumer() {
public Consumer<Message<SimpleMsg>> consumer() {
return msg -> {
String colorHeaderKey = "color";
String priceHeaderKey = "price";
log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload() + " COLOR:" +
log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " COLOR:" +
msg.getHeaders().get(colorHeaderKey).toString() + " " +
"PRICE: " + msg.getHeaders().get(priceHeaderKey).toString());
};

@ -6,7 +6,7 @@ spring:
cloud:
stream:
function:
definition: producer;consumer;
definition: consumer;
rocketmq:
binder:
name-server: localhost:9876

Loading…
Cancel
Save