diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java index 72c974203..e2a527dde 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java @@ -16,8 +16,11 @@ package com.alibaba.cloud.stream.binder.rocketmq; +import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX; + /** * @author Jim + * @author Xiejiashuai */ public interface RocketMQBinderConstants { @@ -31,6 +34,11 @@ public interface RocketMQBinderConstants { */ String DEFAULT_NAME_SERVER = "127.0.0.1:9876"; - String DEFAULT_GROUP = "rocketmq_binder_default_group_name"; + String DEFAULT_GROUP = PREFIX + "binder_default_group_name"; + + /** + * RocketMQ re-consume times + */ + String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES"; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 1a11324f8..5bf540e07 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -16,6 +16,8 @@ package com.alibaba.cloud.stream.binder.rocketmq.consuming; +import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES; + import java.util.List; import java.util.Objects; @@ -23,12 +25,7 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; @@ -46,6 +43,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.context.SmartLifecycle; +import org.springframework.messaging.Message; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -54,7 +52,13 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigu import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; /** + * A class that Listen on rocketmq message + *

+ * this class will delegate {@link RocketMQListener} to handle message + * * @author Jim + * @author Xiejiashuai + * @see RocketMQListener */ public class RocketMQListenerBindingContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle { @@ -368,7 +372,7 @@ public class RocketMQListenerBindingContainer public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "Duplicates" }) @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -376,10 +380,10 @@ public class RocketMQListenerBindingContainer log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); - rocketMQListener - .onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); + rocketMQListener.onMessage(convertToSpringMessage(messageExt)); long costTime = System.currentTimeMillis() - now; - log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + log.debug("consume {} message key:[{}] cost: {} ms", + messageExt.getMsgId(), messageExt.getKeys(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); @@ -394,7 +398,7 @@ public class RocketMQListenerBindingContainer public class DefaultMessageListenerOrderly implements MessageListenerOrderly { - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "Duplicates" }) @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { @@ -402,10 +406,10 @@ public class RocketMQListenerBindingContainer log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); - rocketMQListener - .onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); + rocketMQListener.onMessage(convertToSpringMessage(messageExt)); long costTime = System.currentTimeMillis() - now; - log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + log.info("consume {} message key:[{}] cost: {} ms", + messageExt.getMsgId(), messageExt.getKeys(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); @@ -419,4 +423,20 @@ public class RocketMQListenerBindingContainer } } + /** + * Convert rocketmq {@link MessageExt} to Spring {@link Message} + * + * @param messageExt the rocketmq message + * @return the converted Spring {@link Message} + */ + private Message convertToSpringMessage(MessageExt messageExt) { + + // add reconsume-times header to messageExt + int reconsumeTimes = messageExt.getReconsumeTimes(); + messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, + String.valueOf(reconsumeTimes)); + + return RocketMQUtil.convertToSpringMessage(messageExt); + } + }