extract method to convert message

pull/815/head
xiejiashuai 6 years ago
parent a1a215f6e7
commit 5bbbad0054

@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -379,9 +380,7 @@ public class RocketMQListenerBindingContainer
log.debug("received msg: {}", messageExt); log.debug("received msg: {}", messageExt);
try { try {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
preOnMessage(messageExt); rocketMQListener.onMessage(convertToSpringMessage(messageExt));
rocketMQListener
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
long costTime = System.currentTimeMillis() - now; long costTime = System.currentTimeMillis() - now;
log.debug("consume {} message key:[{}] cost: {} ms", log.debug("consume {} message key:[{}] cost: {} ms",
messageExt.getMsgId(), messageExt.getKeys(), costTime); messageExt.getMsgId(), messageExt.getKeys(), costTime);
@ -407,9 +406,7 @@ public class RocketMQListenerBindingContainer
log.debug("received msg: {}", messageExt); log.debug("received msg: {}", messageExt);
try { try {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
preOnMessage(messageExt); rocketMQListener.onMessage(convertToSpringMessage(messageExt));
rocketMQListener
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
long costTime = System.currentTimeMillis() - now; long costTime = System.currentTimeMillis() - now;
log.debug("consume {} message key:[{}] cost: {} ms", log.debug("consume {} message key:[{}] cost: {} ms",
messageExt.getMsgId(), messageExt.getKeys(), costTime); messageExt.getMsgId(), messageExt.getKeys(), costTime);
@ -427,14 +424,19 @@ public class RocketMQListenerBindingContainer
} }
/** /**
* pre handle message before the consumer handle message * Convert rocketmq {@link MessageExt} to Spring {@link Message}
* *
* @param messageExt the rocketmq message * @param messageExt the rocketmq message
* @return the converted Spring {@link Message}
*/ */
private void preOnMessage(MessageExt messageExt) { private Message convertToSpringMessage(MessageExt messageExt) {
// add reconsume-times header to messageExt
int reconsumeTimes = messageExt.getReconsumeTimes(); int reconsumeTimes = messageExt.getReconsumeTimes();
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
String.valueOf(reconsumeTimes)); String.valueOf(reconsumeTimes));
return RocketMQUtil.convertToSpringMessage(messageExt);
} }
} }

Loading…
Cancel
Save