From 87d0939411e284d59a64a999ab359a20b3596702 Mon Sep 17 00:00:00 2001 From: joeqiaoyao Date: Tue, 1 Jun 2021 21:06:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=85=8D=E7=BD=AE=E6=B7=BB=E5=8A=A0uni?= =?UTF-8?q?tName=EF=BC=8C=E6=94=AF=E6=8C=81=E5=90=8C=E4=B8=80=E5=BA=94?= =?UTF-8?q?=E7=94=A8=E8=BF=9E=E6=8E=A5=E5=A4=9A=E4=B8=AA=E9=9B=86=E7=BE=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../integration/inbound/RocketMQConsumerFactory.java | 2 ++ .../integration/outbound/RocketMQProduceFactory.java | 1 + .../properties/RocketMQCommonProperties.java | 12 ++++++++++++ .../stream/binder/rocketmq/utils/RocketMQUtils.java | 3 +++ 4 files changed, 18 insertions(+) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java index 7f7e1bf26..2aeb1a19f 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java @@ -89,6 +89,7 @@ public final class RocketMQConsumerFactory { consumer.setPullInterval(consumerProperties.getPush().getPullInterval()); consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency()); consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency()); + consumer.setUnitName(consumerProperties.getUnitName()); return consumer; } @@ -145,6 +146,7 @@ public final class RocketMQConsumerFactory { // The internal queues are cached by a maximum of 1000 consumer.setPullThresholdForAll(extendedConsumerProperties.getExtension() .getPull().getPullThresholdForAll()); + consumer.setUnitName(consumerProperties.getUnitName()); return consumer; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java index 463868438..509aae652 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java @@ -118,6 +118,7 @@ public final class RocketMQProduceFactory { producerProperties.getRetryAnotherBroker()); producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); producer.setUseTLS(producerProperties.getUseTLS()); + producer.setUnitName(producerProperties.getUnitName()); CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean( producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class); if (null != checkForbiddenHook) { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java index 9e9114641..4973dd759 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java @@ -56,6 +56,11 @@ public class RocketMQCommonProperties implements Serializable { private String namespace; + /** + * The property of "unitName". + */ + private String unitName; + private String accessChannel = AccessChannel.LOCAL.name(); /** @@ -199,4 +204,11 @@ public class RocketMQCommonProperties implements Serializable { this.customizedTraceTopic = customizedTraceTopic; } + public String getUnitName() { + return unitName; + } + + public void setUnitName(String unitName) { + this.unitName = unitName; + } } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java index 1aed546a0..6bc969d24 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java @@ -64,6 +64,9 @@ public final class RocketMQUtils { mqProperties.setCustomizedTraceTopic( binderConfigurationProperties.getCustomizedTraceTopic()); } + if (StringUtils.isEmpty(mqProperties.getUnitName())) { + mqProperties.setUnitName(binderConfigurationProperties.getUnitName()); + } mqProperties.setNameServer(getNameServerStr(mqProperties.getNameServer())); return mqProperties; }