From 32e4a39294357ad7790fdf534a5b75e4cb57f873 Mon Sep 17 00:00:00 2001 From: fangjian0423 Date: Wed, 9 Jan 2019 14:23:13 +0800 Subject: [PATCH] Polish #245. Optimize error handle of sending message --- .../RocketMQInboundChannelAdapter.java | 31 +++++++++----- .../RocketMQConsumerProperties.java | 42 +++++++++++++++++++ 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java index ea88fe13b..55861e377 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java @@ -232,10 +232,8 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { RocketMQInboundChannelAdapter.this.destination) .markConsumedFailure(); }); - throw new RuntimeException( - "RocketMQ Message hasn't been processed successfully. Caused by ", - e); } + return null; } private Acknowledgement doSendMsgs(final List msgs, @@ -307,9 +305,16 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { public ConsumeConcurrentlyStatus consumeMessage(final List msgs, ConsumeConcurrentlyContext context) { Acknowledgement acknowledgement = consumeMessage(msgs); - context.setDelayLevelWhenNextConsume( - acknowledgement.getConsumeConcurrentlyDelayLevel()); - return acknowledgement.getConsumeConcurrentlyStatus(); + if (acknowledgement != null) { + context.setDelayLevelWhenNextConsume( + acknowledgement.getConsumeConcurrentlyDelayLevel()); + return acknowledgement.getConsumeConcurrentlyStatus(); + } + else { + context.setDelayLevelWhenNextConsume(consumerProperties.getExtension() + .getError().getDelayLevelWhenNextConsume()); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } } } @@ -320,11 +325,17 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport { public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { Acknowledgement acknowledgement = consumeMessage(msgs); - context.setSuspendCurrentQueueTimeMillis( - (acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill())); - return acknowledgement.getConsumeOrderlyStatus(); + if (acknowledgement != null) { + context.setSuspendCurrentQueueTimeMillis( + (acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill())); + return acknowledgement.getConsumeOrderlyStatus(); + } + else { + context.setSuspendCurrentQueueTimeMillis(consumerProperties.getExtension() + .getError().getSuspendCurrentQueueTimeMillis()); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } } - } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 6343f1535..d8e62a1cf 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -52,6 +52,40 @@ public class RocketMQConsumerProperties { private Boolean enabled = true; + private ErrorProp error; + + public static class ErrorProp { + + /** + * Reconsume later timeMillis in ConsumeOrderlyContext. + */ + private Long suspendCurrentQueueTimeMillis = 1000L; + + /** + * Message consume retry strategy in ConsumeConcurrentlyContext. + * + * -1,no retry,put into DLQ directly 0,broker control retry frequency >0,client + * control retry frequency + */ + private Integer delayLevelWhenNextConsume = 0; + + public Long getSuspendCurrentQueueTimeMillis() { + return suspendCurrentQueueTimeMillis; + } + + public void setSuspendCurrentQueueTimeMillis(Long suspendCurrentQueueTimeMillis) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + } + + public Integer getDelayLevelWhenNextConsume() { + return delayLevelWhenNextConsume; + } + + public void setDelayLevelWhenNextConsume(Integer delayLevelWhenNextConsume) { + this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; + } + } + public String getTags() { return tags; } @@ -91,4 +125,12 @@ public class RocketMQConsumerProperties { public void setBroadcasting(Boolean broadcasting) { this.broadcasting = broadcasting; } + + public ErrorProp getError() { + return error; + } + + public void setError(ErrorProp error) { + this.error = error; + } }