|
|
|
@ -25,6 +25,7 @@ import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
|
|
|
|
|
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
|
|
|
|
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
|
|
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
|
|
|
|
|
import org.apache.rocketmq.common.protocol.NamespaceUtil;
|
|
|
|
|
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
|
|
|
|
import org.apache.rocketmq.remoting.RPCHook;
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
@ -97,15 +98,28 @@ public final class RocketMQConsumerFactory {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* todo Compatible with versions less than 4.6 ?
|
|
|
|
|
* @param topic
|
|
|
|
|
* @param extendedConsumerProperties extendedConsumerProperties
|
|
|
|
|
* @return DefaultLitePullConsumer
|
|
|
|
|
*/
|
|
|
|
|
public static DefaultLitePullConsumer initPullConsumer(
|
|
|
|
|
String topic,
|
|
|
|
|
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
|
|
|
|
|
RocketMQConsumerProperties consumerProperties = extendedConsumerProperties
|
|
|
|
|
.getExtension();
|
|
|
|
|
Assert.notNull(consumerProperties.getGroup(),
|
|
|
|
|
"Property 'group' is required - consumerGroup");
|
|
|
|
|
boolean anonymous = !StringUtils.hasLength(consumerProperties.getGroup());
|
|
|
|
|
/***
|
|
|
|
|
* When using DLQ, at least the group property must be provided for proper naming of the DLQ destination
|
|
|
|
|
* According to https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference
|
|
|
|
|
*/
|
|
|
|
|
if (anonymous && NamespaceUtil.isDLQTopic(topic)) {
|
|
|
|
|
throw new RuntimeException(
|
|
|
|
|
"group must be configured for DLQ" + topic);
|
|
|
|
|
}
|
|
|
|
|
if (anonymous) {
|
|
|
|
|
consumerProperties.setGroup(RocketMQUtils.anonymousGroup(topic));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Assert.notNull(consumerProperties.getNameServer(),
|
|
|
|
|
"Property 'nameServer' is required");
|
|
|
|
|
AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache
|
|
|
|
|