From ead86ec2db587b38db4f432dc8490ba151778f4e Mon Sep 17 00:00:00 2001 From: sorie Date: Sun, 17 Apr 2022 12:55:17 +0800 Subject: [PATCH] rocketmq anonymous group supports. --- .../RocketMQMessageChannelBinder.java | 29 ++++-- .../rocketmq/constant/RocketMQConst.java | 2 +- .../RocketMQMessageChannelBinderTest.java | 95 +++++++++++++++++++ .../rocketmq/TestConsumerDestination.java | 32 +++++++ 4 files changed, 149 insertions(+), 9 deletions(-) create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinderTest.java create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/TestConsumerDestination.java diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 71f287150..4b669dd99 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,6 +16,7 @@ package com.alibaba.cloud.stream.binder.rocketmq; +import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst; import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler; import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter; @@ -28,6 +29,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindi import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; +import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; @@ -48,6 +50,7 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; + /** * A {@link org.springframework.cloud.stream.binder.Binder} that uses RocketMQ as the * underlying middleware. @@ -113,15 +116,17 @@ public class RocketMQMessageChannelBinder extends String group, ExtendedConsumerProperties extendedConsumerProperties) throws Exception { - /** - * todo support anymous consumer - * but anymous consumer will create diff SubscriptionGroup - * consumption progress will be recalculated. - */ - if (!StringUtils.hasLength(group)) { + boolean anonymous = !StringUtils.hasLength(group); + /*** + * When using DLQ, at least the group property must be provided for proper naming of the DLQ destination + * According to https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference + */ + if (anonymous && NamespaceUtil.isDLQTopic(destination.getName())) { throw new RuntimeException( - "'group must be configured for channel " + destination.getName()); + "group must be configured for DLQ" + destination.getName()); } + group = anonymous ? anonymousGroup(destination.getName()) : group; + RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties, extendedConsumerProperties.getExtension()); extendedConsumerProperties.getExtension().setGroup(group); @@ -177,6 +182,15 @@ public class RocketMQMessageChannelBinder extends }; } + /** + * generate anonymous group. + * @param destination not null + * @return anonymous group name. + */ + private static String anonymousGroup(final String destination) { + return RocketMQConst.DEFAULT_GROUP + "_" + destination; + } + /** * Binders can return an {@link ErrorMessageStrategy} for building error messages; * binder implementations typically might add extra headers to the error message. @@ -207,5 +221,4 @@ public class RocketMQMessageChannelBinder extends public Class getExtendedPropertiesEntryClass() { return this.extendedBindingProperties.getExtendedPropertiesEntryClass(); } - } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java index d0a3b88e0..d8e0b881f 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java @@ -31,7 +31,7 @@ public class RocketMQConst extends MessageConst { /** * Default group for SCS RocketMQ Binder. */ - public static final String DEFAULT_GROUP = "binder_default_group_name"; + public static final String DEFAULT_GROUP = "anonymous"; /** * user args for SCS RocketMQ Binder. diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinderTest.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinderTest.java new file mode 100644 index 000000000..da6122308 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinderTest.java @@ -0,0 +1,95 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq; + +import javax.annotation.Resource; + +import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.ExtendedBindingHandlerMappingsProviderConfiguration; +import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration; +import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.ImportAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.core.MessageProducer; + +import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE; + + +@SpringBootTest(classes = RocketMQMessageChannelBinderTest.TestConfig.class, + webEnvironment = NONE, + properties = { + "spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876", + "spring.cloud.stream.bindings.output.destination=TopicOrderTest", + "spring.cloud.stream.bindings.output.content-type=application/json", + + "spring.cloud.stream.bindings.input1.destination=TopicOrderTest", + "spring.cloud.stream.bindings.input1.content-type=application/json", + "spring.cloud.stream.bindings.input1.group=test-group1", + "spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true", + "spring.cloud.stream.bindings.input1.consumer.maxAttempts=1", + "spring.cloud.stream.bindings.input2.destination=TopicOrderTest", + "spring.cloud.stream.bindings.input2.content-type=application/json", + "spring.cloud.stream.bindings.input2.group=test-group2", + "spring.cloud.stream.rocketmq.bindings.input2.consumer.push.orderly=false", + "spring.cloud.stream.rocketmq.bindings.input2.consumer.subscription=tag1" + }) +public class RocketMQMessageChannelBinderTest { + @Resource + RocketMQMessageChannelBinder binder; + @Test + public void createConsumerEndpoint() throws Exception { + TestConsumerDestination destination = new TestConsumerDestination("test"); + MessageProducer consumerEndpoint = binder.createConsumerEndpoint(destination, "test", + new ExtendedConsumerProperties<>(new RocketMQConsumerProperties())); + Assertions.assertNotNull(consumerEndpoint); + } + + @Test + public void createAnymousConsumerEndpoint() throws Exception { + ExtendedConsumerProperties extendedConsumerProperties + = new ExtendedConsumerProperties<>(new RocketMQConsumerProperties()); + + TestConsumerDestination destination = new TestConsumerDestination("test"); + MessageProducer consumerEndpoint = binder.createConsumerEndpoint(destination, null, + extendedConsumerProperties); + Assertions.assertNotNull(consumerEndpoint); + Assertions.assertEquals(RocketMQConst.DEFAULT_GROUP + "_test", extendedConsumerProperties.getExtension().getGroup()); + } + + @Test + public void createDLQAnymousConsumerEndpoint() throws Exception { + TestConsumerDestination destination = new TestConsumerDestination("%DLQ%test"); + Assertions.assertThrows(RuntimeException.class, () -> { + MessageProducer consumerEndpoint = binder.createConsumerEndpoint(destination, null, + new ExtendedConsumerProperties<>(new RocketMQConsumerProperties())); + }); + } + + @Configuration + @EnableAutoConfiguration + @ImportAutoConfiguration({ ExtendedBindingHandlerMappingsProviderConfiguration.class, + RocketMQBinderAutoConfiguration.class}) + public static class TestConfig { + + } +} diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/TestConsumerDestination.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/TestConsumerDestination.java new file mode 100644 index 000000000..9c68b9eac --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/TestConsumerDestination.java @@ -0,0 +1,32 @@ +/* + * Copyright 2013-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.cloud.stream.binder.rocketmq; + +import org.springframework.cloud.stream.provisioning.ConsumerDestination; + +public class TestConsumerDestination implements ConsumerDestination { + private String name; + + public TestConsumerDestination(String name) { + this.name = name; + } + + @Override + public String getName() { + return name; + } +}