diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml
index 6065685c5..07dd56440 100644
--- a/spring-cloud-alibaba-dependencies/pom.xml
+++ b/spring-cloud-alibaba-dependencies/pom.xml
@@ -183,6 +183,11 @@
spring-cloud-alicloud-context
${project.version}
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-rocketmq
+ ${project.version}
+
@@ -220,6 +225,12 @@
${project.version}
+
+ org.springframework.cloud
+ spring-cloud-starter-stream-rocketmq
+ ${project.version}
+
+
io.dropwizard.metrics
diff --git a/spring-cloud-starter-alibaba/pom.xml b/spring-cloud-starter-alibaba/pom.xml
index 5223b1d61..c9bb0bb31 100644
--- a/spring-cloud-starter-alibaba/pom.xml
+++ b/spring-cloud-starter-alibaba/pom.xml
@@ -14,5 +14,6 @@
spring-cloud-starter-alibaba-nacos-config
spring-cloud-starter-alibaba-nacos-discovery
spring-cloud-starter-alibaba-sentinel
+ spring-cloud-starter-stream-rocketmq
\ No newline at end of file
diff --git a/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml b/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml
new file mode 100644
index 000000000..5b8ba75eb
--- /dev/null
+++ b/spring-cloud-starter-alibaba/spring-cloud-starter-stream-rocketmq/pom.xml
@@ -0,0 +1,20 @@
+
+ 4.0.0
+
+
+ org.springframework.cloud
+ spring-cloud-starter-alibaba
+ 0.2.1.BUILD-SNAPSHOT
+
+ spring-cloud-starter-stream-rocketmq
+ Spring Cloud Starter Stream RocketMQ
+
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-rocketmq
+
+
+
+
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
index 4086f7725..3ee0335c2 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQBinderConstants.java
@@ -5,22 +5,22 @@ package org.springframework.cloud.stream.binder.rocketmq;
*/
public interface RocketMQBinderConstants {
- /**
- * Header key
- */
- String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKET_MESSAGE";
+ /**
+ * Header key
+ */
+ String ORIGINAL_ROCKET_MESSAGE = "ORIGINAL_ROCKETMQ_MESSAGE";
- String ROCKET_FLAG = "ROCKETMQ_FLAG";
+ String ROCKET_FLAG = "ROCKETMQ_FLAG";
- String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
+ String ROCKET_SEND_RESULT = "ROCKETMQ_SEND_RESULT";
- String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
+ String ACKNOWLEDGEMENT_KEY = "ACKNOWLEDGEMENT";
- /**
- * Instrumentation key
- */
- String LASTSEND_TIMESTAMP = "lastSend.timestamp";
+ /**
+ * Instrumentation key
+ */
+ String LASTSEND_TIMESTAMP = "lastSend.timestamp";
- String ENDPOINT_ID = "rocketmq-binder";
+ String ENDPOINT_ID = "rocketmq-binder";
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
index c4a4e8d90..8f3ddacb6 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQInboundChannelAdapter.java
@@ -43,6 +43,8 @@ import org.springframework.util.StringUtils;
*/
public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
+ private static final Logger logger = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class);
+
private ConsumerInstrumentation consumerInstrumentation;
private final ExtendedConsumerProperties consumerProperties;
@@ -132,7 +134,6 @@ public class RocketMQInboundChannelAdapter extends MessageProducerSupport {
}
protected class CloudStreamMessageListener implements MessageListener, RetryListener {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final InstrumentationManager instrumentationManager;
diff --git a/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java
new file mode 100644
index 000000000..9b1be23f5
--- /dev/null
+++ b/spring-cloud-stream-binder-rocketmq/src/test/java/org/springframework/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.rocketmq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
+import org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderEndpointAutoConfiguration;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
+
+/**
+ * @author Jim
+ */
+public class RocketMQAutoConfigurationTests {
+
+ private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
+ .withConfiguration(
+ AutoConfigurations.of(RocketMQBinderEndpointAutoConfiguration.class,
+ RocketMQBinderAutoConfiguration.class))
+ .withPropertyValues(
+ "spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876",
+ "spring.cloud.stream.bindings.output.destination=TopicOrderTest",
+ "spring.cloud.stream.bindings.output.content-type=application/json",
+ "spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
+ "spring.cloud.stream.bindings.input1.content-type=application/json",
+ "spring.cloud.stream.bindings.input1.group=test-group1",
+ "spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true",
+ "spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",
+ "spring.cloud.stream.bindings.input2.destination=TopicOrderTest",
+ "spring.cloud.stream.bindings.input2.content-type=application/json",
+ "spring.cloud.stream.bindings.input2.group=test-group2",
+ "spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false",
+ "spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tag1");
+
+ @Test
+ public void testProperties() {
+ this.contextRunner.run(context -> {
+ RocketMQBinderConfigurationProperties binderConfigurationProperties = context
+ .getBean(RocketMQBinderConfigurationProperties.class);
+ assertThat(binderConfigurationProperties.getNamesrvAddr())
+ .isEqualTo("127.0.0.1:9876");
+ RocketMQExtendedBindingProperties bindingProperties = context
+ .getBean(RocketMQExtendedBindingProperties.class);
+ assertThat(
+ bindingProperties.getExtendedConsumerProperties("input2").getTags())
+ .isEqualTo("tag1");
+ assertThat(bindingProperties.getExtendedConsumerProperties("input2")
+ .getOrderly()).isFalse();
+ assertThat(bindingProperties.getExtendedConsumerProperties("input1")
+ .getOrderly()).isTrue();
+ });
+ }
+
+}
\ No newline at end of file