mappings = new HashMap<>();
+ mappings.put(
+ ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.bindings"),
+ ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.default"));
+ mappings.put(
+ ConfigurationPropertyName.of("spring.cloud.stream.rocketmq.streams"),
+ ConfigurationPropertyName
+ .of("spring.cloud.stream.rocketmq.streams.default"));
+ return mappings;
+ };
+ }
+
+ @Bean
+ public RocketMQConfigBeanPostProcessor rocketMQConfigBeanPostProcessor() {
+ return new RocketMQConfigBeanPostProcessor();
+ }
+
+
+ /**
+ * if you want to customize a bean, please use this BeanName {@code RocketMQMessageConverter.DEFAULT_NAME}.
+ */
+ @Bean(RocketMQMessageConverter.DEFAULT_NAME)
+ @ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })
+ public CompositeMessageConverter rocketMQMessageConverter() {
+ return new RocketMQMessageConverter().getMessageConverter();
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java
new file mode 100644
index 000000000..b9b85db24
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/RocketMQBinderAutoConfiguration.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate;
+
+import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
+import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * issue:https://github.com/alibaba/spring-cloud-alibaba/issues/1681 .
+ *
+ * @author Timur Valiev
+ * @author Jim
+ */
+@Configuration(proxyBeanMethods = false)
+@EnableConfigurationProperties({ RocketMQExtendedBindingProperties.class,
+ RocketMQBinderConfigurationProperties.class })
+public class RocketMQBinderAutoConfiguration {
+
+ @Autowired
+ private RocketMQExtendedBindingProperties extendedBindingProperties;
+
+ @Autowired
+ private RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
+
+ @Bean
+ @ConditionalOnEnabledHealthIndicator("rocketmq")
+ @ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
+ public RocketMQBinderHealthIndicator rocketMQBinderHealthIndicator() {
+ return new RocketMQBinderHealthIndicator();
+ }
+
+ @Bean
+ public RocketMQTopicProvisioner rocketMQTopicProvisioner() {
+ return new RocketMQTopicProvisioner();
+ }
+
+ @Bean
+ public RocketMQMessageChannelBinder rocketMQMessageChannelBinder(
+ RocketMQTopicProvisioner provisioningProvider) {
+ return new RocketMQMessageChannelBinder(rocketBinderConfigurationProperties,
+ extendedBindingProperties, provisioningProvider);
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
deleted file mode 100644
index 3031c2656..000000000
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQBinderAutoConfiguration.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.stream.binder.rocketmq.config;
-
-import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
-import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
-import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
-
-/**
- * @author Timur Valiev
- * @author Jim
- */
-@Configuration(proxyBeanMethods = false)
-@Import({ RocketMQAutoConfiguration.class,
- RocketMQBinderHealthIndicatorAutoConfiguration.class })
-@EnableConfigurationProperties({ RocketMQBinderConfigurationProperties.class,
- RocketMQExtendedBindingProperties.class })
-public class RocketMQBinderAutoConfiguration {
-
- private final RocketMQExtendedBindingProperties extendedBindingProperties;
-
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
-
- @Autowired(required = false)
- private RocketMQProperties rocketMQProperties = new RocketMQProperties();
-
- @Autowired
- public RocketMQBinderAutoConfiguration(
- RocketMQExtendedBindingProperties extendedBindingProperties,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties) {
- this.extendedBindingProperties = extendedBindingProperties;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- }
-
- @Bean
- public RocketMQTopicProvisioner provisioningProvider() {
- return new RocketMQTopicProvisioner();
- }
-
- @Bean
- public RocketMQMessageChannelBinder rocketMessageChannelBinder(
- RocketMQTopicProvisioner provisioningProvider,
- InstrumentationManager instrumentationManager) {
- RocketMQMessageChannelBinder binder = new RocketMQMessageChannelBinder(
- provisioningProvider, extendedBindingProperties,
- rocketBinderConfigurationProperties, rocketMQProperties,
- instrumentationManager);
- binder.setExtendedBindingProperties(extendedBindingProperties);
- return binder;
- }
-
- @Bean
- public InstrumentationManager instrumentationManager() {
- return new InstrumentationManager();
- }
-
-}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQBinderHealthIndicatorAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQBinderHealthIndicatorAutoConfiguration.java
deleted file mode 100644
index 1c3b5dc0d..000000000
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQBinderHealthIndicatorAutoConfiguration.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.stream.binder.rocketmq.config;
-
-import com.alibaba.cloud.stream.binder.rocketmq.actuator.RocketMQBinderHealthIndicator;
-
-import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
-import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * @author Jim
- */
-@Configuration(proxyBeanMethods = false)
-@ConditionalOnClass(Endpoint.class)
-public class RocketMQBinderHealthIndicatorAutoConfiguration {
-
- @Bean
- @ConditionalOnEnabledHealthIndicator("rocketmq")
- public RocketMQBinderHealthIndicator rocketBinderHealthIndicator() {
- return new RocketMQBinderHealthIndicator();
- }
-
-}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java
deleted file mode 100644
index 624ecde64..000000000
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/config/RocketMQComponent4BinderAutoConfiguration.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.stream.binder.rocketmq.config;
-
-import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
-import org.apache.rocketmq.spring.config.RocketMQConfigUtils;
-import org.apache.rocketmq.spring.config.RocketMQTransactionAnnotationProcessor;
-import org.apache.rocketmq.spring.config.TransactionHandlerRegistry;
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
-
-import org.springframework.boot.autoconfigure.AutoConfigureAfter;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.core.env.Environment;
-import org.springframework.util.StringUtils;
-
-/**
- * @author Jim
- */
-@Configuration(proxyBeanMethods = false)
-@AutoConfigureAfter(RocketMQAutoConfiguration.class)
-@ConditionalOnMissingBean(DefaultMQProducer.class)
-public class RocketMQComponent4BinderAutoConfiguration {
-
- private final Environment environment;
-
- public RocketMQComponent4BinderAutoConfiguration(Environment environment) {
- this.environment = environment;
- }
-
- @Bean
- @ConditionalOnMissingBean(DefaultMQProducer.class)
- public DefaultMQProducer defaultMQProducer() {
- DefaultMQProducer producer;
- String configNameServer = environment.resolveRequiredPlaceholders(
- "${spring.cloud.stream.rocketmq.binder.name-server:${rocketmq.producer.name-server:}}");
- String ak = environment.resolveRequiredPlaceholders(
- "${spring.cloud.stream.rocketmq.binder.access-key:${rocketmq.producer.access-key:}}");
- String sk = environment.resolveRequiredPlaceholders(
- "${spring.cloud.stream.rocketmq.binder.secret-key:${rocketmq.producer.secret-key:}}");
- if (StringUtils.hasLength(ak) && StringUtils.hasLength(sk)) {
- producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP,
- new AclClientRPCHook(new SessionCredentials(ak, sk)));
- producer.setVipChannelEnabled(false);
- }
- else {
- producer = new DefaultMQProducer(RocketMQBinderConstants.DEFAULT_GROUP);
- }
- if (!StringUtils.hasLength(configNameServer)) {
- configNameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
- }
- producer.setNamesrvAddr(configNameServer);
- return producer;
- }
-
- @Bean(destroyMethod = "destroy")
- @ConditionalOnMissingBean
- public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer,
- ObjectMapper objectMapper) {
- RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
- rocketMQTemplate.setProducer(mqProducer);
- rocketMQTemplate.setObjectMapper(objectMapper);
- return rocketMQTemplate;
- }
-
- @Bean
- @ConditionalOnBean(RocketMQTemplate.class)
- @ConditionalOnMissingBean(TransactionHandlerRegistry.class)
- public TransactionHandlerRegistry transactionHandlerRegistry(
- RocketMQTemplate template) {
- return new TransactionHandlerRegistry(template);
- }
-
- @Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESSOR_BEAN_NAME)
- @ConditionalOnBean(TransactionHandlerRegistry.class)
- public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(
- TransactionHandlerRegistry transactionHandlerRegistry) {
- return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry);
- }
-
-}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java
new file mode 100644
index 000000000..d0a3b88e0
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/constant/RocketMQConst.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.constant;
+
+import org.apache.rocketmq.common.message.MessageConst;
+
+/**
+ * @author zkzlx
+ */
+public class RocketMQConst extends MessageConst {
+
+ /**
+ * Default NameServer value.
+ */
+ public static final String DEFAULT_NAME_SERVER = "127.0.0.1:9876";
+
+ /**
+ * Default group for SCS RocketMQ Binder.
+ */
+ public static final String DEFAULT_GROUP = "binder_default_group_name";
+
+ /**
+ * user args for SCS RocketMQ Binder.
+ */
+ public static final String USER_TRANSACTIONAL_ARGS = "TRANSACTIONAL_ARGS";
+
+ /**
+ * It is mainly provided for conversion between rocketMq-message and Spring-message,
+ * and parameters are passed through HEADERS.
+ */
+ public static class Headers {
+
+ /**
+ * keys for SCS RocketMQ Headers.
+ */
+ public static final String KEYS = MessageConst.PROPERTY_KEYS;
+
+ /**
+ * tags for SCS RocketMQ Headers.
+ */
+ public static final String TAGS = MessageConst.PROPERTY_TAGS;
+
+ /**
+ * topic for SCS RocketMQ Headers.
+ */
+ public static final String TOPIC = "MQ_TOPIC";
+
+ /**
+ * The ID of the message.
+ */
+ public static final String MESSAGE_ID = "MQ_MESSAGE_ID";
+
+ /**
+ * The timestamp that the message producer invokes the message sending API.
+ */
+ public static final String BORN_TIMESTAMP = "MQ_BORN_TIMESTAMP";
+
+ /**
+ * The IP and port number of the message producer.
+ */
+ public static final String BORN_HOST = "MQ_BORN_HOST";
+
+ /**
+ * Message flag, MQ is not processed and is available for use by applications.
+ */
+ public static final String FLAG = "MQ_FLAG";
+
+ /**
+ * Message consumption queue ID.
+ */
+ public static final String QUEUE_ID = "MQ_QUEUE_ID";
+
+ /**
+ * Message system Flag, such as whether or not to compress, whether or not to
+ * transactional messages.
+ */
+ public static final String SYS_FLAG = "MQ_SYS_FLAG";
+
+ /**
+ * The transaction ID of the transaction message.
+ */
+ public static final String TRANSACTION_ID = "MQ_TRANSACTION_ID";
+
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
deleted file mode 100644
index 92c65b65e..000000000
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.stream.binder.rocketmq.consuming;
-
-import java.util.List;
-import java.util.Objects;
-
-import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
-import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.MessageSelector;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.spring.annotation.ConsumeMode;
-import org.apache.rocketmq.spring.annotation.MessageModel;
-import org.apache.rocketmq.spring.annotation.SelectorType;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
-import org.apache.rocketmq.spring.support.RocketMQListenerContainer;
-import org.apache.rocketmq.spring.support.RocketMQUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
-import org.springframework.context.SmartLifecycle;
-import org.springframework.integration.support.MessageBuilder;
-import org.springframework.messaging.Message;
-import org.springframework.util.Assert;
-import org.springframework.util.StringUtils;
-
-import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES;
-
-/**
- * A class that Listen on rocketmq message.
- *
- * this class will delegate {@link RocketMQListener} to handle message
- *
- * @author Jim
- * @author Xiejiashuai
- * @see RocketMQListener
- */
-public class RocketMQListenerBindingContainer
- implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {
-
- private final static Logger log = LoggerFactory
- .getLogger(RocketMQListenerBindingContainer.class);
-
- private long suspendCurrentQueueTimeMillis = 1000;
-
- /**
- * Message consume retry strategy
- * -1,no retry,put into DLQ directly
- * 0,broker control retry frequency
- * >0,client control retry frequency.
- */
- private int delayLevelWhenNextConsume = 0;
-
- private List nameServer;
-
- private String consumerGroup;
-
- private String topic;
-
- private int consumeThreadMax = 64;
-
- private String charset = "UTF-8";
-
- private RocketMQListener rocketMQListener;
-
- private RocketMQHeaderMapper headerMapper;
-
- private DefaultMQPushConsumer consumer;
-
- private boolean running;
-
- private final ExtendedConsumerProperties rocketMQConsumerProperties;
-
- private final RocketMQMessageChannelBinder rocketMQMessageChannelBinder;
-
- private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;
-
- // The following properties came from RocketMQConsumerProperties.
- private ConsumeMode consumeMode;
-
- private SelectorType selectorType;
-
- private String selectorExpression;
-
- private MessageModel messageModel;
-
- public RocketMQListenerBindingContainer(
- ExtendedConsumerProperties rocketMQConsumerProperties,
- RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties,
- RocketMQMessageChannelBinder rocketMQMessageChannelBinder) {
- this.rocketMQConsumerProperties = rocketMQConsumerProperties;
- this.rocketBinderConfigurationProperties = rocketBinderConfigurationProperties;
- this.rocketMQMessageChannelBinder = rocketMQMessageChannelBinder;
- this.consumeMode = rocketMQConsumerProperties.getExtension().getOrderly()
- ? ConsumeMode.ORDERLY : ConsumeMode.CONCURRENTLY;
- if (!StringUtils.hasLength(rocketMQConsumerProperties.getExtension().getSql())) {
- this.selectorType = SelectorType.TAG;
- this.selectorExpression = rocketMQConsumerProperties.getExtension().getTags();
- }
- else {
- this.selectorType = SelectorType.SQL92;
- this.selectorExpression = rocketMQConsumerProperties.getExtension().getSql();
- }
- this.messageModel = rocketMQConsumerProperties.getExtension().getBroadcasting()
- ? MessageModel.BROADCASTING : MessageModel.CLUSTERING;
- }
-
- @Override
- public void setupMessageListener(RocketMQListener> rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
-
- @Override
- public void destroy() throws Exception {
- this.setRunning(false);
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- log.info("container destroyed, {}", this.toString());
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- initRocketMQPushConsumer();
- }
-
- @Override
- public boolean isAutoStartup() {
- return true;
- }
-
- @Override
- public void stop(Runnable callback) {
- stop();
- callback.run();
- }
-
- @Override
- public void start() {
- if (this.isRunning()) {
- throw new IllegalStateException(
- "container already running. " + this.toString());
- }
-
- try {
- consumer.start();
- }
- catch (MQClientException e) {
- throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
- }
- this.setRunning(true);
-
- log.info("running container: {}", this.toString());
- }
-
- @Override
- public void stop() {
- if (this.isRunning()) {
- if (Objects.nonNull(consumer)) {
- consumer.shutdown();
- }
- setRunning(false);
- }
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
- private void setRunning(boolean running) {
- this.running = running;
- }
-
- @Override
- public int getPhase() {
- return Integer.MAX_VALUE;
- }
-
- private void initRocketMQPushConsumer() throws MQClientException {
- Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");
- Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
- Assert.notNull(nameServer, "Property 'nameServer' is required");
- Assert.notNull(topic, "Property 'topic' is required");
-
- String ak = rocketBinderConfigurationProperties.getAccessKey();
- String sk = rocketBinderConfigurationProperties.getSecretKey();
- if (StringUtils.hasLength(ak) && StringUtils.hasLength(sk)) {
- RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(ak, sk));
- consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook,
- new AllocateMessageQueueAveragely(),
- rocketBinderConfigurationProperties.isEnableMsgTrace(),
- rocketBinderConfigurationProperties.getCustomizedTraceTopic());
- consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
- topic + "|" + UtilAll.getPid()));
- consumer.setVipChannelEnabled(false);
- }
- else {
- consumer = new DefaultMQPushConsumer(consumerGroup,
- rocketBinderConfigurationProperties.isEnableMsgTrace(),
- rocketBinderConfigurationProperties.getCustomizedTraceTopic());
- }
-
- consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer));
- consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
- consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
-
- switch (messageModel) {
- case BROADCASTING:
- consumer.setMessageModel(
- org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
- break;
- case CLUSTERING:
- consumer.setMessageModel(
- org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
- break;
- default:
- throw new IllegalArgumentException("Property 'messageModel' was wrong.");
- }
-
- switch (selectorType) {
- case TAG:
- consumer.subscribe(topic, selectorExpression);
- break;
- case SQL92:
- consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
- break;
- default:
- throw new IllegalArgumentException("Property 'selectorType' was wrong.");
- }
-
- switch (consumeMode) {
- case ORDERLY:
- consumer.setMessageListener(new DefaultMessageListenerOrderly());
- break;
- case CONCURRENTLY:
- consumer.setMessageListener(new DefaultMessageListenerConcurrently());
- break;
- default:
- throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
- }
-
- if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
- ((RocketMQPushConsumerLifecycleListener) rocketMQListener)
- .prepareStart(consumer);
- }
-
- }
-
- @Override
- public String toString() {
- return "RocketMQListenerBindingContainer{" + "consumerGroup='" + consumerGroup
- + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\''
- + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType
- + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel="
- + messageModel + '}';
- }
-
- public long getSuspendCurrentQueueTimeMillis() {
- return suspendCurrentQueueTimeMillis;
- }
-
- public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
- this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
- }
-
- public int getDelayLevelWhenNextConsume() {
- return delayLevelWhenNextConsume;
- }
-
- public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
- this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
- }
-
- public List getNameServer() {
- return nameServer;
- }
-
- public void setNameServer(List nameServer) {
- this.nameServer = nameServer;
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public int getConsumeThreadMax() {
- return consumeThreadMax;
- }
-
- public void setConsumeThreadMax(int consumeThreadMax) {
- this.consumeThreadMax = consumeThreadMax;
- }
-
- public String getCharset() {
- return charset;
- }
-
- public void setCharset(String charset) {
- this.charset = charset;
- }
-
- public RocketMQListener getRocketMQListener() {
- return rocketMQListener;
- }
-
- public void setRocketMQListener(RocketMQListener rocketMQListener) {
- this.rocketMQListener = rocketMQListener;
- }
-
- public DefaultMQPushConsumer getConsumer() {
- return consumer;
- }
-
- public void setConsumer(DefaultMQPushConsumer consumer) {
- this.consumer = consumer;
- }
-
- public ExtendedConsumerProperties getRocketMQConsumerProperties() {
- return rocketMQConsumerProperties;
- }
-
- public ConsumeMode getConsumeMode() {
- return consumeMode;
- }
-
- public SelectorType getSelectorType() {
- return selectorType;
- }
-
- public String getSelectorExpression() {
- return selectorExpression;
- }
-
- public MessageModel getMessageModel() {
- return messageModel;
- }
-
- public RocketMQHeaderMapper getHeaderMapper() {
- return headerMapper;
- }
-
- public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
- this.headerMapper = headerMapper;
- }
-
- /**
- * Convert rocketmq {@link MessageExt} to Spring {@link Message}.
- * @param messageExt the rocketmq message
- * @return the converted Spring {@link Message}
- */
- @SuppressWarnings("unchecked")
- private Message convertToSpringMessage(MessageExt messageExt) {
-
- // add reconsume-times header to messageExt
- int reconsumeTimes = messageExt.getReconsumeTimes();
- messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
- String.valueOf(reconsumeTimes));
- Message message = RocketMQUtil.convertToSpringMessage(messageExt);
- return MessageBuilder.fromMessage(message)
- .copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
- }
-
- public class DefaultMessageListenerConcurrently
- implements MessageListenerConcurrently {
-
- @SuppressWarnings({ "unchecked", "Duplicates" })
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List msgs,
- ConsumeConcurrentlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener.onMessage(convertToSpringMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.debug("consume {} message key:[{}] cost: {} ms",
- messageExt.getMsgId(), messageExt.getKeys(), costTime);
- }
- catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- }
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
-
- }
-
- public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
-
- @SuppressWarnings({ "unchecked", "Duplicates" })
- @Override
- public ConsumeOrderlyStatus consumeMessage(List msgs,
- ConsumeOrderlyContext context) {
- for (MessageExt messageExt : msgs) {
- log.debug("received msg: {}", messageExt);
- try {
- long now = System.currentTimeMillis();
- rocketMQListener.onMessage(convertToSpringMessage(messageExt));
- long costTime = System.currentTimeMillis() - now;
- log.info("consume {} message key:[{}] cost: {} ms",
- messageExt.getMsgId(), messageExt.getKeys(), costTime);
- }
- catch (Exception e) {
- log.warn("consume message failed. messageExt:{}", messageExt, e);
- context.setSuspendCurrentQueueTimeMillis(
- suspendCurrentQueueTimeMillis);
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
-
- return ConsumeOrderlyStatus.SUCCESS;
- }
-
- }
-
-}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java
deleted file mode 100644
index 948555924..000000000
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQMessageQueueChooser.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.stream.binder.rocketmq.consuming;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.rocketmq.common.message.MessageQueue;
-
-/**
- * @author Jim
- */
-public class RocketMQMessageQueueChooser {
-
- private volatile int queueIndex = 0;
-
- private volatile List messageQueues;
-
- public MessageQueue choose() {
- return messageQueues.get(queueIndex);
- }
-
- public int requeue() {
- if (queueIndex - 1 < 0) {
- this.queueIndex = messageQueues.size() - 1;
- }
- else {
- this.queueIndex = this.queueIndex - 1;
- }
- return this.queueIndex;
- }
-
- public void increment() {
- this.queueIndex = (this.queueIndex + 1) % messageQueues.size();
- }
-
- public void reset(Set queueSet) {
- this.messageQueues = null;
- this.messageQueues = new ArrayList<>(queueSet);
- this.queueIndex = 0;
- }
-
- public List getMessageQueues() {
- return messageQueues;
- }
-
-}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
new file mode 100644
index 000000000..f69290997
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.convert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.messaging.converter.ByteArrayMessageConverter;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.converter.StringMessageConverter;
+import org.springframework.util.ClassUtils;
+
+/**
+ * The default message converter of rocketMq,its bean name is {@link #DEFAULT_NAME} .
+ *
+ * @author zkzlx
+ */
+public class RocketMQMessageConverter {
+
+ /**
+ * if you want to customize a bean, please use the BeanName.
+ */
+ public static final String DEFAULT_NAME = "com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter";
+
+ private static final boolean JACKSON_PRESENT;
+
+ private static final boolean FASTJSON_PRESENT;
+
+ static {
+ ClassLoader classLoader = RocketMQMessageConverter.class.getClassLoader();
+ JACKSON_PRESENT = ClassUtils
+ .isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader)
+ && ClassUtils.isPresent("com.fasterxml.jackson.core.JsonGenerator",
+ classLoader);
+ FASTJSON_PRESENT = ClassUtils.isPresent("com.alibaba.fastjson.JSON", classLoader)
+ && ClassUtils.isPresent(
+ "com.alibaba.fastjson.support.config.FastJsonConfig",
+ classLoader);
+ }
+
+ private CompositeMessageConverter messageConverter;
+
+ public RocketMQMessageConverter() {
+ List messageConverters = new ArrayList<>();
+ ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
+ byteArrayMessageConverter.setContentTypeResolver(null);
+ messageConverters.add(byteArrayMessageConverter);
+ messageConverters.add(new StringMessageConverter());
+ if (JACKSON_PRESENT) {
+ messageConverters.add(new MappingJackson2MessageConverter());
+ }
+ if (FASTJSON_PRESENT) {
+ try {
+ messageConverters.add((MessageConverter) ClassUtils.forName(
+ "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
+ ClassUtils.getDefaultClassLoader()).newInstance());
+ }
+ catch (ClassNotFoundException | IllegalAccessException
+ | InstantiationException ignored) {
+ // ignore this exception
+ }
+ }
+ messageConverter = new CompositeMessageConverter(messageConverters);
+ }
+
+ public CompositeMessageConverter getMessageConverter() {
+ return messageConverter;
+ }
+
+ public void setMessageConverter(CompositeMessageConverter messageConverter) {
+ this.messageConverter = messageConverter;
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java
new file mode 100644
index 000000000..a90e1f21a
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQBeanContainerCache.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.custom;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.listener.MessageListener;
+import org.apache.rocketmq.client.hook.CheckForbiddenHook;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.TransactionListener;
+
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.util.StringUtils;
+
+/**
+ * Gets the beans configured in the configuration file.
+ *
+ * @author junboXiang
+ */
+public final class RocketMQBeanContainerCache {
+
+ private RocketMQBeanContainerCache() {
+ }
+
+ private static final Class>[] CLASSES = new Class[] {
+ CompositeMessageConverter.class, AllocateMessageQueueStrategy.class,
+ MessageQueueSelector.class, MessageListener.class, TransactionListener.class,
+ SendCallback.class, CheckForbiddenHook.class, SendMessageHook.class,
+ ErrorAcknowledgeHandler.class };
+
+ private static final Map BEANS_CACHE = new ConcurrentHashMap<>();
+
+ static void putBean(String beanName, Object beanObj) {
+ BEANS_CACHE.put(beanName, beanObj);
+ }
+
+ static Class>[] getClassAry() {
+ return CLASSES;
+ }
+
+ public static T getBean(String beanName, Class clazz) {
+ return getBean(beanName, clazz, null);
+ }
+
+ public static T getBean(String beanName, Class clazz, T defaultObj) {
+ if (!StringUtils.hasLength(beanName)) {
+ return defaultObj;
+ }
+ Object obj = BEANS_CACHE.get(beanName);
+ if (null == obj) {
+ return defaultObj;
+ }
+ if (clazz.isAssignableFrom(obj.getClass())) {
+ return (T) obj;
+ }
+ return defaultObj;
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java
new file mode 100644
index 000000000..a83fdbe08
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/custom/RocketMQConfigBeanPostProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.custom;
+
+import java.util.stream.Stream;
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+/**
+ * find RocketMQ bean by annotations.
+ *
+ * @author junboXiang
+ *
+ */
+public class RocketMQConfigBeanPostProcessor implements BeanPostProcessor {
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName)
+ throws BeansException {
+ Stream.of(RocketMQBeanContainerCache.getClassAry()).forEach(clazz -> {
+ if (clazz.isAssignableFrom(bean.getClass())) {
+ RocketMQBeanContainerCache.putBean(beanName, bean);
+ }
+ });
+ return bean;
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java
new file mode 100644
index 000000000..dfe65fd94
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/extend/ErrorAcknowledgeHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.stream.binder.rocketmq.extend;
+
+import org.springframework.integration.acks.AcknowledgmentCallback.Status;
+import org.springframework.messaging.Message;
+
+/**
+ * @author zkzlx
+ */
+public interface ErrorAcknowledgeHandler {
+
+ /**
+ * Ack state handling, including receive, reject, and retry, when a consumption
+ * exception occurs.
+ * @param message message
+ * @return see {@link Status}
+ */
+ Status handler(Message> message);
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
deleted file mode 100644
index a3b680419..000000000
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright 2013-2018 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.cloud.stream.binder.rocketmq.integration;
-
-import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
-import org.springframework.integration.endpoint.MessageProducerSupport;
-import org.springframework.integration.support.MessageBuilder;
-import org.springframework.messaging.Message;
-import org.springframework.messaging.MessagingException;
-import org.springframework.retry.RecoveryCallback;
-import org.springframework.retry.RetryCallback;
-import org.springframework.retry.RetryContext;
-import org.springframework.retry.RetryListener;
-import org.springframework.retry.support.RetryTemplate;
-import org.springframework.util.Assert;
-
-/**
- * @author Jim
- */
-public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
-
- private static final Logger log = LoggerFactory
- .getLogger(RocketMQInboundChannelAdapter.class);
-
- private RetryTemplate retryTemplate;
-
- private RecoveryCallback extends Object> recoveryCallback;
-
- private RocketMQListenerBindingContainer rocketMQListenerContainer;
-
- private final ExtendedConsumerProperties consumerProperties;
-
- private final InstrumentationManager instrumentationManager;
-
- public RocketMQInboundChannelAdapter(
- RocketMQListenerBindingContainer rocketMQListenerContainer,
- ExtendedConsumerProperties consumerProperties,
- InstrumentationManager instrumentationManager) {
- this.rocketMQListenerContainer = rocketMQListenerContainer;
- this.consumerProperties = consumerProperties;
- this.instrumentationManager = instrumentationManager;
- }
-
- @Override
- protected void onInit() {
- if (consumerProperties == null
- || !consumerProperties.getExtension().getEnabled()) {
- return;
- }
- super.onInit();
- if (this.retryTemplate != null) {
- Assert.state(getErrorChannel() == null,
- "Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
- + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
- + "send an error message when retries are exhausted");
- }
-
- BindingRocketMQListener listener = new BindingRocketMQListener();
- rocketMQListenerContainer.setRocketMQListener(listener);
-
- if (retryTemplate != null) {
- this.retryTemplate.registerListener(listener);
- }
-
- try {
- rocketMQListenerContainer.afterPropertiesSet();
-
- }
- catch (Exception e) {
- log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
- throw new IllegalArgumentException(
- "rocketMQListenerContainer init error: " + e.getMessage(), e);
- }
-
- instrumentationManager.addHealthInstrumentation(
- new Instrumentation(rocketMQListenerContainer.getTopic()
- + rocketMQListenerContainer.getConsumerGroup()));
- }
-
- @Override
- protected void doStart() {
- if (consumerProperties == null
- || !consumerProperties.getExtension().getEnabled()) {
- return;
- }
- try {
- rocketMQListenerContainer.start();
- instrumentationManager
- .getHealthInstrumentation(rocketMQListenerContainer.getTopic()
- + rocketMQListenerContainer.getConsumerGroup())
- .markStartedSuccessfully();
- }
- catch (Exception e) {
- instrumentationManager
- .getHealthInstrumentation(rocketMQListenerContainer.getTopic()
- + rocketMQListenerContainer.getConsumerGroup())
- .markStartFailed(e);
- log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
- throw new MessagingException(MessageBuilder.withPayload(
- "RocketMQTemplate startup failed, Caused by " + e.getMessage())
- .build(), e);
- }
- }
-
- @Override
- protected void doStop() {
- rocketMQListenerContainer.stop();
- }
-
- public void setRetryTemplate(RetryTemplate retryTemplate) {
- this.retryTemplate = retryTemplate;
- }
-
- public void setRecoveryCallback(RecoveryCallback extends Object> recoveryCallback) {
- this.recoveryCallback = recoveryCallback;
- }
-
- protected class BindingRocketMQListener
- implements RocketMQListener, RetryListener {
-
- @Override
- public void onMessage(Message message) {
- boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
- if (enableRetry) {
- RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
- RocketMQInboundChannelAdapter.this.sendMessage(message);
- return null;
- }, (RecoveryCallback