Merge pull request #2103 from joeqiaoyao/rocketmq

feat: 配置添加unitName,支持同一应用连接多个集群
pull/2302/head
TheoneFx 4 years ago committed by GitHub
commit 72e8a3bb36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -89,6 +89,7 @@ public final class RocketMQConsumerFactory {
consumer.setPullInterval(consumerProperties.getPush().getPullInterval()); consumer.setPullInterval(consumerProperties.getPush().getPullInterval());
consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency()); consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency());
consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency()); consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency());
consumer.setUnitName(consumerProperties.getUnitName());
return consumer; return consumer;
} }
@ -145,6 +146,7 @@ public final class RocketMQConsumerFactory {
// The internal queues are cached by a maximum of 1000 // The internal queues are cached by a maximum of 1000
consumer.setPullThresholdForAll(extendedConsumerProperties.getExtension() consumer.setPullThresholdForAll(extendedConsumerProperties.getExtension()
.getPull().getPullThresholdForAll()); .getPull().getPullThresholdForAll());
consumer.setUnitName(consumerProperties.getUnitName());
return consumer; return consumer;
} }

@ -118,6 +118,7 @@ public final class RocketMQProduceFactory {
producerProperties.getRetryAnotherBroker()); producerProperties.getRetryAnotherBroker());
producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
producer.setUseTLS(producerProperties.getUseTLS()); producer.setUseTLS(producerProperties.getUseTLS());
producer.setUnitName(producerProperties.getUnitName());
CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean( CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean(
producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class); producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class);
if (null != checkForbiddenHook) { if (null != checkForbiddenHook) {

@ -56,6 +56,11 @@ public class RocketMQCommonProperties implements Serializable {
private String namespace; private String namespace;
/**
* The property of "unitName".
*/
private String unitName;
private String accessChannel = AccessChannel.LOCAL.name(); private String accessChannel = AccessChannel.LOCAL.name();
/** /**
@ -199,4 +204,11 @@ public class RocketMQCommonProperties implements Serializable {
this.customizedTraceTopic = customizedTraceTopic; this.customizedTraceTopic = customizedTraceTopic;
} }
public String getUnitName() {
return unitName;
}
public void setUnitName(String unitName) {
this.unitName = unitName;
}
} }

@ -64,6 +64,9 @@ public final class RocketMQUtils {
mqProperties.setCustomizedTraceTopic( mqProperties.setCustomizedTraceTopic(
binderConfigurationProperties.getCustomizedTraceTopic()); binderConfigurationProperties.getCustomizedTraceTopic());
} }
if (StringUtils.isEmpty(mqProperties.getUnitName())) {
mqProperties.setUnitName(binderConfigurationProperties.getUnitName());
}
mqProperties.setNameServer(getNameServerStr(mqProperties.getNameServer())); mqProperties.setNameServer(getNameServerStr(mqProperties.getNameServer()));
return mqProperties; return mqProperties;
} }

Loading…
Cancel
Save