diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml index 7447538b3..f89f2de50 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml @@ -17,8 +17,6 @@ spring: destination: pollable pollable-in-0: destination: pollable - group: pollable-group - logging: level: org.springframework.context.support: debug diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 2cf593225..4b9d7d3f3 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -125,7 +125,7 @@ public class RocketMQMessageChannelBinder extends throw new RuntimeException( "group must be configured for DLQ" + destination.getName()); } - group = anonymous ? anonymousGroup(destination.getName()) : group; + group = anonymous ? RocketMQUtils.anonymousGroup(destination.getName()) : group; RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties, extendedConsumerProperties.getExtension()); @@ -182,15 +182,6 @@ public class RocketMQMessageChannelBinder extends }; } - /** - * generate anonymous group. - * @param destination not null - * @return anonymous group name. - */ - private static String anonymousGroup(final String destination) { - return RocketMQConst.DEFAULT_GROUP + "_" + destination; - } - /** * Binders can return an {@link ErrorMessageStrategy} for building error messages; * binder implementations typically might add extra headers to the error message. diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java index 074ee1aa5..db3018145 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.slf4j.Logger; @@ -97,15 +98,28 @@ public final class RocketMQConsumerFactory { /** * todo Compatible with versions less than 4.6 ? + * @param topic * @param extendedConsumerProperties extendedConsumerProperties * @return DefaultLitePullConsumer */ public static DefaultLitePullConsumer initPullConsumer( + String topic, ExtendedConsumerProperties extendedConsumerProperties) { RocketMQConsumerProperties consumerProperties = extendedConsumerProperties .getExtension(); - Assert.notNull(consumerProperties.getGroup(), - "Property 'group' is required - consumerGroup"); + boolean anonymous = !StringUtils.hasLength(consumerProperties.getGroup()); + /*** + * When using DLQ, at least the group property must be provided for proper naming of the DLQ destination + * According to https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference + */ + if (anonymous && NamespaceUtil.isDLQTopic(topic)) { + throw new RuntimeException( + "group must be configured for DLQ" + topic); + } + if (anonymous) { + consumerProperties.setGroup(RocketMQUtils.anonymousGroup(topic)); + } + Assert.notNull(consumerProperties.getNameServer(), "Property 'nameServer' is required"); AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java index c94a3ee87..286256260 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java @@ -87,7 +87,7 @@ public class RocketMQMessageSource extends AbstractMessageSource "pull consumer already running. " + this.toString()); } this.consumer = RocketMQConsumerFactory - .initPullConsumer(extendedConsumerProperties); + .initPullConsumer(topic, extendedConsumerProperties); // This parameter must be 1, otherwise doReceive cannot be handled singly. // this.consumer.setPullBatchSize(1); this.consumer.subscribe(topic, messageSelector); diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java index b431db74f..1e7e41984 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java @@ -100,4 +100,12 @@ public final class RocketMQUtils { return MessageSelector.byTag(expression); } + /** + * generate anonymous group. + * @param destination not null + * @return anonymous group name. + */ + public static String anonymousGroup(final String destination) { + return RocketMQConst.DEFAULT_GROUP + "_" + destination; + } }