Polish #245. Support errorChannel for producing messages

pull/258/head
fangjian0423 6 years ago
parent a322921fe1
commit bb487b312c

@ -92,6 +92,9 @@ public class RocketMQMessageChannelBinder extends
TransactionCheckListener.class));
}
if (errorChannel != null) {
messageHandler.setSendFailureChannel(errorChannel);
}
return messageHandler;
}
else {
@ -107,7 +110,7 @@ public class RocketMQMessageChannelBinder extends
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException(
"'group must be configured for channel + " + destination.getName());
"'group must be configured for channel " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(

@ -0,0 +1,47 @@
/*
* Copyright (C) 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.rocketmq.exception;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* An exception that is the payload of an {@code ErrorMessage} when occurs send failure.
*
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
* @since 0.2.2
*/
public class RocketMQSendFailureException extends MessagingException {
private final org.apache.rocketmq.common.message.Message rocketmqMsg;
public RocketMQSendFailureException(Message<?> message,
org.apache.rocketmq.common.message.Message rocketmqMsg, Throwable cause) {
super(message, cause);
this.rocketmqMsg = rocketmqMsg;
}
public org.apache.rocketmq.common.message.Message getRocketmqMsg() {
return rocketmqMsg;
}
@Override
public String toString() {
return super.toString() + " [rocketmqMsg=" + this.rocketmqMsg + "]";
}
}

@ -33,20 +33,28 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
import org.springframework.cloud.stream.binder.rocketmq.exception.RocketMQSendFailureException;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MutableMessage;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
private DefaultMQProducer producer;
private ProducerInstrumentation producerInstrumentation;
@ -57,6 +65,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private TransactionCheckListener transactionCheckListener;
private MessageChannel sendFailureChannel;
private final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties;
private final String destination;
@ -131,8 +141,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
@Override
protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
throws Exception {
Message toSend = null;
try {
Message toSend;
if (message.getPayload() instanceof byte[]) {
toSend = new Message(destination, (byte[]) message.getPayload());
}
@ -166,7 +176,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
}
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
throw new MQClientException("message hasn't been sent", null);
if (getSendFailureChannel() != null) {
this.getSendFailureChannel().send(message);
}
else {
throw new RocketMQSendFailureException(message, toSend,
new MQClientException("message hasn't been sent", null));
}
}
if (message instanceof MutableMessage) {
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
@ -184,18 +200,60 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
.ifPresent(p -> p.markSentFailure());
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
throw new MessagingException(e.getMessage(), e);
if (getSendFailureChannel() != null) {
getSendFailureChannel().send(this.errorMessageStrategy.buildErrorMessage(
new RocketMQSendFailureException(message, toSend, e), null));
}
else {
throw new RocketMQSendFailureException(message, toSend, e);
}
}
}
/**
* Using in RocketMQ Transactional Mode. Set RocketMQ localTransactionExecuter in
* {@link DefaultMQProducer#sendMessageInTransaction}.
* @param localTransactionExecuter the executer running when produce msg.
*/
public void setLocalTransactionExecuter(
LocalTransactionExecuter localTransactionExecuter) {
this.localTransactionExecuter = localTransactionExecuter;
}
/**
* Using in RocketMQ Transactional Mode. Set RocketMQ transactionCheckListener in
* {@link TransactionMQProducer#setTransactionCheckListener}.
* @param transactionCheckListener the listener set in {@link TransactionMQProducer}.
*/
public void setTransactionCheckListener(
TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
/**
* Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent
* to this channel with a payload of a {@link RocketMQSendFailureException} with the
* failed message and cause.
* @param sendFailureChannel the failure channel.
* @since 0.2.2
*/
public void setSendFailureChannel(MessageChannel sendFailureChannel) {
this.sendFailureChannel = sendFailureChannel;
}
/**
* Set the error message strategy implementation to use when sending error messages
* after send failures. Cannot be null.
* @param errorMessageStrategy the implementation.
* @since 0.2.2
*/
public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
this.errorMessageStrategy = errorMessageStrategy;
}
public MessageChannel getSendFailureChannel() {
return sendFailureChannel;
}
}
Loading…
Cancel
Save