Merge pull request #2034 from panzhi33/master

升级rocketmq-spring-boot-starter版本,商业版消息轨迹无需手动配置轨迹topic
pull/2254/head
zkzlx 3 years ago committed by GitHub
commit a530705983
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -90,7 +90,7 @@
<curator.version>4.0.1</curator.version> <curator.version>4.0.1</curator.version>
<!-- Apache RocketMQ --> <!-- Apache RocketMQ -->
<rocketmq.starter.version>2.0.2</rocketmq.starter.version> <rocketmq.starter.version>2.0.4</rocketmq.starter.version>
<!-- Maven Plugin Versions --> <!-- Maven Plugin Versions -->
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>

@ -249,6 +249,11 @@ spring.cloud.stream.rocketmq.binder.customized-trace-topic::
消息轨迹开启后存储的 topic 名称。 消息轨迹开启后存储的 topic 名称。
+ +
Default: `RMQ_SYS_TRACE_TOPIC`. Default: `RMQ_SYS_TRACE_TOPIC`.
+
spring.cloud.stream.rocketmq.binder.access-channel::
商业版rocketmq消息轨迹topic自适应值为CLOUD
+
Default: null.
==== RocketMQ Consumer Properties ==== RocketMQ Consumer Properties
@ -346,6 +351,7 @@ NOTE: 0.1.2 & 0.2.2 & 0.9.0 才支持该功能
spring.cloud.stream.rocketmq.binder.access-key=YourAccessKey spring.cloud.stream.rocketmq.binder.access-key=YourAccessKey
spring.cloud.stream.rocketmq.binder.secret-key=YourSecretKey spring.cloud.stream.rocketmq.binder.secret-key=YourSecretKey
spring.cloud.stream.rocketmq.binder.name-server=NameServerInMQ spring.cloud.stream.rocketmq.binder.name-server=NameServerInMQ
spring.cloud.stream.rocketmq.binder.access-channel=CLOUD
``` ```
NOTE: topic 和 group 请以 实例id% 为前缀进行配置。比如 topic 为 "test",需要配置成 "实例id%test" NOTE: topic 和 group 请以 实例id% 为前缀进行配置。比如 topic 为 "test",需要配置成 "实例id%test"

@ -246,7 +246,11 @@ spring.cloud.stream.rocketmq.binder.customized-trace-topic::
The trace topic for message trace. The trace topic for message trace.
+ +
Default: `RMQ_SYS_TRACE_TOPIC`. Default: `RMQ_SYS_TRACE_TOPIC`.
+
spring.cloud.stream.rocketmq.binder.access-channel::
The commercial version of rocketmq message trajectory topic is adaptive,the value is CLOUD
+
Default: null.
==== RocketMQ Consumer Properties ==== RocketMQ Consumer Properties

@ -76,6 +76,13 @@ public final class RocketMQBinderUtils {
result.setEnableMsgTrace( result.setEnableMsgTrace(
rocketBinderConfigurationProperties.isEnableMsgTrace()); rocketBinderConfigurationProperties.isEnableMsgTrace());
} }
if (StringUtils.isEmpty(rocketMQProperties.getAccessChannel())) {
result.setAccessChannel(rocketBinderConfigurationProperties.getAccessChannel());
}
else {
result.setAccessChannel(
rocketMQProperties.getAccessChannel());
}
return result; return result;
} }

@ -38,6 +38,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
@ -158,6 +159,9 @@ public class RocketMQMessageChannelBinder extends
producerProperties.getExtension().isRetryNextServer()); producerProperties.getExtension().isRetryNextServer());
producer.setMaxMessageSize( producer.setMaxMessageSize(
producerProperties.getExtension().getMaxMessageSize()); producerProperties.getExtension().getMaxMessageSize());
if (!StringUtils.isEmpty(mergedProperties.getAccessChannel())) {
producer.setAccessChannel(AccessChannel.valueOf(mergedProperties.getAccessChannel()));
}
rocketMQTemplate.setProducer(producer); rocketMQTemplate.setProducer(producer);
if (producerProperties.isPartitioned()) { if (producerProperties.isPartitioned()) {
rocketMQTemplate rocketMQTemplate

@ -20,6 +20,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.config.RocketMQConfigUtils; import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
@ -59,6 +60,8 @@ public class RocketMQComponent4BinderAutoConfiguration {
"${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}"); "${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}");
String sk = environment.resolveRequiredPlaceholders( String sk = environment.resolveRequiredPlaceholders(
"${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}"); "${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}");
String accessChannel = environment.resolveRequiredPlaceholders(
"${spring.cloud.stream.rocketmq.binder.access-channel:${rocketmq.access-channel:}}");
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP, producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP,
new AclClientRPCHook(new SessionCredentials(ak, sk))); new AclClientRPCHook(new SessionCredentials(ak, sk)));
@ -71,6 +74,9 @@ public class RocketMQComponent4BinderAutoConfiguration {
configNameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER; configNameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
} }
producer.setNamesrvAddr(configNameServer); producer.setNamesrvAddr(configNameServer);
if (!StringUtils.isEmpty(configNameServer)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
return producer; return producer;
} }

@ -26,6 +26,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@ -238,6 +239,10 @@ public class RocketMQListenerBindingContainer
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
if (!StringUtils.isEmpty(rocketBinderConfigurationProperties.getAccessChannel())) {
consumer.setAccessChannel(AccessChannel.valueOf(rocketBinderConfigurationProperties.getAccessChannel()));
}
switch (messageModel) { switch (messageModel) {
case BROADCASTING: case BROADCASTING:
consumer.setMessageModel( consumer.setMessageModel(

@ -47,6 +47,11 @@ public class RocketMQBinderConfigurationProperties {
*/ */
private String secretKey; private String secretKey;
/**
* Enum type for accessChannel, values: LOCAL, CLOUD.
*/
private String accessChannel;
/** /**
* Switch flag instance for message trace. * Switch flag instance for message trace.
*/ */
@ -82,6 +87,14 @@ public class RocketMQBinderConfigurationProperties {
this.secretKey = secretKey; this.secretKey = secretKey;
} }
public String getAccessChannel() {
return accessChannel;
}
public void setAccessChannel(String accessChannel) {
this.accessChannel = accessChannel;
}
public boolean isEnableMsgTrace() { public boolean isEnableMsgTrace() {
return enableMsgTrace; return enableMsgTrace;
} }

@ -50,7 +50,8 @@ public class RocketMQAutoConfigurationTests {
"spring.cloud.stream.bindings.input2.content-type=application/json", "spring.cloud.stream.bindings.input2.content-type=application/json",
"spring.cloud.stream.bindings.input2.group=test-group2", "spring.cloud.stream.bindings.input2.group=test-group2",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false", "spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false",
"spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1"); "spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1",
"spring.cloud.stream.rocketmq.binder.access-channel=CLOUD");
@Test @Test
public void testProperties() { public void testProperties() {
@ -68,6 +69,7 @@ public class RocketMQAutoConfigurationTests {
.getOrderly()).isFalse(); .getOrderly()).isFalse();
assertThat(bindingProperties.getExtendedConsumerProperties("input1") assertThat(bindingProperties.getExtendedConsumerProperties("input1")
.getOrderly()).isTrue(); .getOrderly()).isTrue();
assertThat(binderConfigurationProperties.getAccessChannel()).isEqualTo("CLOUD");
}); });
} }

Loading…
Cancel
Save