diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index a6d2f3a0e..22834af2b 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.context.SmartLifecycle; +import org.springframework.messaging.Message; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -379,9 +380,7 @@ public class RocketMQListenerBindingContainer log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); - preOnMessage(messageExt); - rocketMQListener - .onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); + rocketMQListener.onMessage(convertToSpringMessage(messageExt)); long costTime = System.currentTimeMillis() - now; log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime); @@ -407,9 +406,7 @@ public class RocketMQListenerBindingContainer log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); - preOnMessage(messageExt); - rocketMQListener - .onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); + rocketMQListener.onMessage(convertToSpringMessage(messageExt)); long costTime = System.currentTimeMillis() - now; log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime); @@ -427,14 +424,19 @@ public class RocketMQListenerBindingContainer } /** - * pre handle message before the consumer handle message + * Convert rocketmq {@link MessageExt} to Spring {@link Message} * * @param messageExt the rocketmq message + * @return the converted Spring {@link Message} */ - private void preOnMessage(MessageExt messageExt) { + private Message convertToSpringMessage(MessageExt messageExt) { + + // add reconsume-times header to messageExt int reconsumeTimes = messageExt.getReconsumeTimes(); messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes)); + + return RocketMQUtil.convertToSpringMessage(messageExt); } }