diff --git a/pom.xml b/pom.xml index 70017da3f..4c6f1f82d 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,7 @@ 4.0.1 - 2.0.2 + 2.0.4 3.7.0 diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc index 340da4d8b..7823c649f 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq.adoc @@ -249,6 +249,11 @@ spring.cloud.stream.rocketmq.binder.customized-trace-topic:: 消息轨迹开启后存储的 topic 名称。 + Default: `RMQ_SYS_TRACE_TOPIC`. ++ +spring.cloud.stream.rocketmq.binder.access-channel:: +商业版rocketmq消息轨迹topic自适应,值为CLOUD ++ +Default: null. ==== RocketMQ Consumer Properties @@ -346,6 +351,7 @@ NOTE: 0.1.2 & 0.2.2 & 0.9.0 才支持该功能 spring.cloud.stream.rocketmq.binder.access-key=YourAccessKey spring.cloud.stream.rocketmq.binder.secret-key=YourSecretKey spring.cloud.stream.rocketmq.binder.name-server=NameServerInMQ +spring.cloud.stream.rocketmq.binder.access-channel=CLOUD ``` NOTE: topic 和 group 请以 实例id% 为前缀进行配置。比如 topic 为 "test",需要配置成 "实例id%test" diff --git a/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc b/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc index 305813e8f..4362375d9 100644 --- a/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc +++ b/spring-cloud-alibaba-docs/src/main/asciidoc/rocketmq.adoc @@ -246,7 +246,11 @@ spring.cloud.stream.rocketmq.binder.customized-trace-topic:: The trace topic for message trace. + Default: `RMQ_SYS_TRACE_TOPIC`. - ++ +spring.cloud.stream.rocketmq.binder.access-channel:: +The commercial version of rocketmq message trajectory topic is adaptive,the value is CLOUD ++ +Default: null. ==== RocketMQ Consumer Properties diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java index c7daff0ef..83a1c3604 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java @@ -76,6 +76,13 @@ public final class RocketMQBinderUtils { result.setEnableMsgTrace( rocketBinderConfigurationProperties.isEnableMsgTrace()); } + if (StringUtils.isEmpty(rocketMQProperties.getAccessChannel())) { + result.setAccessChannel(rocketBinderConfigurationProperties.getAccessChannel()); + } + else { + result.setAccessChannel( + rocketMQProperties.getAccessChannel()); + } return result; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index e5a2e24b1..7d0d997cc 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -38,6 +38,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.remoting.RPCHook; @@ -158,6 +159,9 @@ public class RocketMQMessageChannelBinder extends producerProperties.getExtension().isRetryNextServer()); producer.setMaxMessageSize( producerProperties.getExtension().getMaxMessageSize()); + if (!StringUtils.isEmpty(mergedProperties.getAccessChannel())) { + producer.setAccessChannel(AccessChannel.valueOf(mergedProperties.getAccessChannel())); + } rocketMQTemplate.setProducer(producer); if (producerProperties.isPartitioned()) { rocketMQTemplate diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java index 6968ead86..5b1fb513a 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java @@ -20,6 +20,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; import org.apache.rocketmq.spring.config.RocketMQConfigUtils; @@ -59,6 +60,8 @@ public class RocketMQComponent4BinderAutoConfiguration { "${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}"); String sk = environment.resolveRequiredPlaceholders( "${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}"); + String accessChannel = environment.resolveRequiredPlaceholders( + "${spring.cloud.stream.rocketmq.binder.access-channel:${rocketmq.access-channel:}}"); if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP, new AclClientRPCHook(new SessionCredentials(ak, sk))); @@ -71,6 +74,9 @@ public class RocketMQComponent4BinderAutoConfiguration { configNameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER; } producer.setNamesrvAddr(configNameServer); + if (!StringUtils.isEmpty(configNameServer)) { + producer.setAccessChannel(AccessChannel.valueOf(accessChannel)); + } return producer; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index fb167c697..ecf5f2a01 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -26,6 +26,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -238,6 +239,10 @@ public class RocketMQListenerBindingContainer consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); + if (!StringUtils.isEmpty(rocketBinderConfigurationProperties.getAccessChannel())) { + consumer.setAccessChannel(AccessChannel.valueOf(rocketBinderConfigurationProperties.getAccessChannel())); + } + switch (messageModel) { case BROADCASTING: consumer.setMessageModel( diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index 3a9dcb403..ae735840c 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -47,6 +47,11 @@ public class RocketMQBinderConfigurationProperties { */ private String secretKey; + /** + * Enum type for accessChannel, values: LOCAL, CLOUD. + */ + private String accessChannel; + /** * Switch flag instance for message trace. */ @@ -82,6 +87,14 @@ public class RocketMQBinderConfigurationProperties { this.secretKey = secretKey; } + public String getAccessChannel() { + return accessChannel; + } + + public void setAccessChannel(String accessChannel) { + this.accessChannel = accessChannel; + } + public boolean isEnableMsgTrace() { return enableMsgTrace; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java index 8207f9893..6feee3b97 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -50,7 +50,8 @@ public class RocketMQAutoConfigurationTests { "spring.cloud.stream.bindings.input2.content-type=application/json", "spring.cloud.stream.bindings.input2.group=test-group2", "spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false", - "spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1"); + "spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1", + "spring.cloud.stream.rocketmq.binder.access-channel=CLOUD"); @Test public void testProperties() { @@ -68,6 +69,7 @@ public class RocketMQAutoConfigurationTests { .getOrderly()).isFalse(); assertThat(bindingProperties.getExtendedConsumerProperties("input1") .getOrderly()).isTrue(); + assertThat(binderConfigurationProperties.getAccessChannel()).isEqualTo("CLOUD"); }); }