diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
index ded2be10d..1321c84ba 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -92,6 +92,9 @@ public class RocketMQMessageChannelBinder extends
TransactionCheckListener.class));
}
+ if (errorChannel != null) {
+ messageHandler.setSendFailureChannel(errorChannel);
+ }
return messageHandler;
}
else {
@@ -107,7 +110,7 @@ public class RocketMQMessageChannelBinder extends
throws Exception {
if (group == null || "".equals(group)) {
throw new RuntimeException(
- "'group must be configured for channel + " + destination.getName());
+ "'group must be configured for channel " + destination.getName());
}
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/exception/RocketMQSendFailureException.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/exception/RocketMQSendFailureException.java
new file mode 100644
index 000000000..d8d7fc5dd
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/exception/RocketMQSendFailureException.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.rocketmq.exception;
+
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessagingException;
+
+/**
+ * An exception that is the payload of an {@code ErrorMessage} when occurs send failure.
+ *
+ * @author Jim
+ * @since 0.2.2
+ */
+public class RocketMQSendFailureException extends MessagingException {
+
+ private final org.apache.rocketmq.common.message.Message rocketmqMsg;
+
+ public RocketMQSendFailureException(Message> message,
+ org.apache.rocketmq.common.message.Message rocketmqMsg, Throwable cause) {
+ super(message, cause);
+ this.rocketmqMsg = rocketmqMsg;
+ }
+
+ public org.apache.rocketmq.common.message.Message getRocketmqMsg() {
+ return rocketmqMsg;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + " [rocketmqMsg=" + this.rocketmqMsg + "]";
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
index ba8054ad7..47268539e 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
@@ -33,20 +33,28 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
+import org.springframework.cloud.stream.binder.rocketmq.exception.RocketMQSendFailureException;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
+import org.springframework.integration.support.DefaultErrorMessageStrategy;
+import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MutableMessage;
+import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.ErrorMessage;
+import org.springframework.util.Assert;
/**
* @author Jim
*/
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
+ private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
+
private DefaultMQProducer producer;
private ProducerInstrumentation producerInstrumentation;
@@ -57,6 +65,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private TransactionCheckListener transactionCheckListener;
+ private MessageChannel sendFailureChannel;
+
private final ExtendedProducerProperties producerProperties;
private final String destination;
@@ -131,8 +141,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
@Override
protected void handleMessageInternal(org.springframework.messaging.Message> message)
throws Exception {
+ Message toSend = null;
try {
- Message toSend;
if (message.getPayload() instanceof byte[]) {
toSend = new Message(destination, (byte[]) message.getPayload());
}
@@ -166,7 +176,13 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
}
if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
- throw new MQClientException("message hasn't been sent", null);
+ if (getSendFailureChannel() != null) {
+ this.getSendFailureChannel().send(message);
+ }
+ else {
+ throw new RocketMQSendFailureException(message, toSend,
+ new MQClientException("message hasn't been sent", null));
+ }
}
if (message instanceof MutableMessage) {
RocketMQMessageHeaderAccessor.putSendResult((MutableMessage) message,
@@ -184,18 +200,60 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
.ifPresent(p -> p.markSentFailure());
logger.error(
"RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
- throw new MessagingException(e.getMessage(), e);
+ if (getSendFailureChannel() != null) {
+ getSendFailureChannel().send(this.errorMessageStrategy.buildErrorMessage(
+ new RocketMQSendFailureException(message, toSend, e), null));
+ }
+ else {
+ throw new RocketMQSendFailureException(message, toSend, e);
+ }
}
}
+ /**
+ * Using in RocketMQ Transactional Mode. Set RocketMQ localTransactionExecuter in
+ * {@link DefaultMQProducer#sendMessageInTransaction}.
+ * @param localTransactionExecuter the executer running when produce msg.
+ */
public void setLocalTransactionExecuter(
LocalTransactionExecuter localTransactionExecuter) {
this.localTransactionExecuter = localTransactionExecuter;
}
+ /**
+ * Using in RocketMQ Transactional Mode. Set RocketMQ transactionCheckListener in
+ * {@link TransactionMQProducer#setTransactionCheckListener}.
+ * @param transactionCheckListener the listener set in {@link TransactionMQProducer}.
+ */
public void setTransactionCheckListener(
TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
+
+ /**
+ * Set the failure channel. After a send failure, an {@link ErrorMessage} will be sent
+ * to this channel with a payload of a {@link RocketMQSendFailureException} with the
+ * failed message and cause.
+ * @param sendFailureChannel the failure channel.
+ * @since 0.2.2
+ */
+ public void setSendFailureChannel(MessageChannel sendFailureChannel) {
+ this.sendFailureChannel = sendFailureChannel;
+ }
+
+ /**
+ * Set the error message strategy implementation to use when sending error messages
+ * after send failures. Cannot be null.
+ * @param errorMessageStrategy the implementation.
+ * @since 0.2.2
+ */
+ public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
+ Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
+ this.errorMessageStrategy = errorMessageStrategy;
+ }
+
+ public MessageChannel getSendFailureChannel() {
+ return sendFailureChannel;
+ }
}
\ No newline at end of file