diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 38776db54..088ee58ca 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -34,6 +34,7 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; import org.springframework.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; +import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource; import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; @@ -83,7 +84,7 @@ public class RocketMQMessageChannelBinder extends MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { - // if producerGroup is empty, using destination + // if producerGroup is empty, using destination String extendedProducerGroup = producerProperties.getExtension().getGroup(); String producerGroup = StringUtils.isEmpty(extendedProducerGroup) ? destination.getName() @@ -206,6 +207,17 @@ public class RocketMQMessageChannelBinder extends return rocketInboundChannelAdapter; } + @Override + protected PolledConsumerResources createPolledConsumerResources(String name, + String group, ConsumerDestination destination, + ExtendedConsumerProperties consumerProperties) { + RocketMQMessageSource rocketMQMessageSource = new RocketMQMessageSource( + rocketBinderConfigurationProperties, consumerProperties, name, group); + return new PolledConsumerResources(rocketMQMessageSource, + registerErrorInfrastructure(destination, group, consumerProperties, + true)); + } + @Override public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) { return extendedBindingProperties.getExtendedConsumerProperties(channelName);