diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java index 7f7e1bf26..2aeb1a19f 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java @@ -89,6 +89,7 @@ public final class RocketMQConsumerFactory { consumer.setPullInterval(consumerProperties.getPush().getPullInterval()); consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency()); consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency()); + consumer.setUnitName(consumerProperties.getUnitName()); return consumer; } @@ -145,6 +146,7 @@ public final class RocketMQConsumerFactory { // The internal queues are cached by a maximum of 1000 consumer.setPullThresholdForAll(extendedConsumerProperties.getExtension() .getPull().getPullThresholdForAll()); + consumer.setUnitName(consumerProperties.getUnitName()); return consumer; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java index 463868438..509aae652 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java @@ -118,6 +118,7 @@ public final class RocketMQProduceFactory { producerProperties.getRetryAnotherBroker()); producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); producer.setUseTLS(producerProperties.getUseTLS()); + producer.setUnitName(producerProperties.getUnitName()); CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean( producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class); if (null != checkForbiddenHook) { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java index 9e9114641..4973dd759 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java @@ -56,6 +56,11 @@ public class RocketMQCommonProperties implements Serializable { private String namespace; + /** + * The property of "unitName". + */ + private String unitName; + private String accessChannel = AccessChannel.LOCAL.name(); /** @@ -199,4 +204,11 @@ public class RocketMQCommonProperties implements Serializable { this.customizedTraceTopic = customizedTraceTopic; } + public String getUnitName() { + return unitName; + } + + public void setUnitName(String unitName) { + this.unitName = unitName; + } } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java index 55e5a5d15..b431db74f 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java @@ -64,6 +64,9 @@ public final class RocketMQUtils { mqProperties.setCustomizedTraceTopic( binderConfigurationProperties.getCustomizedTraceTopic()); } + if (StringUtils.isEmpty(mqProperties.getUnitName())) { + mqProperties.setUnitName(binderConfigurationProperties.getUnitName()); + } mqProperties.setNameServer(getNameServerStr(mqProperties.getNameServer())); return mqProperties; }