add reconsume times header

pull/815/head
xiejiashuai 6 years ago
parent 8832644850
commit aa4b8790e2

@ -16,21 +16,29 @@
package com.alibaba.cloud.stream.binder.rocketmq; package com.alibaba.cloud.stream.binder.rocketmq;
import static org.apache.rocketmq.spring.support.RocketMQHeaders.PREFIX;
/** /**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a>
*/ */
public interface RocketMQBinderConstants { public interface RocketMQBinderConstants {
/** /**
* Header key * Header key
*/ */
String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG"; String ROCKET_TRANSACTIONAL_ARG = "TRANSACTIONAL_ARG";
/**
* Default value
*/
String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
/** String DEFAULT_GROUP = PREFIX + "binder_default_group_name";
* Default value
*/
String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
String DEFAULT_GROUP = "rocketmq_binder_default_group_name"; /**
* RocketMQ re-consume times
*/
String ROCKETMQ_RECONSUME_TIMES = PREFIX + "RECONSUME_TIMES";
} }

@ -16,19 +16,14 @@
package com.alibaba.cloud.stream.binder.rocketmq.consuming; package com.alibaba.cloud.stream.binder.rocketmq.consuming;
import java.util.List; import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import java.util.Objects; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
@ -49,374 +44,388 @@ import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; import java.util.List;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import java.util.Objects;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
/** /**
* A class that Listen on rocketmq message
* <p>
* this class will delegate {@link RocketMQListener} to handle message
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @author <a href="mailto:jiashuai.xie01@gmail.com">Xiejiashuai</a>
* @see RocketMQListener
*/ */
public class RocketMQListenerBindingContainer public class RocketMQListenerBindingContainer
implements InitializingBean, RocketMQListenerContainer, SmartLifecycle { implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
private final static Logger log = LoggerFactory private final static Logger log = LoggerFactory
.getLogger(RocketMQListenerBindingContainer.class); .getLogger(RocketMQListenerBindingContainer.class);
private long suspendCurrentQueueTimeMillis = 1000; private long suspendCurrentQueueTimeMillis = 1000;
/** /**
* Message consume retry strategy<br> * Message consume retry strategy<br>
* -1,no retry,put into DLQ directly<br> * -1,no retry,put into DLQ directly<br>
* 0,broker control retry frequency<br> * 0,broker control retry frequency<br>
* >0,client control retry frequency. * >0,client control retry frequency.
*/ */
private int delayLevelWhenNextConsume = 0; private int delayLevelWhenNextConsume = 0;
private String nameServer; private String nameServer;
private String consumerGroup; private String consumerGroup;
private String topic; private String topic;
private int consumeThreadMax = 64; private int consumeThreadMax = 64;
private String charset = "UTF-8"; private String charset = "UTF-8";
private RocketMQListener rocketMQListener; private RocketMQListener rocketMQListener;
private DefaultMQPushConsumer consumer; private DefaultMQPushConsumer consumer;
private boolean running; private boolean running;
private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties; private final ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties;
private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder; private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
// The following properties came from RocketMQConsumerProperties. // The following properties came from RocketMQConsumerProperties.
private ConsumeMode consumeMode; private ConsumeMode consumeMode;
private SelectorType selectorType; private SelectorType selectorType;
private String selectorExpression; private String selectorExpression;
private MessageModel messageModel; private MessageModel messageModel;
public RocketMQListenerBindingContainer( public RocketMQListenerBindingContainer(
ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties, ExtendedConsumerProperties<RocketMQConsumerProperties> rocketMQConsumerProperties,
RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties, RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
RocketMQMessageChannelBinder rocketMQMessageChannelBinder) { RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
this.rocketMQConsumerProperties = rocketMQConsumerProperties; this.rocketMQConsumerProperties = rocketMQConsumerProperties;
this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties; this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder; this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly() this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
? ConsumeMode.ORDERLY ? ConsumeMode.ORDERLY
: ConsumeMode.CONCURRENTLY; : ConsumeMode.CONCURRENTLY;
if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) { if (StringUtils.isEmpty(rocketMQConsumerProperties.getExtension().getSql())) {
this.selectorType = SelectorType.TAG; this.selectorType = SelectorType.TAG;
this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags(); this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
} } else {
else { this.selectorType = SelectorType.SQL92;
this.selectorType = SelectorType.SQL92; this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql(); }
} this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting() ? MessageModel.BROADCASTING
? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
: MessageModel.CLUSTERING; }
}
@Override
@Override public void setupMessageListener(RocketMQListener<?> rocketMQListener) {
public void setupMessageListener(RocketMQListener<?> rocketMQListener) { this.rocketMQListener = rocketMQListener;
this.rocketMQListener = rocketMQListener; }
}
@Override
@Override public void destroy() throws Exception {
public void destroy() throws Exception { this.setRunning(false);
this.setRunning(false); if (Objects.nonNull(consumer)) {
if (Objects.nonNull(consumer)) { consumer.shutdown();
consumer.shutdown(); }
} log.info("container destroyed, {}", this.toString());
log.info("container destroyed, {}", this.toString()); }
}
@Override
@Override public void afterPropertiesSet() throws Exception {
public void afterPropertiesSet() throws Exception { initRocketMQPushConsumer();
initRocketMQPushConsumer(); }
}
@Override
@Override public boolean isAutoStartup() {
public boolean isAutoStartup() { return true;
return true; }
}
@Override
@Override public void stop(Runnable callback) {
public void stop(Runnable callback) { stop();
stop(); callback.run();
callback.run(); }
}
@Override
@Override public void start() {
public void start() { if (this.isRunning()) {
if (this.isRunning()) { throw new IllegalStateException(
throw new IllegalStateException( "container already running. " + this.toString());
"container already running. " + this.toString()); }
}
try {
try { consumer.start();
consumer.start(); } catch (MQClientException e) {
} throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
catch (MQClientException e) { }
throw new IllegalStateException("Failed to start RocketMQ push consumer", e); this.setRunning(true);
}
this.setRunning(true); log.info("running container: {}", this.toString());
}
log.info("running container: {}", this.toString());
} @Override
public void stop() {
@Override if (this.isRunning()) {
public void stop() { if (Objects.nonNull(consumer)) {
if (this.isRunning()) { consumer.shutdown();
if (Objects.nonNull(consumer)) { }
consumer.shutdown(); setRunning(false);
} }
setRunning(false); }
}
} @Override
public boolean isRunning() {
@Override return running;
public boolean isRunning() { }
return running;
} private void setRunning(boolean running) {
this.running = running;
private void setRunning(boolean running) { }
this.running = running;
} @Override
public int getPhase() {
@Override return Integer.MAX_VALUE;
public int getPhase() { }
return Integer.MAX_VALUE;
} private void initRocketMQPushConsumer() throws MQClientException {
Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
private void initRocketMQPushConsumer() throws MQClientException { Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); Assert.notNull(topic, "Property 'topic' is required");
Assert.notNull(nameServer, "Property 'nameServer' is required");
Assert.notNull(topic, "Property 'topic' is required"); String ak = rocketBinderConfigurationProperties.getAccessKey();
String sk = rocketBinderConfigurationProperties.getSecretKey();
String ak = rocketBinderConfigurationProperties.getAccessKey(); if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
String sk = rocketBinderConfigurationProperties.getSecretKey(); RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk)); new AllocateMessageQueueAveragely(),
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, rocketBinderConfigurationProperties.isEnableMsgTrace(),
new AllocateMessageQueueAveragely(), rocketBinderConfigurationProperties.getCustomizedTraceTopic());
rocketBinderConfigurationProperties.isEnableMsgTrace(), consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
rocketBinderConfigurationProperties.getCustomizedTraceTopic()); topic + "|" + UtilAll.getPid()));
consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumer.setVipChannelEnabled(false);
topic + "|" + UtilAll.getPid())); } else {
consumer.setVipChannelEnabled(false); consumer = new DefaultMQPushConsumer(consumerGroup,
} rocketBinderConfigurationProperties.isEnableMsgTrace(),
else { rocketBinderConfigurationProperties.getCustomizedTraceTopic());
consumer = new DefaultMQPushConsumer(consumerGroup, }
rocketBinderConfigurationProperties.isEnableMsgTrace(),
rocketBinderConfigurationProperties.getCustomizedTraceTopic()); consumer.setNamesrvAddr(nameServer);
} consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); switch (messageModel) {
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); case BROADCASTING:
consumer.setMessageModel(
switch (messageModel) { org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
case BROADCASTING: break;
consumer.setMessageModel( case CLUSTERING:
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); consumer.setMessageModel(
break; org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
case CLUSTERING: break;
consumer.setMessageModel( default:
org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); throw new IllegalArgumentException("Property 'messageModel' was wrong.");
break; }
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong."); switch (selectorType) {
} case TAG:
consumer.subscribe(topic, selectorExpression);
switch (selectorType) { break;
case TAG: case SQL92:
consumer.subscribe(topic, selectorExpression); consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
break; break;
case SQL92: default:
consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); throw new IllegalArgumentException("Property 'selectorType' was wrong.");
break; }
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong."); switch (consumeMode) {
} case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
switch (consumeMode) { break;
case ORDERLY: case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly()); consumer.setMessageListener(new DefaultMessageListenerConcurrently());
break; break;
case CONCURRENTLY: default:
consumer.setMessageListener(new DefaultMessageListenerConcurrently()); throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
break; }
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong."); if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
} ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
.prepareStart(consumer);
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { }
((RocketMQPushConsumerLifecycleListener) rocketMQListener)
.prepareStart(consumer); }
}
@Override
} public String toString() {
return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
@Override + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
public String toString() { + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
+ '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + messageModel + '}';
+ ", consumeMode=" + consumeMode + ", selectorType=" + selectorType }
+ ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
+ messageModel + '}'; public long getSuspendCurrentQueueTimeMillis() {
} return suspendCurrentQueueTimeMillis;
}
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis; public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
} this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; public int getDelayLevelWhenNextConsume() {
} return delayLevelWhenNextConsume;
}
public int getDelayLevelWhenNextConsume() {
return delayLevelWhenNextConsume; public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
} this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
}
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; public String getNameServer() {
} return nameServer;
}
public String getNameServer() {
return nameServer; public void setNameServer(String nameServer) {
} this.nameServer = nameServer;
}
public void setNameServer(String nameServer) {
this.nameServer = nameServer; public String getConsumerGroup() {
} return consumerGroup;
}
public String getConsumerGroup() {
return consumerGroup; public void setConsumerGroup(String consumerGroup) {
} this.consumerGroup = consumerGroup;
}
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup; public String getTopic() {
} return topic;
}
public String getTopic() {
return topic; public void setTopic(String topic) {
} this.topic = topic;
}
public void setTopic(String topic) {
this.topic = topic; public int getConsumeThreadMax() {
} return consumeThreadMax;
}
public int getConsumeThreadMax() {
return consumeThreadMax; public void setConsumeThreadMax(int consumeThreadMax) {
} this.consumeThreadMax = consumeThreadMax;
}
public void setConsumeThreadMax(int consumeThreadMax) {
this.consumeThreadMax = consumeThreadMax; public String getCharset() {
} return charset;
}
public String getCharset() {
return charset; public void setCharset(String charset) {
} this.charset = charset;
}
public void setCharset(String charset) {
this.charset = charset; public RocketMQListener getRocketMQListener() {
} return rocketMQListener;
}
public RocketMQListener getRocketMQListener() {
return rocketMQListener; public void setRocketMQListener(RocketMQListener rocketMQListener) {
} this.rocketMQListener = rocketMQListener;
}
public void setRocketMQListener(RocketMQListener rocketMQListener) {
this.rocketMQListener = rocketMQListener; public DefaultMQPushConsumer getConsumer() {
} return consumer;
}
public DefaultMQPushConsumer getConsumer() {
return consumer; public void setConsumer(DefaultMQPushConsumer consumer) {
} this.consumer = consumer;
}
public void setConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer; public ExtendedConsumerProperties<RocketMQConsumerProperties> getRocketMQConsumerProperties() {
} return rocketMQConsumerProperties;
}
public ExtendedConsumerProperties<RocketMQConsumerProperties> getRocketMQConsumerProperties() {
return rocketMQConsumerProperties; public ConsumeMode getConsumeMode() {
} return consumeMode;
}
public ConsumeMode getConsumeMode() {
return consumeMode; public SelectorType getSelectorType() {
} return selectorType;
}
public SelectorType getSelectorType() {
return selectorType; public String getSelectorExpression() {
} return selectorExpression;
}
public String getSelectorExpression() {
return selectorExpression; public MessageModel getMessageModel() {
} return messageModel;
}
public MessageModel getMessageModel() {
return messageModel; public class DefaultMessageListenerConcurrently
} implements MessageListenerConcurrently {
public class DefaultMessageListenerConcurrently @SuppressWarnings({"unchecked", "Duplicates"})
implements MessageListenerConcurrently { @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
@SuppressWarnings("unchecked") ConsumeConcurrentlyContext context) {
@Override for (MessageExt messageExt : msgs) {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, log.debug("received msg: {}", messageExt);
ConsumeConcurrentlyContext context) { try {
for (MessageExt messageExt : msgs) { long now = System.currentTimeMillis();
log.debug("received msg: {}", messageExt); preOnMessage(messageExt);
try { rocketMQListener
long now = System.currentTimeMillis(); .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
rocketMQListener long costTime = System.currentTimeMillis() - now;
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime);
long costTime = System.currentTimeMillis() - now; } catch (Exception e) {
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); log.warn("consume message failed. messageExt:{}", messageExt, e);
} context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER;
log.warn("consume message failed. messageExt:{}", messageExt, e); }
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); }
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} }
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
}
@SuppressWarnings({"unchecked", "Duplicates"})
public class DefaultMessageListenerOrderly implements MessageListenerOrderly { @Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
@SuppressWarnings("unchecked") ConsumeOrderlyContext context) {
@Override for (MessageExt messageExt : msgs) {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, log.debug("received msg: {}", messageExt);
ConsumeOrderlyContext context) { try {
for (MessageExt messageExt : msgs) { long now = System.currentTimeMillis();
log.debug("received msg: {}", messageExt); preOnMessage(messageExt);
try { rocketMQListener
long now = System.currentTimeMillis(); .onMessage(RocketMQUtil.convertToSpringMessage(messageExt));
rocketMQListener long costTime = System.currentTimeMillis() - now;
.onMessage(RocketMQUtil.convertToSpringMessage(messageExt)); log.debug("consume {} message key:[{}] cost: {} ms", messageExt.getMsgId(), messageExt.getKeys(), costTime);
long costTime = System.currentTimeMillis() - now; } catch (Exception e) {
log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); log.warn("consume message failed. messageExt:{}", messageExt, e);
} context.setSuspendCurrentQueueTimeMillis(
catch (Exception e) { suspendCurrentQueueTimeMillis);
log.warn("consume message failed. messageExt:{}", messageExt, e); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
context.setSuspendCurrentQueueTimeMillis( }
suspendCurrentQueueTimeMillis); }
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
} return ConsumeOrderlyStatus.SUCCESS;
} }
}
return ConsumeOrderlyStatus.SUCCESS;
} /**
} * pre handle message before the consumer handle message
*
* @param messageExt the rocketmq message
*/
private void preOnMessage(MessageExt messageExt) {
int reconsumeTimes = messageExt.getReconsumeTimes();
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes));
}
} }

Loading…
Cancel
Save