|
|
|
@ -16,6 +16,9 @@
|
|
|
|
|
|
|
|
|
|
package com.alibaba.cloud.stream.binder.rocketmq;
|
|
|
|
|
|
|
|
|
|
import java.util.UUID;
|
|
|
|
|
|
|
|
|
|
import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
|
|
|
|
|
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
|
|
|
|
|
import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
|
|
|
|
|
import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter;
|
|
|
|
@ -28,6 +31,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindi
|
|
|
|
|
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
|
|
|
|
|
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
|
|
|
|
|
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
|
|
|
|
|
import org.apache.rocketmq.common.protocol.NamespaceUtil;
|
|
|
|
|
|
|
|
|
|
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
|
|
|
|
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
|
|
|
@ -48,6 +52,7 @@ import org.springframework.messaging.MessageHandler;
|
|
|
|
|
import org.springframework.messaging.MessagingException;
|
|
|
|
|
import org.springframework.util.StringUtils;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A {@link org.springframework.cloud.stream.binder.Binder} that uses RocketMQ as the
|
|
|
|
|
* underlying middleware.
|
|
|
|
@ -113,11 +118,17 @@ public class RocketMQMessageChannelBinder extends
|
|
|
|
|
String group,
|
|
|
|
|
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties)
|
|
|
|
|
throws Exception {
|
|
|
|
|
// todo support anymous consumer
|
|
|
|
|
if (!StringUtils.hasLength(group)) {
|
|
|
|
|
boolean anonymous = !StringUtils.hasLength(group);
|
|
|
|
|
/***
|
|
|
|
|
* When using DLQ, at least the group property must be provided for proper naming of the DLQ destination
|
|
|
|
|
* According to https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference
|
|
|
|
|
*/
|
|
|
|
|
if (anonymous && NamespaceUtil.isDLQTopic(destination.getName())) {
|
|
|
|
|
throw new RuntimeException(
|
|
|
|
|
"'group must be configured for channel " + destination.getName());
|
|
|
|
|
"group must be configured for DLQ" + destination.getName());
|
|
|
|
|
}
|
|
|
|
|
group = anonymous ? nextDefaultConsumerGroup() : group;
|
|
|
|
|
|
|
|
|
|
RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties,
|
|
|
|
|
extendedConsumerProperties.getExtension());
|
|
|
|
|
extendedConsumerProperties.getExtension().setGroup(group);
|
|
|
|
@ -173,6 +184,14 @@ public class RocketMQMessageChannelBinder extends
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* generate next default consumer group.
|
|
|
|
|
* @return next default consumer group name.
|
|
|
|
|
*/
|
|
|
|
|
private static String nextDefaultConsumerGroup() {
|
|
|
|
|
return RocketMQConst.DEFAULT_GROUP + UUID.randomUUID().toString();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Binders can return an {@link ErrorMessageStrategy} for building error messages;
|
|
|
|
|
* binder implementations typically might add extra headers to the error message.
|
|
|
|
@ -203,5 +222,4 @@ public class RocketMQMessageChannelBinder extends
|
|
|
|
|
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
|
|
|
|
|
return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|