Reuse rocketmq producer in 2021.x (#2949)

*Reuse rocketmq producer
pull/2957/head
Bowen Li 2 years ago committed by GitHub
parent 2fe103cb00
commit b871b74718
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,6 +17,9 @@
package com.alibaba.cloud.stream.binder.rocketmq.integration.outbound;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
@ -24,6 +27,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerPrope
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@ -46,12 +50,14 @@ import org.springframework.util.StringUtils;
*/
public final class RocketMQProduceFactory {
private RocketMQProduceFactory() {
}
private final static Logger log = LoggerFactory
.getLogger(RocketMQProduceFactory.class);
private static final Map<String, DefaultMQProducer> PRODUCER_REUSABLE_MAP = new ConcurrentHashMap<>();
private RocketMQProduceFactory() {
}
/**
* init for the producer,including convert producer params.
* @param topic topic
@ -63,7 +69,6 @@ public final class RocketMQProduceFactory {
if (!StringUtils.hasLength(producerProperties.getGroup())) {
producerProperties.setGroup(RocketMQConst.DEFAULT_GROUP);
}
Assert.notNull(producerProperties.getNameServer(),
"Property 'nameServer' is required");
@ -99,10 +104,15 @@ public final class RocketMQProduceFactory {
}
}
else {
producer = new DefaultMQProducer(producerProperties.getNamespace(),
String key = getKey(producerProperties);
if (PRODUCER_REUSABLE_MAP.containsKey(key)) {
return PRODUCER_REUSABLE_MAP.get(key);
}
producer = new ReusableMQProducer(producerProperties.getNamespace(),
producerProperties.getGroup(), rpcHook,
producerProperties.getEnableMsgTrace(),
producerProperties.getCustomizedTraceTopic());
producerProperties.getCustomizedTraceTopic(), key);
PRODUCER_REUSABLE_MAP.put(key, producer);
}
producer.setVipChannelEnabled(
@ -137,4 +147,48 @@ public final class RocketMQProduceFactory {
return producer;
}
/**
* get the key from producerProperties.
* @param producerProperties producer properties
* @return key
*/
private static String getKey(RocketMQProducerProperties producerProperties) {
return producerProperties.getNameServer() + "," + producerProperties.getGroup()
+ producerProperties.getSendCallBack();
}
/**
* This is a special kind of MQProducer that can be reused among different threads.
* The start and shutdown method can be invoked multiple times, but the real start and
* shutdown logics will only be executed once.
*/
protected static class ReusableMQProducer extends DefaultMQProducer {
private final AtomicInteger atomicInteger = new AtomicInteger();
private final String key;
public ReusableMQProducer(String namespace, String group, RPCHook rpcHook,
boolean enableMsgTrace, String customizedTraceTopic, String key) {
super(namespace, group, rpcHook, enableMsgTrace, customizedTraceTopic);
this.key = key;
}
@Override
public void start() throws MQClientException {
if (atomicInteger.getAndIncrement() == 0) {
super.start();
}
}
@Override
public void shutdown() {
if (atomicInteger.decrementAndGet() == 0) {
PRODUCER_REUSABLE_MAP.remove(key);
super.shutdown();
}
}
}
}

@ -46,11 +46,15 @@ public class RocketMQCommonProperties implements Serializable {
/**
* Consumers of the same role is required to have exactly same subscriptions and
* consumerGroup to correctly achieve load balance. It's required and needs to be
* globally unique. Producer group conceptually aggregates all producer instances of
* globally unique.
* <br>
* Producer group conceptually aggregates all producer instances of
* exactly same role, which is particularly important when transactional messages are
* involved. For non-transactional messages, it does not matter as long as it's unique
* per process. See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a>
* for further discussion.
* for further discussion. However, group for non-transactional messages can indicate
* whether the internal RocketMQProducer should be reused (Only the bindings that use
* the same group can be reused).
*/
private String group;

Loading…
Cancel
Save