From b871b74718cf617d61c606990fe459a4ad21d15e Mon Sep 17 00:00:00 2001 From: Bowen Li Date: Thu, 8 Dec 2022 19:33:54 +0800 Subject: [PATCH] Reuse rocketmq producer in 2021.x (#2949) *Reuse rocketmq producer --- .../outbound/RocketMQProduceFactory.java | 66 +++++++++++++++++-- .../properties/RocketMQCommonProperties.java | 8 ++- 2 files changed, 66 insertions(+), 8 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java index 5eabbd403..5b17ea1a0 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java @@ -17,6 +17,9 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound; import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst; import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; @@ -24,6 +27,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerPrope import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.hook.CheckForbiddenHook; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -46,12 +50,14 @@ import org.springframework.util.StringUtils; */ public final class RocketMQProduceFactory { - private RocketMQProduceFactory() { - } - private final static Logger log = LoggerFactory .getLogger(RocketMQProduceFactory.class); + private static final Map PRODUCER_REUSABLE_MAP = new ConcurrentHashMap<>(); + + private RocketMQProduceFactory() { + } + /** * init for the producer,including convert producer params. * @param topic topic @@ -63,7 +69,6 @@ public final class RocketMQProduceFactory { if (!StringUtils.hasLength(producerProperties.getGroup())) { producerProperties.setGroup(RocketMQConst.DEFAULT_GROUP); } - Assert.notNull(producerProperties.getNameServer(), "Property 'nameServer' is required"); @@ -99,10 +104,15 @@ public final class RocketMQProduceFactory { } } else { - producer = new DefaultMQProducer(producerProperties.getNamespace(), + String key = getKey(producerProperties); + if (PRODUCER_REUSABLE_MAP.containsKey(key)) { + return PRODUCER_REUSABLE_MAP.get(key); + } + producer = new ReusableMQProducer(producerProperties.getNamespace(), producerProperties.getGroup(), rpcHook, producerProperties.getEnableMsgTrace(), - producerProperties.getCustomizedTraceTopic()); + producerProperties.getCustomizedTraceTopic(), key); + PRODUCER_REUSABLE_MAP.put(key, producer); } producer.setVipChannelEnabled( @@ -137,4 +147,48 @@ public final class RocketMQProduceFactory { return producer; } + /** + * get the key from producerProperties. + * @param producerProperties producer properties + * @return key + */ + private static String getKey(RocketMQProducerProperties producerProperties) { + return producerProperties.getNameServer() + "," + producerProperties.getGroup() + + producerProperties.getSendCallBack(); + } + + /** + * This is a special kind of MQProducer that can be reused among different threads. + * The start and shutdown method can be invoked multiple times, but the real start and + * shutdown logics will only be executed once. + */ + protected static class ReusableMQProducer extends DefaultMQProducer { + + private final AtomicInteger atomicInteger = new AtomicInteger(); + + private final String key; + + public ReusableMQProducer(String namespace, String group, RPCHook rpcHook, + boolean enableMsgTrace, String customizedTraceTopic, String key) { + super(namespace, group, rpcHook, enableMsgTrace, customizedTraceTopic); + this.key = key; + } + + @Override + public void start() throws MQClientException { + if (atomicInteger.getAndIncrement() == 0) { + super.start(); + } + } + + @Override + public void shutdown() { + if (atomicInteger.decrementAndGet() == 0) { + PRODUCER_REUSABLE_MAP.remove(key); + super.shutdown(); + } + } + + } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java index 4973dd759..34405ce4c 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java @@ -46,11 +46,15 @@ public class RocketMQCommonProperties implements Serializable { /** * Consumers of the same role is required to have exactly same subscriptions and * consumerGroup to correctly achieve load balance. It's required and needs to be - * globally unique. Producer group conceptually aggregates all producer instances of + * globally unique. + *
+ * Producer group conceptually aggregates all producer instances of * exactly same role, which is particularly important when transactional messages are * involved. For non-transactional messages, it does not matter as long as it's unique * per process. See here - * for further discussion. + * for further discussion. However, group for non-transactional messages can indicate + * whether the internal RocketMQProducer should be reused (Only the bindings that use + * the same group can be reused). */ private String group;