diff --git a/pom.xml b/pom.xml
index dbeaab9d7..0262e51ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@
spring-cloud-alibaba-sentinel-datasource
spring-cloud-alibaba-nacos-config
spring-cloud-alibaba-nacos-discovery
+ spring-cloud-stream-binder-rocketmq
spring-cloud-alibaba-examples
spring-cloud-alibaba-test
spring-cloud-alibaba-docs
diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml
index 33b5ad6af..0bd867eb2 100644
--- a/spring-cloud-alibaba-dependencies/pom.xml
+++ b/spring-cloud-alibaba-dependencies/pom.xml
@@ -24,6 +24,8 @@
4.0.1
1.0.0
2.16.0
+ 4.3.1
+ 4.0.3
@@ -115,6 +117,11 @@
sentinel-dubbo-api
${project.version}
+
+ org.apache.rocketmq
+ rocketmq-client
+ ${rocketmq.version}
+
@@ -202,6 +209,14 @@
spring-cloud-starter-alicloud-acm
${project.version}
+
+
+
+ io.dropwizard.metrics
+ metrics-core
+ ${metrics.core}
+
+
diff --git a/spring-cloud-stream-binder-rocketmq/pom.xml b/spring-cloud-stream-binder-rocketmq/pom.xml
new file mode 100644
index 000000000..9ff00a155
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/pom.xml
@@ -0,0 +1,77 @@
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-alibaba
+ 0.2.1.BUILD-SNAPSHOT
+
+ 4.0.0
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-rocketmq
+ Spring Cloud Alibaba RocketMQ Binder
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream
+
+
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+
+ org.apache.rocketmq
+ rocketmq-client
+
+
+
+ org.springframework.boot
+ spring-boot-configuration-processor
+ provided
+ true
+
+
+
+ org.springframework.boot
+ spring-boot
+ provided
+ true
+
+
+
+ org.springframework.boot
+ spring-boot-autoconfigure
+ provided
+ true
+
+
+
+ org.springframework.boot
+ spring-boot-actuator
+ provided
+ true
+
+
+
+ org.springframework.boot
+ spring-boot-actuator-autoconfigure
+ provided
+ true
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+
+
+
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
new file mode 100644
index 000000000..f03fc48eb
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
@@ -0,0 +1,26 @@
+package org.springframework.cloud.stream.binder.rocketmq;
+
+/**
+ * @author Jim
+ */
+public interface RocketMQBinderConstants {
+
+ /**
+ * Header key
+ */
+ String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKET_MESSAGE";
+
+ String ROCKET_FLAG = "ROCKET_FLAG";
+
+ String ROCKET_SEND_RESULT = "ROCKET_SEND_RESULT";
+
+ String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
+
+ /**
+ * Instrumentation key
+ */
+ String LASTSEND_TIMESTAMP = "lastSend.timestamp";
+
+ String ENDPOINT_ID = "rocketmq-binder";
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
new file mode 100644
index 000000000..da5cf1c75
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -0,0 +1,101 @@
+package org.springframework.cloud.stream.binder.rocketmq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
+import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
+import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
+import org.springframework.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
+import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+import org.springframework.cloud.stream.provisioning.ProducerDestination;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQMessageChannelBinder extends
+ AbstractMessageChannelBinder,
+ ExtendedProducerProperties, RocketMQTopicProvisioner>
+ implements ExtendedPropertiesBinder {
+
+ private static final Logger logger = LoggerFactory.getLogger(RocketMQMessageChannelBinder.class);
+
+ private final RocketMQExtendedBindingProperties extendedBindingProperties;
+ private final RocketMQTopicProvisioner rocketTopicProvisioner;
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+ private final InstrumentationManager instrumentationManager;
+ private final ConsumersManager consumersManager;
+
+ public RocketMQMessageChannelBinder(ConsumersManager consumersManager,
+ RocketMQExtendedBindingProperties extendedBindingProperties,
+ RocketMQTopicProvisioner provisioningProvider,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
+ InstrumentationManager instrumentationManager) {
+ super(null, provisioningProvider);
+ this.consumersManager = consumersManager;
+ this.extendedBindingProperties = extendedBindingProperties;
+ this.rocketTopicProvisioner = provisioningProvider;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ this.instrumentationManager = instrumentationManager;
+ }
+
+ @Override
+ protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
+ ExtendedProducerProperties
+ producerProperties,
+ MessageChannel errorChannel) throws Exception {
+ if (producerProperties.getExtension().getEnabled()) {
+ return new RocketMQMessageHandler(destination.getName(), producerProperties.getExtension(),
+ rocketBinderConfigurationProperties, instrumentationManager);
+ } else {
+ throw new RuntimeException(
+ "Binding for channel " + destination.getName() + "has been disabled, message can't be delivered");
+ }
+ }
+
+ @Override
+ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
+ ExtendedConsumerProperties
+ consumerProperties)
+ throws Exception {
+ if (group == null || "".equals(group)) {
+ throw new RuntimeException("'group' must be configured for channel + " + destination.getName());
+ }
+
+ RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(consumersManager,
+ consumerProperties, destination.getName(), group, instrumentationManager);
+
+ ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, group,
+ consumerProperties);
+ if (consumerProperties.getMaxAttempts() > 1) {
+ rocketInboundChannelAdapter.setRetryTemplate(buildRetryTemplate(consumerProperties));
+ rocketInboundChannelAdapter.setRecoveryCallback(errorInfrastructure.getRecoverer());
+ } else {
+ rocketInboundChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
+ }
+
+ return rocketInboundChannelAdapter;
+ }
+
+ @Override
+ public RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
+ return extendedBindingProperties.getExtendedConsumerProperties(channelName);
+ }
+
+ @Override
+ public RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
+ return extendedBindingProperties.getExtendedProducerProperties(channelName);
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
new file mode 100644
index 000000000..480ec1941
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQMessageHeaderAccessor.java
@@ -0,0 +1,106 @@
+package org.springframework.cloud.stream.binder.rocketmq;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.cloud.stream.binder.rocketmq.consuming.Acknowledgement;
+import org.springframework.integration.support.MutableMessage;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageHeaderAccessor;
+
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ACKNOWLEDGEMENT_KEY;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ORIGINAL_ROCKET_MESSAGE;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_FLAG;
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKET_SEND_RESULT;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQMessageHeaderAccessor extends MessageHeaderAccessor {
+
+ public RocketMQMessageHeaderAccessor() {
+ super();
+ }
+
+ public RocketMQMessageHeaderAccessor(Message> message) {
+ super(message);
+ }
+
+ public Acknowledgement getAcknowledgement(Message message) {
+ return message.getHeaders().get(ACKNOWLEDGEMENT_KEY, Acknowledgement.class);
+ }
+
+ public RocketMQMessageHeaderAccessor withAcknowledgment(Acknowledgement acknowledgment) {
+ setHeader(ACKNOWLEDGEMENT_KEY, acknowledgment);
+ return this;
+ }
+
+ public String getTags() {
+ return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_TAGS, "");
+ }
+
+ public RocketMQMessageHeaderAccessor withTags(String tag) {
+ setHeader(MessageConst.PROPERTY_TAGS, tag);
+ return this;
+ }
+
+ public String getKeys() {
+ return (String)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_KEYS, "");
+ }
+
+ public RocketMQMessageHeaderAccessor withKeys(String keys) {
+ setHeader(MessageConst.PROPERTY_KEYS, keys);
+ return this;
+ }
+
+ public MessageExt getRocketMessage() {
+ return getMessageHeaders().get(ORIGINAL_ROCKET_MESSAGE, MessageExt.class);
+ }
+
+ public RocketMQMessageHeaderAccessor withRocketMessage(MessageExt message) {
+ setHeader(ORIGINAL_ROCKET_MESSAGE, message);
+ return this;
+ }
+
+ public Integer getDelayTimeLevel() {
+ return (Integer)getMessageHeaders().getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
+ }
+
+ public RocketMQMessageHeaderAccessor withDelayTimeLevel(Integer delayTimeLevel) {
+ setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayTimeLevel);
+ return this;
+ }
+
+ public Integer getFlag() {
+ return (Integer)getMessageHeaders().getOrDefault(ROCKET_FLAG, 0);
+ }
+
+ public RocketMQMessageHeaderAccessor withFlag(Integer delayTimeLevel) {
+ setHeader(ROCKET_FLAG, delayTimeLevel);
+ return this;
+ }
+
+ public SendResult getSendResult() {
+ return getMessageHeaders().get(ROCKET_SEND_RESULT, SendResult.class);
+ }
+
+ public static void putSendResult(MutableMessage message, SendResult sendResult) {
+ message.getHeaders().put(ROCKET_SEND_RESULT, sendResult);
+ }
+
+ public Map getUserProperties() {
+ Map result = new HashMap<>();
+ for (Map.Entry entry : this.toMap().entrySet()) {
+ if (entry.getValue() instanceof String && !MessageConst.STRING_HASH_SET.contains(entry.getKey()) && !entry
+ .getKey().equals(MessageHeaders.CONTENT_TYPE)) {
+ result.put(entry.getKey(), (String)entry.getValue());
+ }
+ }
+ return result;
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
new file mode 100644
index 000000000..43303a0f2
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderEndpoint.java
@@ -0,0 +1,39 @@
+package org.springframework.cloud.stream.binder.rocketmq.actuator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.codahale.metrics.MetricRegistry;
+import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
+import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
+
+import static org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ENDPOINT_ID;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+@Endpoint(id = ENDPOINT_ID)
+public class RocketMQBinderEndpoint {
+
+ private MetricRegistry metricRegistry = new MetricRegistry();
+ private Map runtime = new ConcurrentHashMap<>();
+
+ @ReadOperation
+ public Map invoke() {
+ Map result = new HashMap<>();
+ result.put("metrics", metricRegistry().getMetrics());
+ result.put("runtime", runtime());
+ return result;
+ }
+
+ public MetricRegistry metricRegistry() {
+ return metricRegistry;
+ }
+
+ public Map runtime() {
+ return runtime;
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
new file mode 100644
index 000000000..7a67902d6
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/actuator/RocketMQBinderHealthIndicator.java
@@ -0,0 +1,37 @@
+package org.springframework.cloud.stream.binder.rocketmq.actuator;
+
+import org.springframework.boot.actuate.health.AbstractHealthIndicator;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQBinderHealthIndicator extends AbstractHealthIndicator {
+
+ private final InstrumentationManager instrumentationManager;
+
+ public RocketMQBinderHealthIndicator(InstrumentationManager instrumentationManager) {
+ this.instrumentationManager = instrumentationManager;
+ }
+
+ @Override
+ protected void doHealthCheck(Health.Builder builder) throws Exception {
+ if (instrumentationManager.getHealthInstrumentations().stream().
+ allMatch(Instrumentation::isUp)) {
+ builder.up();
+ return;
+ }
+ if (instrumentationManager.getHealthInstrumentations().stream().
+ allMatch(Instrumentation::isOutOfService)) {
+ builder.outOfService();
+ return;
+ }
+ builder.down();
+ instrumentationManager.getHealthInstrumentations().stream().
+ filter(instrumentation -> !instrumentation.isStarted()).
+ forEach(instrumentation1 -> builder.withException(instrumentation1.getStartException()));
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
new file mode 100644
index 000000000..e1b289661
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
@@ -0,0 +1,54 @@
+package org.springframework.cloud.stream.binder.rocketmq.config;
+
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
+import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
+import org.springframework.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+@Configuration
+@EnableConfigurationProperties({RocketMQBinderConfigurationProperties.class, RocketMQExtendedBindingProperties.class})
+public class RocketMQBinderAutoConfiguration {
+
+ private final RocketMQExtendedBindingProperties extendedBindingProperties;
+
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ @Autowired
+ public RocketMQBinderAutoConfiguration(RocketMQExtendedBindingProperties extendedBindingProperties,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
+ this.extendedBindingProperties = extendedBindingProperties;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, this.rocketBinderConfigurationProperties.getLogLevel());
+ }
+
+ @Bean
+ public RocketMQTopicProvisioner provisioningProvider() {
+ return new RocketMQTopicProvisioner();
+ }
+
+ @Bean
+ public RocketMQMessageChannelBinder rocketMessageChannelBinder(RocketMQTopicProvisioner provisioningProvider,
+ InstrumentationManager instrumentationManager,
+ ConsumersManager consumersManager) {
+ RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(consumersManager, extendedBindingProperties,
+ provisioningProvider, rocketBinderConfigurationProperties, instrumentationManager);
+ return binder;
+ }
+
+ @Bean
+ public ConsumersManager consumersManager(InstrumentationManager instrumentationManager) {
+ return new ConsumersManager(instrumentationManager, rocketBinderConfigurationProperties);
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java
new file mode 100644
index 000000000..0df4f51c5
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/config/RocketMQBinderEndpointAutoConfiguration.java
@@ -0,0 +1,33 @@
+package org.springframework.cloud.stream.binder.rocketmq.config;
+
+import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderEndpoint;
+import org.springframework.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author Jim
+ */
+@Configuration
+@AutoConfigureAfter(EndpointAutoConfiguration.class)
+public class RocketMQBinderEndpointAutoConfiguration {
+
+ @Bean
+ public RocketMQBinderEndpoint rocketBinderEndpoint() {
+ return new RocketMQBinderEndpoint();
+ }
+
+ @Bean
+ public RocketMQBinderHealthIndicator rocketBinderHealthIndicator(InstrumentationManager instrumentationManager) {
+ return new RocketMQBinderHealthIndicator(instrumentationManager);
+ }
+
+ @Bean
+ public InstrumentationManager instrumentationManager(RocketMQBinderEndpoint rocketBinderEndpoint) {
+ return new InstrumentationManager(rocketBinderEndpoint.metricRegistry(), rocketBinderEndpoint.runtime());
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java
new file mode 100644
index 000000000..f2c16ed1b
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/Acknowledgement.java
@@ -0,0 +1,65 @@
+package org.springframework.cloud.stream.binder.rocketmq.consuming;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class Acknowledgement {
+
+ /**
+ * for {@link ConsumeConcurrentlyContext} using
+ */
+ private ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ /**
+ * Message consume retry strategy
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency
+ */
+ private Integer consumeConcurrentlyDelayLevel = 0;
+
+ /**
+ * for {@link ConsumeOrderlyContext} using
+ */
+ private ConsumeOrderlyStatus consumeOrderlyStatus = ConsumeOrderlyStatus.SUCCESS;
+ private Long consumeOrderlySuspendCurrentQueueTimeMill = -1L;
+
+ public Acknowledgement setConsumeConcurrentlyStatus(ConsumeConcurrentlyStatus consumeConcurrentlyStatus) {
+ this.consumeConcurrentlyStatus = consumeConcurrentlyStatus;
+ return this;
+ }
+
+ public ConsumeConcurrentlyStatus getConsumeConcurrentlyStatus() {
+ return consumeConcurrentlyStatus;
+ }
+
+ public ConsumeOrderlyStatus getConsumeOrderlyStatus() {
+ return consumeOrderlyStatus;
+ }
+
+ public Acknowledgement setConsumeOrderlyStatus(ConsumeOrderlyStatus consumeOrderlyStatus) {
+ this.consumeOrderlyStatus = consumeOrderlyStatus;
+ return this;
+ }
+
+ public Integer getConsumeConcurrentlyDelayLevel() {
+ return consumeConcurrentlyDelayLevel;
+ }
+
+ public void setConsumeConcurrentlyDelayLevel(Integer consumeConcurrentlyDelayLevel) {
+ this.consumeConcurrentlyDelayLevel = consumeConcurrentlyDelayLevel;
+ }
+
+ public Long getConsumeOrderlySuspendCurrentQueueTimeMill() {
+ return consumeOrderlySuspendCurrentQueueTimeMill;
+ }
+
+ public void setConsumeOrderlySuspendCurrentQueueTimeMill(Long consumeOrderlySuspendCurrentQueueTimeMill) {
+ this.consumeOrderlySuspendCurrentQueueTimeMill = consumeOrderlySuspendCurrentQueueTimeMill;
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java
new file mode 100644
index 000000000..9acd6076f
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListener.java
@@ -0,0 +1,76 @@
+package org.springframework.cloud.stream.binder.rocketmq.consuming;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.rocketmq.client.consumer.listener.MessageListener;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.StringUtils;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public abstract class CloudStreamMessageListener implements MessageListener {
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private ConsumerPropertiesWrapper consumerPropertiesWrapper;
+
+ private final InstrumentationManager instrumentationManager;
+
+ private final Consumer sendMsgAction;
+
+ CloudStreamMessageListener(InstrumentationManager instrumentationManager, Consumer sendMsgAction) {
+ this.instrumentationManager = instrumentationManager;
+ this.sendMsgAction = sendMsgAction;
+ }
+
+ public String getTagsString() {
+ return String.join(" || ", consumerPropertiesWrapper.getTagsSet());
+ }
+
+ public void setConsumerPropertiesWrapper(String group, String topic, String tags) {
+ this.consumerPropertiesWrapper = new ConsumerPropertiesWrapper(group, topic, tags);
+ }
+
+ Acknowledgement consumeMessage(final List msgs) {
+ List acknowledgements = new ArrayList<>();
+ msgs.forEach(msg -> {
+ logger.info("consuming msg:\n" + msg);
+ logger.debug("message body:\n" + new String(msg.getBody()));
+ if (consumerPropertiesWrapper != null && msg.getTopic().equals(consumerPropertiesWrapper.getTopic())) {
+ if (StringUtils.isEmpty(consumerPropertiesWrapper.getTags()) || consumerPropertiesWrapper.getTagsSet()
+ .contains(msg.getTags())) {
+ try {
+ Acknowledgement acknowledgement = new Acknowledgement();
+ Message toChannel = MessageBuilder.withPayload(msg.getBody()).
+ setHeaders(new RocketMQMessageHeaderAccessor().
+ withAcknowledgment(acknowledgement).
+ withTags(msg.getTags()).
+ withKeys(msg.getKeys()).
+ withFlag(msg.getFlag()).
+ withRocketMessage(msg)
+ ).build();
+ acknowledgements.add(acknowledgement);
+ sendMsgAction.accept(toChannel);
+ instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic())
+ .markConsumed();
+ } catch (Exception e) {
+ logger.error("Rocket Message hasn't been processed successfully. Caused by ", e);
+ instrumentationManager.getConsumerInstrumentation(consumerPropertiesWrapper.getTopic())
+ .markConsumedFailure();
+ throw e;
+ }
+ }
+ }
+ });
+ return acknowledgements.get(0);
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java
new file mode 100644
index 000000000..8c4df7d88
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerConcurrently.java
@@ -0,0 +1,33 @@
+package org.springframework.cloud.stream.binder.rocketmq.consuming;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.messaging.Message;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class CloudStreamMessageListenerConcurrently extends CloudStreamMessageListener implements
+ MessageListenerConcurrently {
+
+ public CloudStreamMessageListenerConcurrently(InstrumentationManager instrumentationManager,
+ Consumer sendMsgAction) {
+ super(instrumentationManager, sendMsgAction);
+ }
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(final List msgs,
+ ConsumeConcurrentlyContext context) {
+ Acknowledgement acknowledgement = consumeMessage(msgs);
+ context.setDelayLevelWhenNextConsume(acknowledgement.getConsumeConcurrentlyDelayLevel());
+ return acknowledgement.getConsumeConcurrentlyStatus();
+ }
+
+}
\ No newline at end of file
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java
new file mode 100644
index 000000000..c63fb9bd9
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/CloudStreamMessageListenerOrderly.java
@@ -0,0 +1,31 @@
+package org.springframework.cloud.stream.binder.rocketmq.consuming;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.messaging.Message;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class CloudStreamMessageListenerOrderly extends CloudStreamMessageListener implements MessageListenerOrderly {
+
+ public CloudStreamMessageListenerOrderly(InstrumentationManager instrumentationManager,
+ Consumer sendMsgAction) {
+ super(instrumentationManager, sendMsgAction);
+ }
+
+ @Override
+ public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
+ Acknowledgement acknowledgement = consumeMessage(msgs);
+ context.setSuspendCurrentQueueTimeMillis((acknowledgement.getConsumeOrderlySuspendCurrentQueueTimeMill()));
+ return acknowledgement.getConsumeOrderlyStatus();
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java
new file mode 100644
index 000000000..0e3b9111c
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumerPropertiesWrapper.java
@@ -0,0 +1,42 @@
+package org.springframework.cloud.stream.binder.rocketmq.consuming;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+class ConsumerPropertiesWrapper {
+ private final String tags;
+ private final String group;
+ private final String topic;
+ private final Set tagsSet;
+
+ ConsumerPropertiesWrapper(String group, String topic, String tags) {
+ this.tags = tags;
+ this.group = group;
+ this.topic = topic;
+ tagsSet = tags == null ? new HashSet<>() : Arrays.stream(tags.split("\\|\\|")).map(String::trim).collect(
+ Collectors.toSet());
+ }
+
+ String getTags() {
+ return tags;
+ }
+
+ String getGroup() {
+ return group;
+ }
+
+ String getTopic() {
+ return topic;
+ }
+
+ Set getTagsSet() {
+ return tagsSet;
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java
new file mode 100644
index 000000000..3b6c26476
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/consuming/ConsumersManager.java
@@ -0,0 +1,102 @@
+package org.springframework.cloud.stream.binder.rocketmq.consuming;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerGroupInstrumentation;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class ConsumersManager {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final Map consumerGroups = new HashMap<>();
+ private final Map started = new HashMap<>();
+ private final Map, ExtendedConsumerProperties> propertiesMap
+ = new HashMap<>();
+ private final InstrumentationManager instrumentationManager;
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ public ConsumersManager(InstrumentationManager instrumentationManager,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
+ this.instrumentationManager = instrumentationManager;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ }
+
+ public synchronized DefaultMQPushConsumer getOrCreateConsumer(String group, String topic,
+ ExtendedConsumerProperties consumerProperties) {
+ propertiesMap.put(new AbstractMap.SimpleEntry<>(group, topic), consumerProperties);
+ ConsumerGroupInstrumentation instrumentation = instrumentationManager.getConsumerGroupInstrumentation(group);
+ instrumentationManager.addHealthInstrumentation(instrumentation);
+
+ if (consumerGroups.containsKey(group)) {
+ return consumerGroups.get(group);
+ }
+
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+ consumer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
+ consumerGroups.put(group, consumer);
+ started.put(group, false);
+ consumer.setConsumeThreadMax(consumerProperties.getConcurrency());
+ consumer.setConsumeThreadMin(consumerProperties.getConcurrency());
+ logger.info("Rocket consuming for SCS group {} created", group);
+ return consumer;
+ }
+
+ public synchronized void startConsumers() throws MQClientException {
+ for (String group : getConsumerGroups()) {
+ start(group);
+ }
+ }
+
+ public synchronized void startConsumer(String group) throws MQClientException {
+ start(group);
+ }
+
+ public synchronized void stopConsumer(String group) {
+ stop(group);
+ }
+
+ private void stop(String group) {
+ if (consumerGroups.get(group) != null) {
+ consumerGroups.get(group).shutdown();
+ started.put(group, false);
+ }
+ }
+
+ private synchronized void start(String group) throws MQClientException {
+ if (started.get(group)) {
+ return;
+ }
+ ConsumerGroupInstrumentation groupInstrumentation = instrumentationManager.getConsumerGroupInstrumentation(
+ group);
+ instrumentationManager.addHealthInstrumentation(groupInstrumentation);
+ try {
+ consumerGroups.get(group).start();
+ started.put(group, true);
+ groupInstrumentation.markStartedSuccessfully();
+ } catch (MQClientException e) {
+ groupInstrumentation.markStartFailed(e);
+ logger.error("Rocket Consumer hasn't been started. Caused by " + e.getErrorMessage(), e);
+ throw e;
+ }
+ }
+
+ public synchronized Set getConsumerGroups() {
+ return consumerGroups.keySet();
+ }
+}
+
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
new file mode 100644
index 000000000..c5494a4be
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
@@ -0,0 +1,106 @@
+package org.springframework.cloud.stream.binder.rocketmq.integration;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListener;
+import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerConcurrently;
+import org.springframework.cloud.stream.binder.rocketmq.consuming.CloudStreamMessageListenerOrderly;
+import org.springframework.cloud.stream.binder.rocketmq.consuming.ConsumersManager;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.ConsumerInstrumentation;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import org.springframework.integration.endpoint.MessageProducerSupport;
+import org.springframework.retry.RecoveryCallback;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.util.StringUtils;
+
+/**
+ * @author Jim
+ */
+public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
+
+ private ConsumerInstrumentation consumerInstrumentation;
+
+ private final ExtendedConsumerProperties consumerProperties;
+
+ private final String destination;
+
+ private final String group;
+
+ private final InstrumentationManager instrumentationManager;
+
+ private final ConsumersManager consumersManager;
+
+ private RetryTemplate retryTemplate;
+
+ private RecoveryCallback extends Object> recoveryCallback;
+
+ public RocketMQInboundChannelAdapter(ConsumersManager consumersManager,
+ ExtendedConsumerProperties consumerProperties,
+ String destination, String group,
+ InstrumentationManager instrumentationManager) {
+ this.consumersManager = consumersManager;
+ this.consumerProperties = consumerProperties;
+ this.destination = destination;
+ this.group = group;
+ this.instrumentationManager = instrumentationManager;
+ }
+
+ @Override
+ protected void doStart() {
+ if (!consumerProperties.getExtension().getEnabled()) {
+ return;
+ }
+
+ String tags = consumerProperties == null ? null : consumerProperties.getExtension().getTags();
+ Boolean isOrderly = consumerProperties == null ? false : consumerProperties.getExtension().getOrderly();
+
+ DefaultMQPushConsumer consumer = consumersManager.getOrCreateConsumer(group, destination, consumerProperties);
+
+ final CloudStreamMessageListener listener = isOrderly ? new CloudStreamMessageListenerOrderly(
+ instrumentationManager, msg -> sendMessage(msg))
+ : new CloudStreamMessageListenerConcurrently(instrumentationManager, msg -> sendMessage(msg));
+
+ listener.setConsumerPropertiesWrapper(group, destination, tags);
+
+ consumerInstrumentation = instrumentationManager.getConsumerInstrumentation(destination);
+ instrumentationManager.addHealthInstrumentation(consumerInstrumentation);
+
+ try {
+ if (!StringUtils.isEmpty(consumerProperties.getExtension().getSql())) {
+ consumer.subscribe(destination, MessageSelector.bySql(consumerProperties.getExtension().getSql()));
+ } else {
+ consumer.subscribe(destination, listener.getTagsString());
+ }
+ consumerInstrumentation.markStartedSuccessfully();
+ } catch (MQClientException e) {
+ consumerInstrumentation.markStartFailed(e);
+ logger.error("Rocket Consumer hasn't been subscribed. Caused by " + e.getErrorMessage(), e);
+ throw new RuntimeException("Rocket Consumer hasn't been subscribed.", e);
+ }
+
+ consumer.registerMessageListener(listener);
+
+ try {
+ consumersManager.startConsumer(group);
+ } catch (MQClientException e) {
+ logger.error("Rocket Consumer startup failed. Caused by " + e.getErrorMessage(), e);
+ throw new RuntimeException("Rocket Consumer startup failed.", e);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ consumersManager.stopConsumer(group);
+ }
+
+ public void setRetryTemplate(RetryTemplate retryTemplate) {
+ this.retryTemplate = retryTemplate;
+ }
+
+ public void setRecoveryCallback(RecoveryCallback extends Object> recoveryCallback) {
+ this.recoveryCallback = recoveryCallback;
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
new file mode 100644
index 000000000..01bc72458
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
@@ -0,0 +1,131 @@
+package org.springframework.cloud.stream.binder.rocketmq.integration;
+
+import java.time.Instant;
+import java.util.Map;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
+import org.springframework.cloud.stream.binder.rocketmq.RocketMQMessageHeaderAccessor;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import org.springframework.cloud.stream.binder.rocketmq.metrics.ProducerInstrumentation;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
+import org.springframework.context.Lifecycle;
+import org.springframework.integration.handler.AbstractMessageHandler;
+import org.springframework.integration.support.MutableMessage;
+import org.springframework.messaging.MessagingException;
+
+/**
+ * @author Jim
+ */
+public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
+
+ private DefaultMQProducer producer;
+
+ private ProducerInstrumentation producerInstrumentation;
+
+ private final RocketMQProducerProperties producerProperties;
+
+ private final String destination;
+
+ private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ private final InstrumentationManager instrumentationManager;
+
+ protected volatile boolean running = false;
+
+ public RocketMQMessageHandler(String destination, RocketMQProducerProperties producerProperties,
+ RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
+ InstrumentationManager instrumentationManager) {
+ this.destination = destination;
+ this.producerProperties = producerProperties;
+ this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
+ this.instrumentationManager = instrumentationManager;
+ }
+
+ @Override
+ public void start() {
+ producer = new DefaultMQProducer(destination);
+
+ producerInstrumentation = instrumentationManager.getProducerInstrumentation(destination);
+ instrumentationManager.addHealthInstrumentation(producerInstrumentation);
+
+ producer.setNamesrvAddr(rocketBinderConfigurationProperties.getNamesrvAddr());
+
+ if (producerProperties.getMaxMessageSize() > 0) {
+ producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
+ }
+
+ try {
+ producer.start();
+ producerInstrumentation.markStartedSuccessfully();
+ } catch (MQClientException e) {
+ producerInstrumentation.markStartFailed(e);
+ logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage());
+ throw new MessagingException(e.getMessage(), e);
+ }
+ running = true;
+ }
+
+ @Override
+ public void stop() {
+ if (producer != null) {
+ producer.shutdown();
+ }
+ running = false;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ @Override
+ protected void handleMessageInternal(org.springframework.messaging.Message> message) throws Exception {
+ try {
+ Message toSend;
+ if (message.getPayload() instanceof byte[]) {
+ toSend = new Message(destination, (byte[])message.getPayload());
+ } else if (message.getPayload() instanceof String) {
+ toSend = new Message(destination, ((String)message.getPayload()).getBytes());
+ } else {
+ throw new UnsupportedOperationException(
+ "Payload class isn't supported: " + message.getPayload().getClass());
+ }
+ RocketMQMessageHeaderAccessor headerAccessor = new RocketMQMessageHeaderAccessor(message);
+ headerAccessor.setLeaveMutable(true);
+ toSend.setDelayTimeLevel(headerAccessor.getDelayTimeLevel());
+ toSend.setTags(headerAccessor.getTags());
+ toSend.setKeys(headerAccessor.getKeys());
+ toSend.setFlag(headerAccessor.getFlag());
+ for (Map.Entry entry : headerAccessor.getUserProperties().entrySet()) {
+ toSend.putUserProperty(entry.getKey(), entry.getValue());
+ }
+
+ SendResult sendRes = producer.send(toSend);
+
+ if (!sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
+ throw new MQClientException("message hasn't been sent", null);
+ }
+ if (message instanceof MutableMessage) {
+ RocketMQMessageHeaderAccessor.putSendResult((MutableMessage)message, sendRes);
+ }
+ instrumentationManager.getRuntime().put(RocketMQBinderConstants.LASTSEND_TIMESTAMP,
+ Instant.now().toEpochMilli());
+ producerInstrumentation.markSent();
+ } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException |
+ UnsupportedOperationException e) {
+ producerInstrumentation.markSentFailure();
+ logger.error("Rocket Message hasn't been sent. Caused by " + e.getMessage());
+ throw new MessagingException(e.getMessage(), e);
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java
new file mode 100644
index 000000000..5b3f9efa9
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerGroupInstrumentation.java
@@ -0,0 +1,34 @@
+package org.springframework.cloud.stream.binder.rocketmq.metrics;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class ConsumerGroupInstrumentation extends Instrumentation {
+ private MetricRegistry metricRegistry;
+
+ private AtomicBoolean delayedStart = new AtomicBoolean(false);
+
+ public ConsumerGroupInstrumentation(MetricRegistry metricRegistry, String name) {
+ super(name);
+ this.metricRegistry = metricRegistry;
+ }
+
+ public void markDelayedStart() {
+ delayedStart.set(true);
+ }
+
+ @Override
+ public boolean isUp() {
+ return started.get() || delayedStart.get();
+ }
+
+ @Override
+ public boolean isOutOfService() {
+ return !started.get() && startException == null && !delayedStart.get();
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
new file mode 100644
index 000000000..7edc2b5f7
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ConsumerInstrumentation.java
@@ -0,0 +1,37 @@
+package org.springframework.cloud.stream.binder.rocketmq.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * @author juven.xuxb
+ * @author Jim
+ */
+public class ConsumerInstrumentation extends Instrumentation {
+
+ private final Counter totalConsumed;
+ private final Counter totalConsumedFailures;
+ private final Meter consumedPerSecond;
+ private final Meter consumedFailuresPerSecond;
+
+ public ConsumerInstrumentation(MetricRegistry registry, String baseMetricName) {
+ super(baseMetricName);
+ this.totalConsumed = registry.counter(name(baseMetricName, "totalConsumed"));
+ this.consumedPerSecond = registry.meter(name(baseMetricName, "consumedPerSecond"));
+ this.totalConsumedFailures = registry.counter(name(baseMetricName, "totalConsumedFailures"));
+ this.consumedFailuresPerSecond = registry.meter(name(baseMetricName, "consumedFailuresPerSecond"));
+ }
+
+ public void markConsumed() {
+ totalConsumed.inc();
+ consumedPerSecond.mark();
+ }
+
+ public void markConsumedFailure() {
+ totalConsumedFailures.inc();
+ consumedFailuresPerSecond.mark();
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java
new file mode 100644
index 000000000..d91c15835
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/Instrumentation.java
@@ -0,0 +1,50 @@
+package org.springframework.cloud.stream.binder.rocketmq.metrics;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class Instrumentation {
+ private final String name;
+ protected final AtomicBoolean started = new AtomicBoolean(false);
+ protected Exception startException = null;
+
+ Instrumentation(String name) {
+ this.name = name;
+ }
+
+ public boolean isDown() {
+ return startException != null;
+ }
+
+ public boolean isUp() {
+ return started.get();
+ }
+
+ public boolean isOutOfService() {
+ return !started.get() && startException == null;
+ }
+
+ public void markStartedSuccessfully() {
+ started.set(true);
+ }
+
+ public void markStartFailed(Exception e) {
+ started.set(false);
+ startException = e;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isStarted() {
+ return started.get();
+ }
+
+ public Exception getStartException() {
+ return startException;
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
new file mode 100644
index 000000000..0e5d84a36
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/InstrumentationManager.java
@@ -0,0 +1,57 @@
+package org.springframework.cloud.stream.binder.rocketmq.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class InstrumentationManager {
+ private final MetricRegistry metricRegistry;
+ private final Map runtime;
+ private final Map producerInstrumentations = new HashMap<>();
+ private final Map consumeInstrumentations = new HashMap<>();
+ private final Map consumerGroupsInstrumentations = new HashMap<>();
+
+ private final Map healthInstrumentations = new HashMap<>();
+
+ public InstrumentationManager(MetricRegistry metricRegistry, Map runtime) {
+ this.metricRegistry = metricRegistry;
+ this.runtime = runtime;
+ }
+
+ public ProducerInstrumentation getProducerInstrumentation(String destination) {
+ String key = "scs-rocketmq.producer." + destination;
+ producerInstrumentations.putIfAbsent(key, new ProducerInstrumentation(metricRegistry, key));
+ return producerInstrumentations.get(key);
+ }
+
+ public ConsumerInstrumentation getConsumerInstrumentation(String destination) {
+ String key = "scs-rocketmq.consumer." + destination;
+ consumeInstrumentations.putIfAbsent(key, new ConsumerInstrumentation(metricRegistry, key));
+ return consumeInstrumentations.get(key);
+ }
+
+ public ConsumerGroupInstrumentation getConsumerGroupInstrumentation(String group) {
+ String key = "scs-rocketmq.consumerGroup." + group;
+ consumerGroupsInstrumentations.putIfAbsent(key, new ConsumerGroupInstrumentation(metricRegistry, key));
+ return consumerGroupsInstrumentations.get(key);
+ }
+
+ public Set getHealthInstrumentations() {
+ return healthInstrumentations.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toSet());
+ }
+
+ public void addHealthInstrumentation(Instrumentation instrumentation) {
+ healthInstrumentations.put(instrumentation.getName(), instrumentation);
+ }
+
+ public Map getRuntime() {
+ return runtime;
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
new file mode 100644
index 000000000..7444d9ce7
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/metrics/ProducerInstrumentation.java
@@ -0,0 +1,37 @@
+package org.springframework.cloud.stream.binder.rocketmq.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+/**
+ * @author juven.xuxb
+ * @author Jim
+ */
+public class ProducerInstrumentation extends Instrumentation {
+
+ private final Counter totalSent;
+ private final Counter totalSentFailures;
+ private final Meter sentPerSecond;
+ private final Meter sentFailuresPerSecond;
+
+ public ProducerInstrumentation(MetricRegistry registry, String baseMetricName) {
+ super(baseMetricName);
+ this.totalSent = registry.counter(name(baseMetricName, "totalSent"));
+ this.totalSentFailures = registry.counter(name(baseMetricName, "totalSentFailures"));
+ this.sentPerSecond = registry.meter(name(baseMetricName, "sentPerSecond"));
+ this.sentFailuresPerSecond = registry.meter(name(baseMetricName, "sentFailuresPerSecond"));
+ }
+
+ public void markSent() {
+ totalSent.inc();
+ sentPerSecond.mark();
+ }
+
+ public void markSentFailure() {
+ totalSentFailures.inc();
+ sentFailuresPerSecond.mark();
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
new file mode 100644
index 000000000..b960bc320
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java
@@ -0,0 +1,32 @@
+package org.springframework.cloud.stream.binder.rocketmq.properties;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
+public class RocketMQBinderConfigurationProperties {
+
+ private String namesrvAddr;
+
+ private String logLevel = "ERROR";
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ public String getLogLevel() {
+ return logLevel;
+ }
+
+ public void setLogLevel(String logLevel) {
+ this.logLevel = logLevel;
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java
new file mode 100644
index 000000000..4cefbd393
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQBindingProperties.java
@@ -0,0 +1,28 @@
+package org.springframework.cloud.stream.binder.rocketmq.properties;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQBindingProperties {
+
+ private RocketMQConsumerProperties consumer = new RocketMQConsumerProperties();
+
+ private RocketMQProducerProperties producer = new RocketMQProducerProperties();
+
+ public RocketMQConsumerProperties getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(RocketMQConsumerProperties consumer) {
+ this.consumer = consumer;
+ }
+
+ public RocketMQProducerProperties getProducer() {
+ return producer;
+ }
+
+ public void setProducer(RocketMQProducerProperties producer) {
+ this.producer = producer;
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
new file mode 100644
index 000000000..410914acd
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
@@ -0,0 +1,66 @@
+package org.springframework.cloud.stream.binder.rocketmq.properties;
+
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQConsumerProperties {
+
+ /**
+ * using '||' to split tag
+ * {@link MQPushConsumer#subscribe(String, String)}
+ */
+ private String tags;
+
+ /**
+ * {@link MQPushConsumer#subscribe(String, MessageSelector)}
+ * {@link MessageSelector#bySql(String)}
+ */
+ private String sql;
+
+ /**
+ * if orderly is true, using {@link MessageListenerOrderly}
+ * else if orderly if false, using {@link MessageListenerConcurrently}
+ */
+ private Boolean orderly = false;
+
+ private Boolean enabled = true;
+
+ public String getTags() {
+ return tags;
+ }
+
+ public void setTags(String tags) {
+ this.tags = tags;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public void setSql(String sql) {
+ this.sql = sql;
+ }
+
+ public Boolean getOrderly() {
+ return orderly;
+ }
+
+ public void setOrderly(Boolean orderly) {
+ this.orderly = orderly;
+ }
+
+ public Boolean getEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java
new file mode 100644
index 000000000..8f3a30ef7
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQExtendedBindingProperties.java
@@ -0,0 +1,64 @@
+package org.springframework.cloud.stream.binder.rocketmq.properties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.cloud.stream.binder.ExtendedBindingProperties;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+@ConfigurationProperties("spring.cloud.stream.rocketmq")
+public class RocketMQExtendedBindingProperties implements
+ ExtendedBindingProperties {
+
+ private Map bindings = new HashMap<>();
+
+ public Map getBindings() {
+ return this.bindings;
+ }
+
+ public void setBindings(Map bindings) {
+ this.bindings = bindings;
+ }
+
+ @Override
+ public synchronized RocketMQConsumerProperties getExtendedConsumerProperties(String channelName) {
+ if (bindings.containsKey(channelName)) {
+ if (bindings.get(channelName).getConsumer() != null) {
+ return bindings.get(channelName).getConsumer();
+ } else {
+ RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
+ this.bindings.get(channelName).setConsumer(properties);
+ return properties;
+ }
+ } else {
+ RocketMQConsumerProperties properties = new RocketMQConsumerProperties();
+ RocketMQBindingProperties rbp = new RocketMQBindingProperties();
+ rbp.setConsumer(properties);
+ bindings.put(channelName, rbp);
+ return properties;
+ }
+ }
+
+ @Override
+ public synchronized RocketMQProducerProperties getExtendedProducerProperties(String channelName) {
+ if (bindings.containsKey(channelName)) {
+ if (bindings.get(channelName).getProducer() != null) {
+ return bindings.get(channelName).getProducer();
+ } else {
+ RocketMQProducerProperties properties = new RocketMQProducerProperties();
+ this.bindings.get(channelName).setProducer(properties);
+ return properties;
+ }
+ } else {
+ RocketMQProducerProperties properties = new RocketMQProducerProperties();
+ RocketMQBindingProperties rbp = new RocketMQBindingProperties();
+ rbp.setProducer(properties);
+ bindings.put(channelName, rbp);
+ return properties;
+ }
+ }
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
new file mode 100644
index 000000000..4daed981b
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/properties/RocketMQProducerProperties.java
@@ -0,0 +1,35 @@
+package org.springframework.cloud.stream.binder.rocketmq.properties;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQProducerProperties {
+
+ private Boolean enabled = true;
+
+ /**
+ * Maximum allowed message size in bytes
+ * {@link DefaultMQProducer#maxMessageSize}
+ */
+ private Integer maxMessageSize = 0;
+
+ public Boolean getEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(Boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public Integer getMaxMessageSize() {
+ return maxMessageSize;
+ }
+
+ public void setMaxMessageSize(Integer maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java
new file mode 100644
index 000000000..b6a688efe
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/provisioning/RocketMQTopicProvisioner.java
@@ -0,0 +1,89 @@
+package org.springframework.cloud.stream.binder.rocketmq.provisioning;
+
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
+import org.springframework.cloud.stream.provisioning.ConsumerDestination;
+import org.springframework.cloud.stream.provisioning.ProducerDestination;
+import org.springframework.cloud.stream.provisioning.ProvisioningException;
+import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
+
+/**
+ * @author Timur Valiev
+ * @author Jim
+ */
+public class RocketMQTopicProvisioner
+ implements
+ ProvisioningProvider,
+ ExtendedProducerProperties> {
+
+ private static final Logger logger = LoggerFactory.getLogger(RocketMQTopicProvisioner.class);
+
+ @Override
+ public ProducerDestination provisionProducerDestination(String name,
+ ExtendedProducerProperties
+ properties)
+ throws ProvisioningException {
+ checkTopic(name);
+ return new RocketProducerDestination(name);
+ }
+
+ @Override
+ public ConsumerDestination provisionConsumerDestination(String name, String group,
+ ExtendedConsumerProperties
+ properties)
+ throws ProvisioningException {
+ checkTopic(name);
+ return new RocketConsumerDestination(name);
+ }
+
+ private void checkTopic(String topic) {
+ try {
+ Validators.checkTopic(topic);
+ } catch (MQClientException e) {
+ logger.error("topic check error: " + topic, e);
+ throw new AssertionError(e); // Can't happen
+ }
+ }
+
+ private static final class RocketProducerDestination implements ProducerDestination {
+
+ private final String producerDestinationName;
+
+ RocketProducerDestination(String destinationName) {
+ this.producerDestinationName = destinationName;
+ }
+
+ @Override
+ public String getName() {
+ return producerDestinationName;
+ }
+
+ @Override
+ public String getNameForPartition(int partition) {
+ return producerDestinationName;
+ }
+
+ }
+
+ private static final class RocketConsumerDestination implements ConsumerDestination {
+
+ private final String consumerDestinationName;
+
+ RocketConsumerDestination(String consumerDestinationName) {
+ this.consumerDestinationName = consumerDestinationName;
+ }
+
+ @Override
+ public String getName() {
+ return this.consumerDestinationName;
+ }
+
+ }
+
+}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders
new file mode 100644
index 000000000..78d86cf92
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.binders
@@ -0,0 +1 @@
+rocketmq:org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration
\ No newline at end of file
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..43b85129a
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration