diff --git a/pom.xml b/pom.xml index 5025e743d..f1915043b 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,7 @@ 4.0.1 - 4.6.1 + 4.9.2 3.8.1 diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index ec0588f31..b6000922c 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -40,6 +40,7 @@ seata-example/account-service rocketmq-example/rocketmq-consume-example rocketmq-example/rocketmq-produce-example + rocketmq-example/rocketmq-comprehensive-example spring-cloud-bus-rocketmq-example spring-cloud-alibaba-dubbo-examples spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-nacos-example diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/pom.xml new file mode 100644 index 000000000..d3252473f --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/pom.xml @@ -0,0 +1,52 @@ + + + + + com.alibaba.cloud + spring-cloud-alibaba-examples + ${revision} + ../../pom.xml + + 4.0.0 + + + rocketmq-comprehensive-example + Spring Cloud Starter Stream Alibaba RocketMQ Comprehensive Example + Example demonstrating how to use rocketmq to produce, process and consume + jar + + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-json + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-deploy-plugin + ${maven-deploy-plugin.version} + + true + + + + + + diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/java/com/alibaba/cloud/examples/RocketMQComprehensiveApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/java/com/alibaba/cloud/examples/RocketMQComprehensiveApplication.java new file mode 100644 index 000000000..724ea7300 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/java/com/alibaba/cloud/examples/RocketMQComprehensiveApplication.java @@ -0,0 +1,75 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples; + +import java.time.Duration; +import java.util.Arrays; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.support.StringObjectMapBuilder; + +/** + * @author freeman + */ +@SpringBootApplication +public class RocketMQComprehensiveApplication { + private static final Logger log = LoggerFactory + .getLogger(RocketMQComprehensiveApplication.class); + + public static void main(String[] args) { + SpringApplication.run(RocketMQComprehensiveApplication.class, args); + } + + @Bean + public Supplier> producer() { + return () -> Flux.interval(Duration.ofSeconds(2)).map(id -> { + User user = new User(); + user.setId(id.toString()); + user.setName("freeman"); + user.setMeta(new StringObjectMapBuilder() + .put("hobbies", Arrays.asList("movies", "songs")).put("age", 21) + .get()); + return user; + }).log(); + } + + @Bean + public Function, Flux> processor() { + return flux -> flux.map(user -> { + user.setId(String.valueOf( + Long.parseLong(user.getId()) * Long.parseLong(user.getId()))); + user.setName("not freeman"); + user.getMeta().put("hobbies", Arrays.asList("programming")); + return user; + }); + } + + @Bean + public Consumer consumer() { + return num -> log.info(num.toString()); + } + +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/java/com/alibaba/cloud/examples/User.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/java/com/alibaba/cloud/examples/User.java new file mode 100644 index 000000000..83d0c8a12 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/java/com/alibaba/cloud/examples/User.java @@ -0,0 +1,58 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.examples; + +import java.util.Map; + +/** + * @author freeman + */ +public class User { + private String id; + private String name; + private Map meta; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Map getMeta() { + return meta; + } + + public void setMeta(Map meta) { + this.meta = meta; + } + + @Override + public String toString() { + return "User{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", meta=" + meta + + '}'; + } +} diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml new file mode 100644 index 000000000..fa752afd9 --- /dev/null +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml @@ -0,0 +1,36 @@ +server: + port: 28082 +spring: + application: + name: rocketmq-comprehensive-example + cloud: + stream: + function: + definition: producer;consumer;processor + rocketmq: + binder: + name-server: 127.0.0.1:9876 + bindings: + # TODO producer must have a group, need optimization !!! + producer-out-0: + producer: + group: output_1 + processor-out-0: + producer: + group: output_2 + + bindings: + producer-out-0: + destination: num + processor-out-0: + destination: square + processor-in-0: + destination: num + group: processor_group + consumer-in-0: + destination: square + group: consumer_group + +logging: + level: + org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/pom.xml b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/pom.xml index 6af5a64a8..6339a5d36 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/pom.xml +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/pom.xml @@ -24,24 +24,6 @@ true - - org.springframework.boot - spring-boot - true - - - - org.springframework.boot - spring-boot-autoconfigure - true - - - - org.springframework.boot - spring-boot-actuator - true - - org.springframework.boot spring-boot-actuator-autoconfigure @@ -56,18 +38,13 @@ org.apache.rocketmq rocketmq-acl + + org.springframework.boot spring-boot-starter-test test - - - junit - junit - test - - diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java index df5d47b6b..df7374b06 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java @@ -48,7 +48,7 @@ public class ExtendedBindingHandlerMappingsProviderConfiguration { } @Bean - public RocketMQConfigBeanPostProcessor rocketMQConfigBeanPostProcessor() { + public static RocketMQConfigBeanPostProcessor rocketMQConfigBeanPostProcessor() { return new RocketMQConfigBeanPostProcessor(); } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java index b9b85db24..8410b7d86 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java @@ -24,6 +24,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvis import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator; +import org.springframework.boot.actuate.health.HealthIndicator; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -46,13 +47,6 @@ public class RocketMQBinderAutoConfiguration { @Autowired private RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; - @Bean - @ConditionalOnEnabledHealthIndicator("rocketmq") - @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator") - public RocketMQBinderHealthIndicator rocketMQBinderHealthIndicator() { - return new RocketMQBinderHealthIndicator(); - } - @Bean public RocketMQTopicProvisioner rocketMQTopicProvisioner() { return new RocketMQTopicProvisioner(); @@ -65,4 +59,16 @@ public class RocketMQBinderAutoConfiguration { extendedBindingProperties, provisioningProvider); } + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass(HealthIndicator.class) + @ConditionalOnEnabledHealthIndicator("rocketmq") + static class KafkaBinderHealthIndicatorConfiguration { + + @Bean + public RocketMQBinderHealthIndicator rocketMQBinderHealthIndicator() { + return new RocketMQBinderHealthIndicator(); + } + + } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java index c6e0a29b1..ace7a901d 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java @@ -33,7 +33,7 @@ import org.springframework.messaging.converter.CompositeMessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.CollectionUtils; import org.springframework.util.MimeTypeUtils; -import org.springframework.util.StringUtils; +import org.springframework.util.ObjectUtils; /** * @author zkzlx @@ -138,13 +138,13 @@ public final class RocketMQMessageConverterSupport { if (Objects.nonNull(headers) && !headers.isEmpty()) { Object tag = headers.getOrDefault(Headers.TAGS, headers.get(toRocketHeaderKey(Headers.TAGS))); - if (StringUtils.hasLength(tag.toString())) { + if (!ObjectUtils.isEmpty(tag)) { rocketMsg.setTags(String.valueOf(tag)); } Object keys = headers.getOrDefault(Headers.KEYS, headers.get(toRocketHeaderKey(Headers.KEYS))); - if (StringUtils.hasLength(keys.toString())) { + if (!ObjectUtils.isEmpty(keys)) { rocketMsg.setKeys(keys.toString()); } Object flagObj = headers.getOrDefault(Headers.FLAG, diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java index f850afb4e..7617b3fd6 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -19,7 +19,7 @@ package com.alibaba.cloud.stream.binder.rocketmq; import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner;