Polish #541, enhance error handler for Polled Consumer

pull/611/head
fangjian0423 6 years ago
parent 52da6f43c4
commit ad944513b7

@ -43,9 +43,13 @@ import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProdu
import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -218,6 +222,28 @@ public class RocketMQMessageChannelBinder extends
true));
}
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
ConsumerDestination destination, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties> properties) {
return message -> {
if (message.getPayload() instanceof MessagingException) {
AcknowledgmentCallback ack = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(
((MessagingException) message.getPayload())
.getFailedMessage());
if (ack != null) {
if (properties.getExtension().shouldRequeue()) {
ack.acknowledge(Status.REQUEUE);
}
else {
ack.acknowledge(Status.REJECT);
}
}
}
};
}
@Override
public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName);

@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.rocketmq.properties;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@ -51,7 +52,9 @@ public class RocketMQConsumerProperties {
private Boolean orderly = false;
/**
* for concurrently listener. message consume retry strategy
* for concurrently listener. message consume retry strategy. see
* {@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}. -1 means dlq(or
* discard, see {@link this#shouldRequeue}), others means requeue
*/
private int delayLevelWhenNextConsume = 0;
@ -141,4 +144,8 @@ public class RocketMQConsumerProperties {
public void setFromStore(boolean fromStore) {
this.fromStore = fromStore;
}
public boolean shouldRequeue() {
return delayLevelWhenNextConsume != -1;
}
}

Loading…
Cancel
Save