From bd800de133b67aa0cce15c48bb46f5deac9e7260 Mon Sep 17 00:00:00 2001 From: sorie Date: Tue, 15 Mar 2022 21:40:48 +0800 Subject: [PATCH] Fixs group must be configured for channel #2391 And delete rocketmq-comprehensive-example's consumer group configurations. --- .../src/main/resources/application.yml | 3 --- .../RocketMQMessageChannelBinder.java | 26 ++++++++++++++++--- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml index fa752afd9..8211f4de5 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-comprehensive-example/src/main/resources/application.yml @@ -18,7 +18,6 @@ spring: processor-out-0: producer: group: output_2 - bindings: producer-out-0: destination: num @@ -26,10 +25,8 @@ spring: destination: square processor-in-0: destination: num - group: processor_group consumer-in-0: destination: square - group: consumer_group logging: level: diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 7e1cea97e..160c4f809 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,6 +16,9 @@ package com.alibaba.cloud.stream.binder.rocketmq; +import java.util.UUID; + +import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst; import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler; import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter; @@ -28,6 +31,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindi import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; @@ -48,6 +52,7 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; + /** * A {@link org.springframework.cloud.stream.binder.Binder} that uses RocketMQ as the * underlying middleware. @@ -113,11 +118,17 @@ public class RocketMQMessageChannelBinder extends String group, ExtendedConsumerProperties extendedConsumerProperties) throws Exception { - // todo support anymous consumer - if (!StringUtils.hasLength(group)) { + boolean anonymous = !StringUtils.hasLength(group); + /*** + * When using DLQ, at least the group property must be provided for proper naming of the DLQ destination + * According to https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference + */ + if (anonymous && NamespaceUtil.isDLQTopic(destination.getName())) { throw new RuntimeException( - "'group must be configured for channel " + destination.getName()); + "group must be configured for DLQ" + destination.getName()); } + group = anonymous ? nextDefaultConsumerGroup() : group; + RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties, extendedConsumerProperties.getExtension()); extendedConsumerProperties.getExtension().setGroup(group); @@ -173,6 +184,14 @@ public class RocketMQMessageChannelBinder extends }; } + /** + * generate next default consumer group. + * @return next default consumer group name. + */ + private static String nextDefaultConsumerGroup() { + return RocketMQConst.DEFAULT_GROUP + UUID.randomUUID().toString(); + } + /** * Binders can return an {@link ErrorMessageStrategy} for building error messages; * binder implementations typically might add extra headers to the error message. @@ -203,5 +222,4 @@ public class RocketMQMessageChannelBinder extends public Class getExtendedPropertiesEntryClass() { return this.extendedBindingProperties.getExtendedPropertiesEntryClass(); } - }