From e3c791179b6b9e5a08493d6677b70431a7ac8c37 Mon Sep 17 00:00:00 2001 From: panzhi33 Date: Tue, 13 Apr 2021 17:51:24 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=87=E7=BA=A7rocketmq-spring-boot-starter?= =?UTF-8?q?=E7=89=88=E6=9C=AC=EF=BC=8C=E5=95=86=E4=B8=9A=E7=89=88=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E8=BD=A8=E8=BF=B9=E6=97=A0=E9=9C=80=E6=89=8B=E5=8A=A8?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E8=BD=A8=E8=BF=B9topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../src/main/asciidoc-zh/rocketmq.adoc | 6 ++++++ .../src/main/asciidoc/rocketmq.adoc | 6 +++++- .../stream/binder/rocketmq/RocketMQBinderUtils.java | 7 +++++++ .../rocketmq/RocketMQMessageChannelBinder.java | 4 ++++ .../RocketMQComponent4BinderAutoConfiguration.java | 6 ++++++ .../consuming/RocketMQListenerBindingContainer.java | 5 +++++ .../RocketMQBinderConfigurationProperties.java | 13 +++++++++++++ .../rocketmq/RocketMQAutoConfigurationTests.java | 4 +++- 9 files changed, 50 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 0208a34b4..3fb48600e 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,7 @@ 4.0.1 - 2.0.2 + 2.0.4 3.7.0 diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc index 340da4d8b..7823c649f 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc @@ -249,6 +249,11 @@ spring.cloud.stream.rocketmq.binder.customized-trace-topic:: 消息轨迹开启后存储的 topic 名称。 + Default: `RMQ_SYS_TRACE_TOPIC`. ++ +spring.cloud.stream.rocketmq.binder.access-channel:: +商业版rocketmq消息轨迹topic自适应,值为CLOUD ++ +Default: null. ==== RocketMQ Consumer Properties @@ -346,6 +351,7 @@ NOTE: 0.1.2 & 0.2.2 & 0.9.0 才支持该功能 spring.cloud.stream.rocketmq.binder.access-key=YourAccessKey spring.cloud.stream.rocketmq.binder.secret-key=YourSecretKey spring.cloud.stream.rocketmq.binder.name-server=NameServerInMQ +spring.cloud.stream.rocketmq.binder.access-channel=CLOUD ``` NOTE: topic 和 group 请以 实例id% 为前缀进行配置。比如 topic 为 "test",需要配置成 "实例id%test" diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc index 540d42c41..08cad42cf 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc @@ -246,7 +246,11 @@ spring.cloud.stream.rocketmq.binder.customized-trace-topic:: The trace topic for message trace. + Default: `RMQ_SYS_TRACE_TOPIC`. - ++ +spring.cloud.stream.rocketmq.binder.access-channel:: +The commercial version of rocketmq message trajectory topic is adaptive,the value is CLOUD ++ +Default: null. ==== RocketMQ Consumer Properties diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java index c7daff0ef..83a1c3604 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java @@ -76,6 +76,13 @@ public final class RocketMQBinderUtils { result.setEnableMsgTrace( rocketBinderConfigurationProperties.isEnableMsgTrace()); } + if (StringUtils.isEmpty(rocketMQProperties.getAccessChannel())) { + result.setAccessChannel(rocketBinderConfigurationProperties.getAccessChannel()); + } + else { + result.setAccessChannel( + rocketMQProperties.getAccessChannel()); + } return result; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index e5a2e24b1..7d0d997cc 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -38,6 +38,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.RPCHook; @@ -158,6 +159,9 @@ public class RocketMQMessageChannelBinder extends producerProperties.getExtension().isRetryNextServer()); producer.setMaxMessageSize( producerProperties.getExtension().getMaxMessageSize()); + if (!StringUtils.isEmpty(mergedProperties.getAccessChannel())) { + producer.setAccessChannel(AccessChannel.valueOf(mergedProperties.getAccessChannel())); + } rocketMQTemplate.setProducer(producer); if (producerProperties.isPartitioned()) { rocketMQTemplate diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java index 6968ead86..5b1fb513a 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java @@ -20,6 +20,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; import org.apache.rocketmq.spring.config.RocketMQConfigUtils; @@ -59,6 +60,8 @@ public class RocketMQComponent4BinderAutoConfiguration { "${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}"); String sk = environment.resolveRequiredPlaceholders( "${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}"); + String accessChannel = environment.resolveRequiredPlaceholders( + "${spring.cloud.stream.rocketmq.binder.access-channel:${rocketmq.access-channel:}}"); if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP, new AclClientRPCHook(new SessionCredentials(ak, sk))); @@ -71,6 +74,9 @@ public class RocketMQComponent4BinderAutoConfiguration { configNameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER; } producer.setNamesrvAddr(configNameServer); + if (!StringUtils.isEmpty(configNameServer)) { + producer.setAccessChannel(AccessChannel.valueOf(accessChannel)); + } return producer; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index fb167c697..ecf5f2a01 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -26,6 +26,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -238,6 +239,10 @@ public class RocketMQListenerBindingContainer consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); + if (!StringUtils.isEmpty(rocketBinderConfigurationProperties.getAccessChannel())) { + consumer.setAccessChannel(AccessChannel.valueOf(rocketBinderConfigurationProperties.getAccessChannel())); + } + switch (messageModel) { case BROADCASTING: consumer.setMessageModel( diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index 3a9dcb403..686b46a7e 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -47,6 +47,11 @@ public class RocketMQBinderConfigurationProperties { */ private String secretKey; + /** + * Enum type for accessChannel, values: LOCAL, CLOUD + */ + private String accessChannel; + /** * Switch flag instance for message trace. */ @@ -82,6 +87,14 @@ public class RocketMQBinderConfigurationProperties { this.secretKey = secretKey; } + public String getAccessChannel() { + return accessChannel; + } + + public void setAccessChannel(String accessChannel) { + this.accessChannel = accessChannel; + } + public boolean isEnableMsgTrace() { return enableMsgTrace; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java index 8207f9893..6feee3b97 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -50,7 +50,8 @@ public class RocketMQAutoConfigurationTests { "spring.cloud.stream.bindings.input2.content-type=application/json", "spring.cloud.stream.bindings.input2.group=test-group2", "spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false", - "spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1"); + "spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1", + "spring.cloud.stream.rocketmq.binder.access-channel=CLOUD"); @Test public void testProperties() { @@ -68,6 +69,7 @@ public class RocketMQAutoConfigurationTests { .getOrderly()).isFalse(); assertThat(bindingProperties.getExtendedConsumerProperties("input1") .getOrderly()).isTrue(); + assertThat(binderConfigurationProperties.getAccessChannel()).isEqualTo("CLOUD"); }); }