Polish #245. Optimize error handle of sending message

pull/258/head
fangjian0423 6 years ago
parent 2a05e7ff5a
commit 32e4a39294

@ -232,10 +232,8 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
RocketMQInboundChannelAdapter.this.destination)
.markConsumedFailure();
});
throw new RuntimeException(
"RocketMQ Message hasn't been processed successfully. Caused by ",
e);
}
return null;
}
private Acknowledgement doSendMsgs(final List<MessageExt> msgs,
@ -307,9 +305,16 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setDelayLevelWhenNextConsume(
acknowledgement.getConsumeConcurrentlyDelayLevel());
return acknowledgement.getConsumeConcurrentlyStatus();
if (acknowledgement != null) {
context.setDelayLevelWhenNextConsume(
acknowledgement.getConsumeConcurrentlyDelayLevel());
return acknowledgement.getConsumeConcurrentlyStatus();
}
else {
context.setDelayLevelWhenNextConsume(consumerProperties.getExtension()
.getError().getDelayLevelWhenNextConsume());
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
@ -320,11 +325,17 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
Acknowledgement acknowledgement = consumeMessage(msgs);
context.setSuspendCurrentQueueTimeMillis(
(acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
return acknowledgement.getConsumeOrderlyStatus();
if (acknowledgement != null) {
context.setSuspendCurrentQueueTimeMillis(
(acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
return acknowledgement.getConsumeOrderlyStatus();
}
else {
context.setSuspendCurrentQueueTimeMillis(consumerProperties.getExtension()
.getError().getSuspendCurrentQueueTimeMillis());
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
}

@ -52,6 +52,40 @@ public class RocketMQConsumerProperties {
private Boolean enabled = true;
private ErrorProp error;
public static class ErrorProp {
/**
* Reconsume later timeMillis in ConsumeOrderlyContext.
*/
private Long suspendCurrentQueueTimeMillis = 1000L;
/**
* Message consume retry strategy in ConsumeConcurrentlyContext.
*
* -1,no retry,put into DLQ directly 0,broker control retry frequency >0,client
* control retry frequency
*/
private Integer delayLevelWhenNextConsume = 0;
public Long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(Long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
public Integer getDelayLevelWhenNextConsume() {
return delayLevelWhenNextConsume;
}
public void setDelayLevelWhenNextConsume(Integer delayLevelWhenNextConsume) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
}
}
public String getTags() {
return tags;
}
@ -91,4 +125,12 @@ public class RocketMQConsumerProperties {
public void setBroadcasting(Boolean broadcasting) {
this.broadcasting = broadcasting;
}
public ErrorProp getError() {
return error;
}
public void setError(ErrorProp error) {
this.error = error;
}
}

Loading…
Cancel
Save